Skip to content

Commit 5791caf

Browse files
lxsaahclaude
andcommitted
refactor: remove latest_snapshot and use buffer-native peek() (M15)
Replaces the per-record latest_snapshot Mutex with a new DynBuffer::peek() method that reads from the buffer's native storage (watch::Sender slot for SingleLatest, Mutex<Option<T>> slot for Mailbox, None for SPMC Ring). The snapshot was a redundant second copy of every produced value, updated on two divergent code paths (TypedRecord::produce and RecordWriter::push), and the only mutex on the SingleLatest hot path. After this change the write path is unified through RecordWriter::push() and the SingleLatest produce path is lock-free. Also fixes a latent bug in TokioBuffer's Watch::push: it used tx.send() which returns Err and silently drops the value when no receivers exist. Switched to tx.send_replace() which always updates the slot — the snapshot had been masking this for record.get callers. Behaviour changes (documented in design 031 §Breaking Changes): - record.get returns not_found on SPMC Ring and bufferless records - record.get on Mailbox after record.drain returns not_found (slot was taken; previously the independent snapshot survived) - metadata mark_updated now fires on AimX record.set and the WASM adapter produce paths, which previously bypassed it Design: docs/design/031-M15-remove-latest-snapshot.md Verified: make check (fmt, clippy, std/no_std/embedded/wasm builds, deny) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ab4abf6 commit 5791caf

8 files changed

Lines changed: 807 additions & 96 deletions

File tree

aimdb-core/src/buffer/traits.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,22 @@ pub trait DynBuffer<T: Clone + Send>: Send + Sync {
7575
/// Returns self as Any for downcasting to concrete buffer types
7676
fn as_any(&self) -> &dyn core::any::Any;
7777

78+
/// Non-destructive read of the buffer's current value.
79+
///
80+
/// Returns `Some(T)` if the buffer holds a current value that can be read
81+
/// without affecting any consumer's position. Returns `None` if the buffer
82+
/// type has no canonical "current value" concept (e.g., SPMC Ring) or if
83+
/// no value has been produced yet.
84+
///
85+
/// This is the buffer-native point-in-time read used by AimX `record.get`
86+
/// (design 031). Implementations must not advance any reader position.
87+
///
88+
/// The default returns `None`, which is the correct behaviour for buffers
89+
/// without a canonical latest value.
90+
fn peek(&self) -> Option<T> {
91+
None
92+
}
93+
7894
/// Get buffer metrics snapshot (metrics feature only)
7995
///
8096
/// Returns `Some(snapshot)` if the buffer implementation supports metrics,

aimdb-core/src/buffer/writer.rs

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
//! `RecordWriter<T>` — the sole implementor of `WriteHandle<T>` (design 029).
22
//!
3-
//! Pre-binds the three Arcs a `TypedRecord<T, R>` already owns (buffer,
4-
//! latest-snapshot, metadata tracker) so `Producer<T>` can push values without
5-
//! holding a `Arc<AimDb<R>>` or running a `HashMap` lookup per call.
3+
//! Pre-binds the buffer and (std-only) metadata tracker so `Producer<T>` can
4+
//! push values without holding a `Arc<AimDb<R>>` or running a `HashMap`
5+
//! lookup per call.
66
77
#[cfg(not(feature = "std"))]
88
extern crate alloc;
@@ -16,15 +16,9 @@ use std::sync::Arc;
1616
use super::traits::{DynBuffer, WriteHandle};
1717

1818
pub(crate) struct RecordWriter<T: Clone + Send + 'static> {
19-
/// `None` for records that only support `latest()` (no buffer configured).
19+
/// `None` for records without a configured buffer.
2020
buffer: Option<Arc<dyn DynBuffer<T>>>,
2121

22-
/// Snapshot slot shared with `TypedRecord` and any `latest()` reader.
23-
#[cfg(feature = "std")]
24-
latest_snapshot: Arc<std::sync::Mutex<Option<T>>>,
25-
#[cfg(not(feature = "std"))]
26-
latest_snapshot: Arc<spin::Mutex<Option<T>>>,
27-
2822
/// Metadata tracker (already `Clone` with shared inner `Arc<Mutex>` /
2923
/// `Arc<AtomicBool>`). std-only.
3024
#[cfg(feature = "std")]
@@ -35,39 +29,19 @@ impl<T: Clone + Send + 'static> RecordWriter<T> {
3529
#[cfg(feature = "std")]
3630
pub(crate) fn new(
3731
buffer: Option<Arc<dyn DynBuffer<T>>>,
38-
latest_snapshot: Arc<std::sync::Mutex<Option<T>>>,
3932
metadata: crate::typed_record::RecordMetadataTracker,
4033
) -> Self {
41-
Self {
42-
buffer,
43-
latest_snapshot,
44-
metadata,
45-
}
34+
Self { buffer, metadata }
4635
}
4736

4837
#[cfg(not(feature = "std"))]
49-
pub(crate) fn new(
50-
buffer: Option<Arc<dyn DynBuffer<T>>>,
51-
latest_snapshot: Arc<spin::Mutex<Option<T>>>,
52-
) -> Self {
53-
Self {
54-
buffer,
55-
latest_snapshot,
56-
}
38+
pub(crate) fn new(buffer: Option<Arc<dyn DynBuffer<T>>>) -> Self {
39+
Self { buffer }
5740
}
5841
}
5942

6043
impl<T: Clone + Send + 'static> WriteHandle<T> for RecordWriter<T> {
6144
fn push(&self, value: T) {
62-
#[cfg(feature = "std")]
63-
{
64-
*self.latest_snapshot.lock().unwrap() = Some(value.clone());
65-
}
66-
#[cfg(not(feature = "std"))]
67-
{
68-
*self.latest_snapshot.lock() = Some(value.clone());
69-
}
70-
7145
if let Some(buf) = &self.buffer {
7246
buf.push(value);
7347
#[cfg(feature = "std")]

aimdb-core/src/builder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1102,8 +1102,10 @@ impl<R: aimdb_executor::RuntimeAdapter + 'static> AimDb<R> {
11021102
where
11031103
T: Send + 'static + Debug + Clone,
11041104
{
1105+
// Single write path via WriteHandle (design 031). For hot paths,
1106+
// prefer `db.producer::<T>(key)` once and reuse the returned handle.
11051107
let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1106-
typed_rec.produce(value);
1108+
typed_rec.writer_handle().push(value);
11071109
Ok(())
11081110
}
11091111

aimdb-core/src/typed_record.rs

Lines changed: 12 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -598,16 +598,6 @@ pub struct TypedRecord<
598598
/// Stores the deserialization logic where T: Deserialize is known at call site
599599
#[cfg(feature = "std")]
600600
json_deserializer: Option<JsonDeserializer<T>>,
601-
602-
/// Latest value snapshot - for latest() API
603-
/// Cached atomically on every produce() call to support latest()
604-
/// This provides a buffer-agnostic way to query the latest value
605-
/// Available in both std and no_std environments
606-
#[cfg(feature = "std")]
607-
latest_snapshot: Arc<std::sync::Mutex<Option<T>>>,
608-
609-
#[cfg(not(feature = "std"))]
610-
latest_snapshot: Arc<spin::Mutex<Option<T>>>,
611601
}
612602

613603
impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'static>
@@ -643,10 +633,6 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
643633
json_serializer: None,
644634
#[cfg(feature = "std")]
645635
json_deserializer: None,
646-
#[cfg(feature = "std")]
647-
latest_snapshot: Arc::new(std::sync::Mutex::new(None)),
648-
#[cfg(not(feature = "std"))]
649-
latest_snapshot: Arc::new(spin::Mutex::new(None)),
650636
}
651637
}
652638

@@ -1011,16 +997,12 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
1011997
{
1012998
Arc::new(crate::buffer::RecordWriter::new(
1013999
self.buffer.clone(),
1014-
self.latest_snapshot.clone(),
10151000
self.metadata.clone(),
10161001
))
10171002
}
10181003
#[cfg(not(feature = "std"))]
10191004
{
1020-
Arc::new(crate::buffer::RecordWriter::new(
1021-
self.buffer.clone(),
1022-
self.latest_snapshot.clone(),
1023-
))
1005+
Arc::new(crate::buffer::RecordWriter::new(self.buffer.clone()))
10241006
}
10251007
}
10261008

@@ -1334,31 +1316,6 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
13341316
}
13351317
}
13361318

1337-
/// Produces a value by pushing to the buffer
1338-
///
1339-
/// Enqueues value for consumer tasks and updates latest snapshot.
1340-
pub fn produce(&self, val: T) {
1341-
// Cache snapshot for latest() API (both std and no_std)
1342-
#[cfg(feature = "std")]
1343-
{
1344-
*self.latest_snapshot.lock().unwrap() = Some(val.clone());
1345-
}
1346-
1347-
#[cfg(not(feature = "std"))]
1348-
{
1349-
*self.latest_snapshot.lock() = Some(val.clone());
1350-
}
1351-
1352-
// Push to buffer - consumer tasks will receive it
1353-
if let Some(buf) = &self.buffer {
1354-
buf.push(val);
1355-
1356-
// Update metadata timestamp (std only)
1357-
#[cfg(feature = "std")]
1358-
self.metadata.mark_updated();
1359-
}
1360-
}
1361-
13621319
/// Returns whether a producer service is registered
13631320
pub fn has_producer_service(&self) -> bool {
13641321
#[cfg(feature = "std")]
@@ -1399,15 +1356,15 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter + 'sta
13991356
/// }
14001357
/// ```
14011358
pub fn latest(&self) -> Option<RecordValue<T>> {
1359+
// Read buffer-native storage via peek() (design 031). Records without
1360+
// a buffer return None — see Breaking Changes in design 031.
1361+
let value = self.buffer.as_ref()?.peek()?;
14021362
#[cfg(feature = "std")]
14031363
{
1404-
let value = self.latest_snapshot.lock().unwrap().clone()?;
14051364
Some(RecordValue::new(value, self.json_serializer.clone()))
14061365
}
1407-
14081366
#[cfg(not(feature = "std"))]
14091367
{
1410-
let value = self.latest_snapshot.lock().clone()?;
14111368
Some(RecordValue::new(value, None))
14121369
}
14131370
}
@@ -1596,8 +1553,11 @@ impl<T: Send + Sync + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter
15961553
core::any::type_name::<T>()
15971554
);
15981555

1599-
// Delegate to latest() which returns RecordValue<T> with serializer attached
1600-
let result = self.latest().and_then(|v| v.as_json());
1556+
// Read buffer-native storage via peek() (design 031). Records without
1557+
// a buffer return None — see Breaking Changes in design doc.
1558+
let value = self.buffer.as_ref()?.peek()?;
1559+
let serializer = self.json_serializer.as_ref()?;
1560+
let result = serializer(&value);
16011561

16021562
#[cfg(feature = "tracing")]
16031563
tracing::debug!("Serialization result: {:?}", result.is_some());
@@ -1713,7 +1673,9 @@ impl<T: Send + Sync + 'static + Debug + Clone, R: aimdb_executor::RuntimeAdapter
17131673
core::any::type_name::<T>()
17141674
);
17151675

1716-
self.produce(value);
1676+
// Push through the unified write path (design 031). This also marks
1677+
// metadata as updated — previously skipped on this path.
1678+
self.writer_handle().push(value);
17171679

17181680
#[cfg(feature = "tracing")]
17191681
tracing::info!(

aimdb-tokio-adapter/src/buffer.rs

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,11 @@ impl<T: Clone + Send + Sync + 'static> Buffer<T> for TokioBuffer<T> {
8181
let _ = tx.send(value);
8282
}
8383
TokioBufferInner::Watch { tx } => {
84-
let _ = tx.send(Some(value));
84+
// send_replace updates the slot unconditionally; send() would
85+
// fail (and silently drop the value) when no receivers exist,
86+
// which would also break peek() for producers that publish
87+
// before any subscriber attaches.
88+
tx.send_replace(Some(value));
8589
}
8690
TokioBufferInner::Notify { slot, notify } => {
8791
*slot.lock().unwrap() = Some(value);
@@ -129,6 +133,17 @@ impl<T: Clone + Send + Sync + 'static> aimdb_core::buffer::DynBuffer<T> for Toki
129133
self
130134
}
131135

136+
fn peek(&self) -> Option<T> {
137+
match &*self.inner {
138+
// watch::Sender::borrow() reads the slot non-destructively.
139+
TokioBufferInner::Watch { tx } => tx.borrow().clone(),
140+
// Same Mutex the Mailbox buffer already uses for the slot.
141+
TokioBufferInner::Notify { slot, .. } => slot.lock().unwrap().clone(),
142+
// broadcast has no canonical latest — see design 031 §SPMC Ring.
143+
TokioBufferInner::Broadcast { .. } => None,
144+
}
145+
}
146+
132147
#[cfg(feature = "metrics")]
133148
fn metrics_snapshot(&self) -> Option<BufferMetricsSnapshot> {
134149
Some(<Self as BufferMetrics>::metrics(self))
@@ -1091,6 +1106,102 @@ mod tests {
10911106
assert_eq!(remaining, vec![20, 30]);
10921107
}
10931108

1109+
// ========================================================================
1110+
// peek() Tests — non-destructive buffer-native reads (design 031)
1111+
// ========================================================================
1112+
1113+
mod peek_tests {
1114+
use super::super::*;
1115+
use aimdb_core::buffer::DynBuffer;
1116+
1117+
#[tokio::test]
1118+
async fn test_peek_single_latest_empty() {
1119+
let buffer = TokioBuffer::<i32>::new(&BufferCfg::SingleLatest);
1120+
assert_eq!(buffer.peek(), None);
1121+
}
1122+
1123+
#[tokio::test]
1124+
async fn test_peek_single_latest_returns_latest() {
1125+
let buffer = TokioBuffer::<i32>::new(&BufferCfg::SingleLatest);
1126+
DynBuffer::push(&buffer, 1);
1127+
DynBuffer::push(&buffer, 2);
1128+
DynBuffer::push(&buffer, 3);
1129+
assert_eq!(buffer.peek(), Some(3));
1130+
}
1131+
1132+
#[tokio::test]
1133+
async fn test_peek_single_latest_is_non_destructive() {
1134+
let buffer = TokioBuffer::<i32>::new(&BufferCfg::SingleLatest);
1135+
// Subscribe BEFORE push so the receiver's version counter advances
1136+
// on send_replace. (Watch receivers created after a push will only
1137+
// wake on the *next* push — that's the gap peek() exists to fill.)
1138+
let mut reader = Buffer::subscribe(&buffer);
1139+
DynBuffer::push(&buffer, 42);
1140+
1141+
// Multiple peeks return the same value.
1142+
assert_eq!(buffer.peek(), Some(42));
1143+
assert_eq!(buffer.peek(), Some(42));
1144+
1145+
// Peek did not consume the value from the subscriber's perspective.
1146+
assert_eq!(reader.recv().await.unwrap(), 42);
1147+
1148+
// And peek still works after the subscriber received.
1149+
assert_eq!(buffer.peek(), Some(42));
1150+
}
1151+
1152+
#[tokio::test]
1153+
async fn test_peek_single_latest_works_without_subscriber() {
1154+
// The exact case the design 031 snapshot was originally added for:
1155+
// a producer pushes before anyone subscribes. peek() must see it.
1156+
let buffer = TokioBuffer::<i32>::new(&BufferCfg::SingleLatest);
1157+
DynBuffer::push(&buffer, 17);
1158+
assert_eq!(buffer.peek(), Some(17));
1159+
}
1160+
1161+
#[tokio::test]
1162+
async fn test_peek_mailbox_empty() {
1163+
let buffer = TokioBuffer::<i32>::new(&BufferCfg::Mailbox);
1164+
assert_eq!(buffer.peek(), None);
1165+
}
1166+
1167+
#[tokio::test]
1168+
async fn test_peek_mailbox_returns_pending() {
1169+
let buffer = TokioBuffer::<i32>::new(&BufferCfg::Mailbox);
1170+
DynBuffer::push(&buffer, 7);
1171+
assert_eq!(buffer.peek(), Some(7));
1172+
}
1173+
1174+
#[tokio::test]
1175+
async fn test_peek_mailbox_drained_after_recv() {
1176+
let buffer = TokioBuffer::<i32>::new(&BufferCfg::Mailbox);
1177+
DynBuffer::push(&buffer, 99);
1178+
assert_eq!(buffer.peek(), Some(99));
1179+
// Subscriber takes the slot.
1180+
let mut reader = Buffer::subscribe(&buffer);
1181+
assert_eq!(reader.recv().await.unwrap(), 99);
1182+
// After take(), peek sees the slot is empty.
1183+
assert_eq!(buffer.peek(), None);
1184+
}
1185+
1186+
#[tokio::test]
1187+
async fn test_peek_mailbox_reflects_overwrite() {
1188+
let buffer = TokioBuffer::<i32>::new(&BufferCfg::Mailbox);
1189+
DynBuffer::push(&buffer, 1);
1190+
DynBuffer::push(&buffer, 2);
1191+
assert_eq!(buffer.peek(), Some(2));
1192+
}
1193+
1194+
#[tokio::test]
1195+
async fn test_peek_spmc_ring_returns_none() {
1196+
// Broadcast has no canonical latest — see design 031 §SPMC Ring.
1197+
let buffer = TokioBuffer::<i32>::new(&BufferCfg::SpmcRing { capacity: 8 });
1198+
assert_eq!(buffer.peek(), None);
1199+
DynBuffer::push(&buffer, 1);
1200+
DynBuffer::push(&buffer, 2);
1201+
assert_eq!(buffer.peek(), None);
1202+
}
1203+
}
1204+
10941205
// ========================================================================
10951206
// Metrics Tests (feature-gated)
10961207
// ========================================================================

aimdb-wasm-adapter/src/bindings.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -545,12 +545,10 @@ where
545545
let val: T = serde_wasm_bindgen::from_value(value)
546546
.map_err(|e| JsError::new(&format!("Contract violation: {e}")))?;
547547

548-
let inner = db.inner();
549-
let typed = inner
550-
.get_typed_record_by_key::<T, WasmAdapter>(key)
551-
.map_err(|e| JsError::new(&format!("{e:?}")))?;
552-
553-
typed.produce(val);
548+
// Single write path via Producer<T> (design 031).
549+
db.producer::<T>(key)
550+
.map_err(|e| JsError::new(&format!("{e:?}")))?
551+
.produce(val);
554552
Ok(())
555553
}
556554

0 commit comments

Comments
 (0)