Skip to content

Commit a214e30

Browse files
committed
refactor: rename producer_service to producer and update related methods
1 parent b108c81 commit a214e30

3 files changed

Lines changed: 22 additions & 23 deletions

File tree

aimdb-core/src/typed_api.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ where
407407
+ 'static,
408408
Fut: Future<Output = ()> + Send + 'static,
409409
{
410-
self.rec.set_producer_service(f);
410+
self.rec.set_producer(f);
411411
#[cfg(feature = "profiling")]
412412
{
413413
let (idx, _) = self.rec.profiling_mut().push_source();
@@ -1054,7 +1054,7 @@ where
10541054
self.url
10551055
);
10561056
}
1057-
if self.registrar.rec.has_producer_service() {
1057+
if self.registrar.rec.has_producer() {
10581058
panic!(
10591059
"Record already has a .source(); cannot also have a .link_from() for {}",
10601060
self.url
@@ -1618,7 +1618,7 @@ mod tests {
16181618
fn link_from_after_source_panics() {
16191619
let mut rec = crate::typed_record::TypedRecord::<TestRecord, MockRuntime>::new();
16201620
rec.set_buffer(Box::new(MockBuffer));
1621-
rec.set_producer_service(|_p, _ctx| async move {});
1621+
rec.set_producer(|_p, _ctx| async move {});
16221622

16231623
let builders: Vec<Box<dyn crate::connector::ConnectorBuilder<MockRuntime>>> =
16241624
vec![Box::new(MockConnectorBuilder {
@@ -1671,7 +1671,7 @@ mod tests {
16711671
.finish();
16721672
}
16731673

1674-
rec.set_producer_service(|_p, _ctx| async move {});
1674+
rec.set_producer(|_p, _ctx| async move {});
16751675
}
16761676

16771677
#[test]

aimdb-core/src/typed_record.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ pub trait AnyRecord: Send + Sync {
303303
fn consumer_count(&self) -> usize;
304304

305305
/// Returns whether a producer service is registered
306-
fn has_producer_service(&self) -> bool;
306+
fn has_producer(&self) -> bool;
307307

308308
/// Returns whether a transform is registered for this record
309309
fn has_transform(&self) -> bool;
@@ -515,7 +515,7 @@ where
515515

516516
let mut futures = Vec::new();
517517

518-
if typed_record.has_producer_service() {
518+
if typed_record.has_producer() {
519519
if let Some(f) = typed_record.collect_producer_future(runtime, db, record_key)? {
520520
futures.push(f);
521521
}
@@ -544,14 +544,14 @@ pub struct TypedRecord<
544544
/// This will be auto-spawned during build() if present
545545
/// Stored as FnOnce that takes (Producer<T>, RuntimeContext) and returns a Future
546546
/// Wrapped in Mutex for interior mutability (needed to take() during spawning)
547-
producer_service: Mutex<Option<ProducerServiceFn<T>>>,
547+
producer: Mutex<Option<ProducerServiceFn<T>>>,
548548

549549
/// List of consumer/tap tasks - wrapped in Mutex for Sync + taking out during spawn
550550
/// Each is spawned as an independent background task that subscribes to the buffer
551551
/// Using Mutex provides the Sync bound required by AnyRecord trait
552552
consumers: Mutex<Vec<ConsumerServiceFn<T>>>,
553553

554-
/// Transform descriptor — mutually exclusive with producer_service.
554+
/// Transform descriptor — mutually exclusive with producer.
555555
/// If set, this record is a reactive derivation from one or more input records.
556556
/// Uses the same Mutex pattern for take()-during-spawn.
557557
transform: Mutex<Option<crate::transform::TransformDescriptor<T, R>>>,
@@ -607,7 +607,7 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
607607
/// Call `.with_remote_access()` to enable JSON (std only).
608608
pub fn new() -> Self {
609609
Self {
610-
producer_service: Mutex::new(None),
610+
producer: Mutex::new(None),
611611
consumers: Mutex::new(Vec::new()),
612612
transform: Mutex::new(None),
613613
buffer: None,
@@ -645,7 +645,7 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
645645
/// # Panics
646646
/// Panics if producer already set (one producer per record), if a transform is registered,
647647
/// or if a `.link_from()` inbound connector is registered (all three would race on the buffer).
648-
pub fn set_producer_service<F, Fut>(&mut self, f: F)
648+
pub fn set_producer<F, Fut>(&mut self, f: F)
649649
where
650650
F: FnOnce(crate::Producer<T>, Arc<dyn Any + Send + Sync>) -> Fut + Send + 'static,
651651
Fut: core::future::Future<Output = ()> + Send + 'static,
@@ -660,7 +660,7 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
660660
}
661661

662662
// Check if already set
663-
if lock(&self.producer_service).is_some() {
663+
if lock(&self.producer).is_some() {
664664
panic!("This record type already has a producer service");
665665
}
666666

@@ -672,7 +672,7 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
672672
);
673673

674674
// Store it in the mutex
675-
*lock(&self.producer_service) = Some(boxed_fn);
675+
*lock(&self.producer) = Some(boxed_fn);
676676
}
677677

678678
/// Adds a consumer function for this record
@@ -720,7 +720,7 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
720720
descriptor: crate::transform::TransformDescriptor<T, R>,
721721
) {
722722
// Enforce mutual exclusion with .source()
723-
if lock(&self.producer_service).is_some() {
723+
if lock(&self.producer).is_some() {
724724
panic!("Record already has a .source(); cannot also have a .transform().");
725725
}
726726

@@ -770,7 +770,7 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
770770
}
771771

772772
// Check for producer service (source)
773-
if lock(&self.producer_service).is_some() {
773+
if lock(&self.producer).is_some() {
774774
return crate::graph::RecordOrigin::Source;
775775
}
776776

@@ -1024,7 +1024,7 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
10241024
/// All three write to the same buffer and would race as last-writer-wins.
10251025
/// Multiple inbound connectors on the same record are permitted (fan-in).
10261026
pub fn add_inbound_connector(&mut self, link: crate::connector::InboundConnectorLink) {
1027-
if lock(&self.producer_service).is_some() {
1027+
if lock(&self.producer).is_some() {
10281028
panic!("Record already has a .source(); cannot also have a .link_from().");
10291029
}
10301030

@@ -1140,7 +1140,7 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
11401140
let _ = db;
11411141

11421142
// Take the producer service (can only collect once)
1143-
let service = lock(&self.producer_service).take();
1143+
let service = lock(&self.producer).take();
11441144

11451145
if let Some(service_fn) = service {
11461146
#[cfg(feature = "tracing")]
@@ -1170,8 +1170,8 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
11701170
}
11711171

11721172
/// Returns whether a producer service is registered
1173-
pub fn has_producer_service(&self) -> bool {
1174-
lock(&self.producer_service).is_some()
1173+
pub fn has_producer(&self) -> bool {
1174+
lock(&self.producer).is_some()
11751175
}
11761176

11771177
/// Marks this record as writable for remote access (std only)
@@ -1283,8 +1283,8 @@ impl<T: Send + Sync + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter
12831283
TypedRecord::consumer_count(self)
12841284
}
12851285

1286-
fn has_producer_service(&self) -> bool {
1287-
TypedRecord::has_producer_service(self)
1286+
fn has_producer(&self) -> bool {
1287+
TypedRecord::has_producer(self)
12881288
}
12891289

12901290
fn has_transform(&self) -> bool {
@@ -1347,7 +1347,7 @@ impl<T: Send + Sync + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter
13471347
self.record_origin(),
13481348
buffer_type,
13491349
buffer_capacity,
1350-
if self.has_producer_service() { 1 } else { 0 },
1350+
if self.has_producer() { 1 } else { 0 },
13511351
self.consumer_count(),
13521352
self.metadata
13531353
.writable
@@ -1452,7 +1452,7 @@ impl<T: Send + Sync + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter
14521452
);
14531453

14541454
// SAFETY CHECK 1: Enforce "No Producer Override" rule
1455-
if self.has_producer_service() || self.has_transform() {
1455+
if self.has_producer() || self.has_transform() {
14561456
#[cfg(feature = "tracing")]
14571457
tracing::warn!(
14581458
"Rejected set_from_json for '{}': has active producer or transform",

aimdb-tokio-adapter/tests/stage_profiling.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ async fn source_and_tap_stages_are_timed_and_named() {
7272
);
7373
assert!(s.min_time_ns() <= s.avg_time_ns());
7474
assert!(s.avg_time_ns() <= s.max_time_ns());
75-
assert_eq!(s.avg_time_ns(), s.total_time_ns() / s.call_count());
7675

7776
// Tap stage.
7877
let tap = prof.tap(0).expect("tap stage registered");

0 commit comments

Comments
 (0)