Skip to content

Commit 1ebf308

Browse files
lxsaahclaude
andauthored
Remove latest snapshot (#119)
* refactor: remove latest_snapshot and use buffer-native peek() (M15) Replaces the per-record latest_snapshot Mutex with a new DynBuffer::peek() method that reads from the buffer's native storage (watch::Sender slot for SingleLatest, Mutex<Option<T>> slot for Mailbox, None for SPMC Ring). The snapshot was a redundant second copy of every produced value, updated on two divergent code paths (TypedRecord::produce and RecordWriter::push), and the only mutex on the SingleLatest hot path. After this change the write path is unified through RecordWriter::push() and the SingleLatest produce path is lock-free. Also fixes a latent bug in TokioBuffer's Watch::push: it used tx.send() which returns Err and silently drops the value when no receivers exist. Switched to tx.send_replace() which always updates the slot — the snapshot had been masking this for record.get callers. Behaviour changes (documented in design 031 §Breaking Changes): - record.get returns not_found on SPMC Ring and bufferless records - record.get on Mailbox after record.drain returns not_found (slot was taken; previously the independent snapshot survived) - metadata mark_updated now fires on AimX record.set and the WASM adapter produce paths, which previously bypassed it Design: docs/design/031-M15-remove-latest-snapshot.md Verified: make check (fmt, clippy, std/no_std/embedded/wasm builds, deny) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor: streamline buffer configuration handling across modules * refactor: rename producer_service to producer and update related methods * feat(codec): introduce JSON codec for records with no_std support - Added `RemoteSerialize` trait for JSON serialization/deserialization, blanket-implemented for all `serde` types. - Implemented `JsonCodec<T>` trait for type-erased JSON encoding/decoding. - Introduced `SerdeJsonCodec` as a zero-sized implementation of `JsonCodec`. - Updated `TypedRecord` to use a single `remote_codec` field instead of separate serializer/deserializer closures. - Modified `with_remote_access` to require `RemoteSerialize` and enable JSON codec installation. - Updated `RecordValue` to utilize the new codec for JSON serialization. - Removed the deprecated `with_read_only_serialization` method. - Added documentation for the new codec functionality and its usage. * refactor: update documentation and validation for remote-access records without buffer * refactor: update dependencies and enhance buffer functionality with peek() method * changelog: update changelogs for aimdb-core, aimdb-embassy-adapter, aimdb-tokio-adapter, and aimdb-wasm-adapter with new features, fixes, and breaking changes --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ab4abf6 commit 1ebf308

29 files changed

Lines changed: 1855 additions & 542 deletions

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2727
2828
## [Unreleased]
2929

30+
### Added
31+
32+
- **M16 — JSON codec extracted behind the `json-serialize` feature; `RecordValue::as_json()` now works on `no_std + alloc`, not just `std` ([Design 032](docs/design/032-M16-aimx-json-codec.md)).** New `aimdb-core::codec` module: `RemoteSerialize` (blanket-impl'd for every `serde` `Serialize + DeserializeOwned` type), the object-safe `JsonCodec<T>`, and the zero-sized `SerdeJsonCodec`. `serde_json` runs on `alloc`, so embedded targets can opt in; `std` enables the feature transitively, so std builds are unaffected. ([aimdb-core](aimdb-core/CHANGELOG.md))
33+
- **Embassy buffer + join-queue tests now run in CI (Issue #85).** The join-queue tests previously sat behind `embassy-runtime`, which pulls `embassy-executor`'s cortex-m assembly and can't compile under `cargo test` on x86_64 — so ordering / backpressure / clone-routing regressions were never caught. The `join_queue` module is now gated on `embassy-sync`, and `make test` runs the embassy adapter's unit tests + doctests on the host (no executor). Also adds `EmbassyBuffer::peek()` and fixes a stale `EmbassyBuffer` doc example. ([aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md))
34+
3035
### Changed (breaking)
3136

37+
- **M15 — `latest_snapshot` removed; point-in-time reads go through the new buffer-native `DynBuffer::peek()` ([Design 031](docs/design/031-M15-remove-latest-snapshot.md)).** `TypedRecord::latest()` and AimX `record.get` read the buffer directly instead of a per-record snapshot mutex (one lock + clone off the `produce()` hot path). Consequences: a `.with_remote_access()` record with **no buffer** now fails `build()` (was a silent runtime no-op); `record.get` / `latest()` on an `SpmcRing` record returns `not_found` / `None` (rings have no canonical latest — use `record.drain` / `record.subscribe`); `SingleLatest` and `Mailbox` are unaffected. `TypedRecord::produce` is removed — all writes go through `WriteHandle::push`. Adapters implement `peek()` per buffer type. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md))
38+
- **M16 — `with_remote_access()` now requires the `json-serialize` feature (transitively enabled by `std`); `with_read_only_serialization()` removed ([Design 032](docs/design/032-M16-aimx-json-codec.md)).** The stored serializer/deserializer closures are replaced by a type-erased `Arc<dyn JsonCodec<T>>`. A `Serialize`-only record can no longer be exposed read-only over remote access. ([aimdb-core](aimdb-core/CHANGELOG.md))
39+
3240
- **M13 — `Spawn` trait removed across the workspace; `AimDbBuilder::build()` now returns `(AimDb, AimDbRunner)` (Issue #88, [Design 028](docs/design/028-M13-remove-spawn-trait.md)).** Every future the database drives — producer services, taps, transforms, join forwarders, connector loops, the remote-access supervisor, `on_start` tasks — is collected at build time and driven by a single `FuturesUnordered` inside `runner.run().await`. Adapter implementations (`TokioAdapter`, `EmbassyAdapter`, `WasmAdapter`) drop their `impl Spawn`. The `embassy-task-pool-8/16/32` features are deleted and `EmbassyAdapter::new_with_network` no longer takes a `Spawner`. Connector authors must update `ConnectorBuilder::build()` to return `Vec<BoxFuture>` instead of `Arc<dyn Connector>`. See each crate's CHANGELOG for the per-crate impact.
3341

3442
## [1.1.0] - 2026-05-22

Cargo.lock

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ test:
104104
cargo test --package aimdb-core --no-default-features --features "alloc,profiling"
105105
@printf "$(YELLOW) → Testing aimdb-core (no_std + alloc + metrics)$(NC)\n"
106106
cargo test --package aimdb-core --no-default-features --features "alloc,metrics"
107+
@printf "$(YELLOW) → Testing aimdb-core (no_std + alloc + json-serialize)$(NC)\n"
108+
cargo test --package aimdb-core --no-default-features --features "alloc,json-serialize"
107109
@printf "$(YELLOW) → Testing aimdb-core remote module$(NC)\n"
108110
cargo test --package aimdb-core --lib --features "std" remote::
109111
@printf "$(YELLOW) → Testing tokio adapter$(NC)\n"
@@ -112,6 +114,8 @@ test:
112114
cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,metrics"
113115
@printf "$(YELLOW) → Testing tokio adapter (with profiling)$(NC)\n"
114116
cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,profiling"
117+
@printf "$(YELLOW) → Testing embassy adapter (host, no executor: buffers, join-queue, doctests)$(NC)\n"
118+
cargo test --package aimdb-embassy-adapter --no-default-features --features "alloc,embassy-sync,embassy-time"
115119
@printf "$(YELLOW) → Testing sync wrapper$(NC)\n"
116120
cargo test --package aimdb-sync
117121
@printf "$(YELLOW) → Testing codegen library$(NC)\n"
@@ -167,6 +171,8 @@ clippy:
167171
cargo clippy --package aimdb-data-contracts --no-default-features --features alloc -- -D warnings
168172
@printf "$(YELLOW) → Clippy on aimdb-core (no_std + alloc)$(NC)\n"
169173
cargo clippy --package aimdb-core --no-default-features --features alloc --all-targets -- -D warnings
174+
@printf "$(YELLOW) → Clippy on aimdb-core (no_std + alloc + json-serialize)$(NC)\n"
175+
cargo clippy --package aimdb-core --no-default-features --features "alloc,json-serialize" --all-targets -- -D warnings
170176
@printf "$(YELLOW) → Clippy on aimdb-core (std)$(NC)\n"
171177
cargo clippy --package aimdb-core --features "std,tracing,metrics" --all-targets -- -D warnings
172178
@printf "$(YELLOW) → Clippy on tokio adapter$(NC)\n"
@@ -253,6 +259,8 @@ test-embedded:
253259
cargo check --package aimdb-data-contracts --target thumbv7em-none-eabihf --no-default-features --features alloc
254260
@printf "$(YELLOW) → Checking aimdb-core (no_std minimal) on thumbv7em-none-eabihf target$(NC)\n"
255261
cargo check --package aimdb-core --target thumbv7em-none-eabihf --no-default-features --features alloc
262+
@printf "$(YELLOW) → Checking aimdb-core (no_std + alloc + json-serialize) on thumbv7em-none-eabihf target$(NC)\n"
263+
cargo check --package aimdb-core --target thumbv7em-none-eabihf --no-default-features --features "alloc,json-serialize"
256264
@printf "$(YELLOW) → Checking aimdb-core (no_std/embassy) on thumbv7em-none-eabihf target$(NC)\n"
257265
cargo check --package aimdb-core --target thumbv7em-none-eabihf --no-default-features --features alloc
258266
@printf "$(YELLOW) → Checking aimdb-embassy-adapter on thumbv7em-none-eabihf target$(NC)\n"

aimdb-core/CHANGELOG.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- **`json-serialize` feature + `codec` module (M16, Design 032).** New `crate::codec` module with `RemoteSerialize` (capability trait, blanket-impl'd for every `serde` `Serialize + DeserializeOwned` type), the object-safe `JsonCodec<T>` storage trait, and the zero-sized `SerdeJsonCodec`. All three are re-exported from the crate root. The feature is `no_std + alloc` compatible (`serde_json` runs on `alloc`), so `RecordValue::as_json()` now works on embedded targets, not just `std`. `std` enables `json-serialize` transitively, so existing std builds are unaffected.
13+
- **`DynBuffer::peek(&self) -> Option<T>` (M15, Design 031).** Non-destructive, buffer-native point-in-time read; the default impl returns `None` (correct for buffers with no canonical latest, e.g. broadcast/SPMC rings). AimX `record.get` and `TypedRecord::latest()` now route through it. Adapters implement it per buffer type — see the tokio/embassy adapter changelogs.
14+
1015
### Internal refactors
1116

1217
- **AimX remote-access path is now spawn-free (Issue #114, Design 030).** Every remaining `tokio::spawn` in `aimdb-core/src/remote/` was removed; the supervisor's accept loop and each connection handler now own their own `FuturesUnordered<BoxFuture>` driven by `tokio::select! { biased; }`. Cancellation collapsed to one mechanism — dropping the future.
@@ -17,6 +22,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1722

1823
### Changed (breaking)
1924

25+
- **`latest_snapshot` removed from `TypedRecord`; `latest()` / AimX `record.get` read the buffer via `peek()` (M15, Design 031).** Eliminates one snapshot-mutex lock + `Option<T>` clone per `produce()` on the hot path. Behavioural consequences:
26+
- A record configured with `.with_remote_access()` but **no buffer** now fails `build()` with a clear error (previously a silent runtime no-op — reads returned `not_found`, writes were discarded). Add a buffer, e.g. `.buffer(BufferCfg::SingleLatest)`.
27+
- `record.get` / `latest()` on an `SpmcRing` record now returns `not_found` / `None` — a ring keeps per-consumer history with no canonical latest. Use `record.drain` (history) or `record.subscribe` (live). `SingleLatest` and `Mailbox` are unaffected.
28+
- On `no_std`/embedded, `latest()` now depends on the adapter implementing `peek()` (the Embassy adapter does — see its changelog).
29+
- **`with_remote_access()` is now gated on `json-serialize` and bounded on `T: codec::RemoteSerialize` (M16, Design 032).** Same effective bound as before (`Serialize + DeserializeOwned`, blanket-impl'd), but the stored serializer/deserializer closures are replaced by a single type-erased `Arc<dyn JsonCodec<T>>`. `std` enables `json-serialize`, so std callers see no change; `no_std + alloc` callers must enable the `json-serialize` feature to call it.
30+
- **`producer_service` renamed to `producer` (M15).** `TypedRecord::set_producer_service``set_producer`, and `has_producer_service``has_producer` (the latter also on the `AnyRecord` trait). Affects code that called these methods directly; the public `.source()` registrar API is unchanged. Also collapses the std/no_std `cfg` split on `AnyRecord::buffer_info` / `transform_input_keys` into single signatures.
2031
- **`AimxConfig` lost `subscription_queue_size` (Issue #114, Design 030).** The field bounded a per-subscription mpsc channel that no longer exists — subscriptions are now one future in a `FuturesUnordered`. The builder method `.subscription_queue_size(n)` is removed; replace it with `.max_subs_per_connection(n)` if you were using the value as a soft cap on subscription count, or just delete the call.
2132
- **AimX `Welcome.max_subscriptions` now reports the real per-connection cap.** Previously it returned `subscription_queue_size` (default 100) while the actual cap was implicit; it now returns `max_subs_per_connection` (default 32). Clients that displayed this value will see the change.
2233
- **AimX `record.subscribe` response no longer carries `queue_size`.** Result object is now `{ "subscription_id": "..." }` — the previous `"queue_size"` reported a number that no longer corresponded to anything in the implementation.
@@ -25,7 +36,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2536
- **`Producer::produce` is now sync + infallible; `Consumer::subscribe` is now infallible (Design 029 follow-up, M14).** The pre-resolved `WriteHandle::push` cannot fail and the pre-resolved buffer Arc makes `subscribe()` infallible. Call sites collapse: `producer.produce(x).await?``producer.produce(x);` and `let Ok(reader) = consumer.subscribe() else { ... }``let reader = consumer.subscribe();`. The `ProducerTrait::produce_any` / `ConsumerTrait::subscribe_any` trait surfaces stay `Result`/`async` because the type-erasure downcast remains fallible.
2637
- `AimDb::produce<T>(key, value) -> DbResult<()>` is now sync; `.await` on the call site goes away. Only the key lookup can fail.
2738
- `Database::produce` likewise sync.
28-
- `TypedRecord::produce` is now `pub fn produce(&self, val: T)` (was `pub async fn produce`).
39+
- `TypedRecord::produce` was made sync here (was `pub async fn produce`), then **removed entirely in M15** — see _Removed (breaking)_ below.
2940
- `aimdb-wasm-adapter`: `bindings::poll_sync` helper deleted — no remaining callers now that `TypedRecord::produce` is sync.
3041
- Dead `consumer.subscribe()` error arms in `transform/single.rs` and `transform/join.rs` removed (the `Err` branch was unreachable after M14).
3142

@@ -52,6 +63,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5263
- On the AimX remote-access path, three `runtime.spawn(...)` call sites were temporarily bridged to bare `tokio::spawn` under `#[cfg(feature = "std")]`. These have since been removed by the AimX spawn-free follow-up — see the "AimX remote-access path is now spawn-free" entry above.
5364
- `on_start` no_std bifurcation collapsed: a single `StartFnType<R>` alias replaces the byte-identical std/no_std pair.
5465

66+
### Removed (breaking)
67+
68+
- **`TypedRecord::produce` removed (M15, Design 031).** The M14 step (above) made it sync; M15 removes it entirely. All writes now go through `WriteHandle::push` via `TypedRecord::writer_handle()`. `AimDb::produce` and AimX `set_from_json` route through it; as a side effect `set_from_json` now marks record metadata as updated (previously skipped on that path). `WriteHandle` / `RecordWriter` no longer carry the snapshot mutex.
69+
- **`with_read_only_serialization()` removed (M16, Design 032).** A `Serialize`-only record can no longer be exposed read-only over remote access. Use `with_remote_access()`, which additionally requires `DeserializeOwned`. No in-tree callers existed.
70+
5571
## [1.1.0] - 2026-05-22
5672

5773
### Added

aimdb-core/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,19 @@ std = [
2323
"serde",
2424
"thiserror",
2525
"anyhow",
26-
"serde_json",
26+
"json-serialize",
2727
"tokio",
2828
"aimdb-executor/std",
2929
]
3030

3131
# Heap allocation in no_std environments
3232
alloc = ["serde"] # Enable heap in no_std
3333

34+
# JSON codec (`crate::codec`): serde_json-backed `RemoteSerialize` / `JsonCodec`.
35+
# no_std-compatible (serde_json runs on alloc); opt in on embedded targets to
36+
# get `record.latest()?.as_json()` without std/AimX. `std` enables it for AimX.
37+
json-serialize = ["alloc", "serde_json"]
38+
3439
# Observability features (available on both std/no_std)
3540
tracing = ["dep:tracing"] # Works in both std and no_std environments
3641
defmt = ["dep:defmt"] # Embedded logging via probe (no_std)

aimdb-core/src/buffer/traits.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,22 @@ pub trait DynBuffer<T: Clone + Send>: Send + Sync {
7575
/// Returns self as Any for downcasting to concrete buffer types
7676
fn as_any(&self) -> &dyn core::any::Any;
7777

78+
/// Non-destructive read of the buffer's current value.
79+
///
80+
/// Returns `Some(T)` if the buffer holds a current value that can be read
81+
/// without affecting any consumer's position. Returns `None` if the buffer
82+
/// type has no canonical "current value" concept (e.g., SPMC Ring) or if
83+
/// no value has been produced yet.
84+
///
85+
/// This is the buffer-native point-in-time read used by AimX `record.get`
86+
/// (design 031). Implementations must not advance any reader position.
87+
///
88+
/// The default returns `None`, which is the correct behaviour for buffers
89+
/// without a canonical latest value.
90+
fn peek(&self) -> Option<T> {
91+
None
92+
}
93+
7894
/// Get buffer metrics snapshot (metrics feature only)
7995
///
8096
/// Returns `Some(snapshot)` if the buffer implementation supports metrics,

aimdb-core/src/buffer/writer.rs

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
//! `RecordWriter<T>` — the sole implementor of `WriteHandle<T>` (design 029).
22
//!
3-
//! Pre-binds the three Arcs a `TypedRecord<T, R>` already owns (buffer,
4-
//! latest-snapshot, metadata tracker) so `Producer<T>` can push values without
5-
//! holding a `Arc<AimDb<R>>` or running a `HashMap` lookup per call.
3+
//! Pre-binds the buffer and (std-only) metadata tracker so `Producer<T>` can
4+
//! push values without holding a `Arc<AimDb<R>>` or running a `HashMap`
5+
//! lookup per call.
66
77
#[cfg(not(feature = "std"))]
88
extern crate alloc;
@@ -16,15 +16,9 @@ use std::sync::Arc;
1616
use super::traits::{DynBuffer, WriteHandle};
1717

1818
pub(crate) struct RecordWriter<T: Clone + Send + 'static> {
19-
/// `None` for records that only support `latest()` (no buffer configured).
19+
/// `None` for records without a configured buffer.
2020
buffer: Option<Arc<dyn DynBuffer<T>>>,
2121

22-
/// Snapshot slot shared with `TypedRecord` and any `latest()` reader.
23-
#[cfg(feature = "std")]
24-
latest_snapshot: Arc<std::sync::Mutex<Option<T>>>,
25-
#[cfg(not(feature = "std"))]
26-
latest_snapshot: Arc<spin::Mutex<Option<T>>>,
27-
2822
/// Metadata tracker (already `Clone` with shared inner `Arc<Mutex>` /
2923
/// `Arc<AtomicBool>`). std-only.
3024
#[cfg(feature = "std")]
@@ -35,39 +29,19 @@ impl<T: Clone + Send + 'static> RecordWriter<T> {
3529
#[cfg(feature = "std")]
3630
pub(crate) fn new(
3731
buffer: Option<Arc<dyn DynBuffer<T>>>,
38-
latest_snapshot: Arc<std::sync::Mutex<Option<T>>>,
3932
metadata: crate::typed_record::RecordMetadataTracker,
4033
) -> Self {
41-
Self {
42-
buffer,
43-
latest_snapshot,
44-
metadata,
45-
}
34+
Self { buffer, metadata }
4635
}
4736

4837
#[cfg(not(feature = "std"))]
49-
pub(crate) fn new(
50-
buffer: Option<Arc<dyn DynBuffer<T>>>,
51-
latest_snapshot: Arc<spin::Mutex<Option<T>>>,
52-
) -> Self {
53-
Self {
54-
buffer,
55-
latest_snapshot,
56-
}
38+
pub(crate) fn new(buffer: Option<Arc<dyn DynBuffer<T>>>) -> Self {
39+
Self { buffer }
5740
}
5841
}
5942

6043
impl<T: Clone + Send + 'static> WriteHandle<T> for RecordWriter<T> {
6144
fn push(&self, value: T) {
62-
#[cfg(feature = "std")]
63-
{
64-
*self.latest_snapshot.lock().unwrap() = Some(value.clone());
65-
}
66-
#[cfg(not(feature = "std"))]
67-
{
68-
*self.latest_snapshot.lock() = Some(value.clone());
69-
}
70-
7145
if let Some(buf) = &self.buffer {
7246
buf.push(value);
7347
#[cfg(feature = "std")]

aimdb-core/src/builder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1102,8 +1102,10 @@ impl<R: aimdb_executor::RuntimeAdapter + 'static> AimDb<R> {
11021102
where
11031103
T: Send + 'static + Debug + Clone,
11041104
{
1105+
// Single write path via WriteHandle (design 031). For hot paths,
1106+
// prefer `db.producer::<T>(key)` once and reuse the returned handle.
11051107
let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1106-
typed_rec.produce(value);
1108+
typed_rec.writer_handle().push(value);
11071109
Ok(())
11081110
}
11091111

0 commit comments

Comments
 (0)