Skip to content

Commit ba6b7dc

Browse files
authored
89 validate writer exclusivity for link from at config time (#90)
* feat: enforce writer exclusivity for source, transform and link_from in records * chore: update subproject commit for embassy * feat: add writer-exclusivity validation for `.link_from()` in records
1 parent 76c042f commit ba6b7dc

5 files changed

Lines changed: 182 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2929

3030
### Added
3131

32+
- **Writer-exclusivity validation (Issue #89)**: Combining `.source()`, `.transform()`, and `.link_from()` on the same record now panics at configuration time with a clear message instead of silently producing a last-writer-wins race on the buffer. Multiple `.link_from()` inbound connectors (fan-in) remain allowed. ([aimdb-core](aimdb-core/CHANGELOG.md))
3233
- **`no_std` Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets — no longer Tokio-only. Multi-input join fan-in moved out of `aimdb-core` into the new `JoinFanInRuntime` traits in `aimdb-executor`, with implementations in the Tokio (`mpsc::channel`, capacity 64), Embassy (`embassy_sync::Channel`, capacity 8), and WASM (`futures_channel::mpsc`, capacity 64) adapters. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md))
3334
- **Task-model join handler**: New `JoinBuilder::on_triggers(FnOnce(JoinEventRx, Producer) -> impl Future)` API replaces the previous callback model. Eliminates per-event heap allocation and lets handler state borrow across `.await` points. **Breaking change** vs. the old `with_state().on_trigger(...)` form — see [aimdb-core](aimdb-core/CHANGELOG.md).
3435
- **Weather-mesh `DewPoint` demo**: All three weather stations (alpha, beta, gamma) now derive a `DewPoint` record from `Temperature` and `Humidity` via `transform_join`, demonstrating the API end-to-end on Tokio and Embassy.

aimdb-core/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- **Writer-exclusivity validation for `.link_from()` (Issue #89)**: `.source()`, `.transform()`, and `.link_from()` are now mutually exclusive on a single record — combining any two now panics at configuration time instead of silently racing on the buffer (last-writer-wins). The check fires from `LinkFromBuilder::finish()` (panic message includes the offending URL), with symmetric defense-in-depth checks added to `TypedRecord::set_producer_service`, `set_transform`, and `add_inbound_connector`. Multiple `.link_from()` calls on the same record (fan-in) remain permitted.
1213
- **`no_std` support for the full Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets. Multi-input join fan-in is no longer hardcoded to `tokio::sync::mpsc`; it uses the runtime-agnostic `JoinFanInRuntime` traits from `aimdb-executor`, implemented by Tokio, Embassy, and WASM adapters.
1314
- **`JoinEventRx`** — type-erased trigger receiver passed to the `on_triggers` handler. Call `.recv().await` in a loop to consume `JoinTrigger` events from all input forwarders.
1415
- **`transform_join` as an inherent method on `RecordRegistrar`** (gated `feature = "alloc"`, `R: JoinFanInRuntime`). Previously only exposed via the `impl_record_registrar_ext!` macro under `feature = "std"`.

aimdb-core/src/typed_api.rs

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -905,6 +905,8 @@ where
905905
/// - If no deserializer is provided
906906
/// - If no connector is registered for the URL scheme
907907
/// - If the URL is invalid
908+
/// - If the record already has a `.source()` or `.transform()`
909+
/// (local producer + inbound connector would race as last-writer-wins)
908910
pub fn finish(self) -> &'a mut RecordRegistrar<'a, T, R> {
909911
use crate::connector::{ConnectorUrl, DeserializerKind, InboundConnectorLink};
910912

@@ -921,6 +923,23 @@ where
921923
);
922924
}
923925

926+
// Mutual exclusion with local producers — both write to the same
927+
// buffer and would race as last-writer-wins. Builder-level check
928+
// surfaces the URL in the message; `add_inbound_connector` enforces
929+
// the same invariant from the other direction.
930+
if self.registrar.rec.has_transform() {
931+
panic!(
932+
"Record already has a .transform(); cannot also have a .link_from() for {}",
933+
self.url
934+
);
935+
}
936+
if self.registrar.rec.has_producer_service() {
937+
panic!(
938+
"Record already has a .source(); cannot also have a .link_from() for {}",
939+
self.url
940+
);
941+
}
942+
924943
// Resolve deserializer variant (mutually exclusive)
925944
let deser_kind = if let Some(ctx_deser) = self.context_deserializer {
926945
DeserializerKind::Context(ctx_deser)
@@ -1445,4 +1464,127 @@ mod tests {
14451464
// No serializer set — should panic
14461465
reg.link_to("mqtt://broker/topic").finish();
14471466
}
1467+
1468+
// ====================================================================
1469+
// Writer-exclusivity tests (.source / .transform / .link_from)
1470+
// ====================================================================
1471+
1472+
/// Helper: build a `TransformDescriptor` with a no-op spawn function.
1473+
fn dummy_transform_descriptor() -> crate::transform::TransformDescriptor<TestRecord, MockRuntime>
1474+
{
1475+
crate::transform::TransformDescriptor::<TestRecord, MockRuntime> {
1476+
input_keys: vec![],
1477+
spawn_fn: Box::new(|_p, _db, _ctx| Box::pin(async {})),
1478+
}
1479+
}
1480+
1481+
#[test]
1482+
#[should_panic(
1483+
expected = "Record already has a .source(); cannot also have a .link_from() for mqtt://broker/topic"
1484+
)]
1485+
fn link_from_after_source_panics() {
1486+
let mut rec = crate::typed_record::TypedRecord::<TestRecord, MockRuntime>::new();
1487+
rec.set_buffer(Box::new(MockBuffer));
1488+
rec.set_producer_service(|_p, _ctx| async move {});
1489+
1490+
let builders: Vec<Box<dyn crate::connector::ConnectorBuilder<MockRuntime>>> =
1491+
vec![Box::new(MockConnectorBuilder {
1492+
scheme: "mqtt".to_string(),
1493+
})];
1494+
let extensions = crate::extensions::Extensions::new();
1495+
1496+
let mut reg = make_registrar(&mut rec, &builders, &extensions);
1497+
reg.link_from("mqtt://broker/topic")
1498+
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
1499+
.finish();
1500+
}
1501+
1502+
#[test]
1503+
#[should_panic(
1504+
expected = "Record already has a .transform(); cannot also have a .link_from() for mqtt://broker/topic"
1505+
)]
1506+
fn link_from_after_transform_panics() {
1507+
let mut rec = crate::typed_record::TypedRecord::<TestRecord, MockRuntime>::new();
1508+
rec.set_buffer(Box::new(MockBuffer));
1509+
rec.set_transform(dummy_transform_descriptor());
1510+
1511+
let builders: Vec<Box<dyn crate::connector::ConnectorBuilder<MockRuntime>>> =
1512+
vec![Box::new(MockConnectorBuilder {
1513+
scheme: "mqtt".to_string(),
1514+
})];
1515+
let extensions = crate::extensions::Extensions::new();
1516+
1517+
let mut reg = make_registrar(&mut rec, &builders, &extensions);
1518+
reg.link_from("mqtt://broker/topic")
1519+
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
1520+
.finish();
1521+
}
1522+
1523+
#[test]
1524+
#[should_panic(expected = "Record already has a .link_from(); cannot also have a .source().")]
1525+
fn source_after_link_from_panics() {
1526+
let mut rec = crate::typed_record::TypedRecord::<TestRecord, MockRuntime>::new();
1527+
rec.set_buffer(Box::new(MockBuffer));
1528+
1529+
let builders: Vec<Box<dyn crate::connector::ConnectorBuilder<MockRuntime>>> =
1530+
vec![Box::new(MockConnectorBuilder {
1531+
scheme: "mqtt".to_string(),
1532+
})];
1533+
let extensions = crate::extensions::Extensions::new();
1534+
{
1535+
let mut reg = make_registrar(&mut rec, &builders, &extensions);
1536+
reg.link_from("mqtt://broker/topic")
1537+
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
1538+
.finish();
1539+
}
1540+
1541+
rec.set_producer_service(|_p, _ctx| async move {});
1542+
}
1543+
1544+
#[test]
1545+
#[should_panic(
1546+
expected = "Record already has a .link_from(); cannot also have a .transform()."
1547+
)]
1548+
fn transform_after_link_from_panics() {
1549+
let mut rec = crate::typed_record::TypedRecord::<TestRecord, MockRuntime>::new();
1550+
rec.set_buffer(Box::new(MockBuffer));
1551+
1552+
let builders: Vec<Box<dyn crate::connector::ConnectorBuilder<MockRuntime>>> =
1553+
vec![Box::new(MockConnectorBuilder {
1554+
scheme: "mqtt".to_string(),
1555+
})];
1556+
let extensions = crate::extensions::Extensions::new();
1557+
{
1558+
let mut reg = make_registrar(&mut rec, &builders, &extensions);
1559+
reg.link_from("mqtt://broker/topic")
1560+
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
1561+
.finish();
1562+
}
1563+
1564+
rec.set_transform(dummy_transform_descriptor());
1565+
}
1566+
1567+
#[test]
1568+
fn multiple_link_from_allowed() {
1569+
let mut rec = crate::typed_record::TypedRecord::<TestRecord, MockRuntime>::new();
1570+
rec.set_buffer(Box::new(MockBuffer));
1571+
1572+
let builders: Vec<Box<dyn crate::connector::ConnectorBuilder<MockRuntime>>> =
1573+
vec![Box::new(MockConnectorBuilder {
1574+
scheme: "mqtt".to_string(),
1575+
})];
1576+
let extensions = crate::extensions::Extensions::new();
1577+
let mut reg = make_registrar(&mut rec, &builders, &extensions);
1578+
1579+
// Chain via finish() → &mut RecordRegistrar — the registrar's
1580+
// lifetime only permits one borrow chain at a time.
1581+
reg.link_from("mqtt://broker/topic-a")
1582+
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
1583+
.finish()
1584+
.link_from("mqtt://broker/topic-b")
1585+
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
1586+
.finish();
1587+
1588+
assert_eq!(rec.inbound_connectors().len(), 2);
1589+
}
14481590
}

aimdb-core/src/typed_record.rs

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,8 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::Spawn + 'static> Type
623623
/// Long-running task that generates data via `producer.produce()`. Auto-spawned during `build()`.
624624
///
625625
/// # Panics
626-
/// Panics if producer already set (one producer per record).
626+
/// Panics if producer already set (one producer per record), if a transform is registered,
627+
/// or if a `.link_from()` inbound connector is registered (all three would race on the buffer).
627628
pub fn set_producer_service<F, Fut>(&mut self, f: F)
628629
where
629630
F: FnOnce(crate::Producer<T, R>, Arc<dyn Any + Send + Sync>) -> Fut + Send + Sync + 'static,
@@ -639,6 +640,10 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::Spawn + 'static> Type
639640
panic!("Record already has a .transform(); cannot also have a .source().");
640641
}
641642

643+
if !self.inbound_connectors.is_empty() {
644+
panic!("Record already has a .link_from(); cannot also have a .source().");
645+
}
646+
642647
// Check if already set
643648
#[cfg(feature = "std")]
644649
let already_set = self.producer_service.lock().unwrap().is_some();
@@ -711,8 +716,10 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::Spawn + 'static> Type
711716

712717
/// Sets the transform descriptor for this record.
713718
///
714-
/// A transform is mutually exclusive with a `.source()` — a record cannot
715-
/// have both. Panics if a source or transform is already registered.
719+
/// A transform is mutually exclusive with `.source()` and with any
720+
/// `.link_from()` inbound connector — all three write to the same buffer
721+
/// and would race as last-writer-wins. Panics if any of those are
722+
/// already registered, or if a transform is already set.
716723
pub(crate) fn set_transform(
717724
&mut self,
718725
descriptor: crate::transform::TransformDescriptor<T, R>,
@@ -727,6 +734,10 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::Spawn + 'static> Type
727734
panic!("Record already has a .source(); cannot also have a .transform().");
728735
}
729736

737+
if !self.inbound_connectors.is_empty() {
738+
panic!("Record already has a .link_from(); cannot also have a .transform().");
739+
}
740+
730741
#[cfg(feature = "std")]
731742
let mut slot = self.transform.lock().unwrap();
732743
#[cfg(not(feature = "std"))]
@@ -1061,7 +1072,30 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::Spawn + 'static> Type
10611072
/// Adds an inbound connector link (External → AimDB)
10621073
///
10631074
/// Called by `.link_from()` builder API during record configuration.
1075+
///
1076+
/// # Panics
1077+
/// Panics if a `.source()` or `.transform()` is already registered.
1078+
/// All three write to the same buffer and would race as last-writer-wins.
1079+
/// Multiple inbound connectors on the same record are permitted (fan-in).
10641080
pub fn add_inbound_connector(&mut self, link: crate::connector::InboundConnectorLink) {
1081+
#[cfg(feature = "std")]
1082+
let has_source = self.producer_service.lock().unwrap().is_some();
1083+
#[cfg(not(feature = "std"))]
1084+
let has_source = self.producer_service.lock().is_some();
1085+
1086+
if has_source {
1087+
panic!("Record already has a .source(); cannot also have a .link_from().");
1088+
}
1089+
1090+
#[cfg(feature = "std")]
1091+
let has_transform = self.transform.lock().unwrap().is_some();
1092+
#[cfg(not(feature = "std"))]
1093+
let has_transform = self.transform.lock().is_some();
1094+
1095+
if has_transform {
1096+
panic!("Record already has a .transform(); cannot also have a .link_from().");
1097+
}
1098+
10651099
self.inbound_connectors.push(link);
10661100
}
10671101

0 commit comments

Comments
 (0)