Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions aimdb-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl AimDbInner {
#[cfg(feature = "remote-access")]
pub fn try_latest_as_json(&self, record_key: &str) -> Option<serde_json::Value> {
let id = self.resolve_str(record_key)?;
self.storages.get(id.index())?.latest_json()
self.storages.get(id.index())?.json_access()?.latest_json()
}

/// Sets a record value from JSON (remote access API)
Expand Down Expand Up @@ -260,7 +260,14 @@ impl AimDbInner {
.resolve_str(record_key)
.ok_or_else(|| DbError::record_key_not_found(record_key.to_string()))?;

self.storages[id.index()].set_from_json(json_value)
self.storages[id.index()]
.json_access()
.ok_or_else(|| {
DbError::runtime_error(alloc::format!(
"Record '{record_key}' does not support JSON remote access"
))
})?
.set_from_json(json_value)
}
}

Expand Down
6 changes: 5 additions & 1 deletion aimdb-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ pub use typed_api::{
Consumer, InboundConnectorBuilder, OutboundConnectorBuilder, Producer, RecordRegistrar,
RecordT, StageKind,
};
pub use typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
#[cfg(feature = "remote-access")]
pub use typed_record::JsonRecordAccess;
pub use typed_record::{
AnyRecord, AnyRecordExt, RecordIntrospect, RecordMetricsReset, TypedRecord,
};

// JSON codec (feature `json-serialize`, no_std + alloc compatible)
#[cfg(feature = "json-serialize")]
Expand Down
9 changes: 8 additions & 1 deletion aimdb-core/src/remote/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ pub(crate) fn stream_record_updates(
let record = inner
.storage(id)
.ok_or(DbError::InvalidRecordId { id: id.raw() })?;
let reader = record.subscribe_json()?;
let reader = record
.json_access()
.ok_or_else(|| {
DbError::runtime_error(alloc::format!(
"Record '{record_key}' does not support JSON remote access"
))
})?
.subscribe_json()?;

// Pair the reader with an owned copy of the record key so lag/error
// logs identify which record fell behind — the previous mpsc-based
Expand Down
10 changes: 9 additions & 1 deletion aimdb-core/src/session/aimx/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,15 @@ impl AimxSession {
let record = self.db.inner().storage(id).ok_or(RpcError::NotFound)?;
// `subscribe_json` fails if the record was not configured with
// `.with_remote_access()`.
let reader = record.subscribe_json().map_err(map_db_err)?;
let reader = record
.json_access()
.ok_or_else(|| {
map_db_err(DbError::runtime_error(alloc::format!(
"Record '{name}' does not support JSON remote access"
)))
})?
.subscribe_json()
.map_err(map_db_err)?;
self.drain_readers.insert(name.to_string(), reader);
}

Expand Down
178 changes: 112 additions & 66 deletions aimdb-core/src/typed_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,22 @@ type ConsumerServiceFn<T> =
type ProducerServiceFn<T> =
Box<dyn FnOnce(crate::RuntimeContext, crate::Producer<T>) -> BoxFuture<'static, ()> + Send>;

/// Type-erased trait for records
/// Type-erased trait for records — the storage and lifecycle contract.
///
/// Allows storage of heterogeneous record types in a single collection
/// while maintaining type safety through downcast operations.
///
/// Since the 036 W2 split this trait carries only the storage/lifecycle
/// surface. The other capabilities live in dedicated traits, reachable from
/// any `dyn AnyRecord`:
/// - [`RecordIntrospect`] (supertrait) — graph/metadata introspection
/// - [`RecordMetricsReset`] (supertrait) — profiling/metrics counter resets
/// - [`JsonRecordAccess`] — JSON remote access, via [`AnyRecord::json_access`]
///
/// Consumers: `AimDbBuilder::build()` (validation, config-error draining,
/// typed downcasts via [`AnyRecordExt`]) and connectors applying the
/// remote-access security policy (`set_writable_erased`).
///
/// # Thread Safety Requirements
///
/// This trait requires both `Send` and `Sync` because:
Expand All @@ -207,7 +218,7 @@ type ProducerServiceFn<T> =
/// **Migration:** If your record type `T` is not `Sync`, wrap non-`Sync` fields
/// in `Arc<Mutex<_>>` or `Arc<RwLock<_>>` to achieve interior mutability with
/// thread-safe sharing.
pub trait AnyRecord: Send + Sync {
pub trait AnyRecord: RecordIntrospect + RecordMetricsReset + Send + Sync {
/// Validates that the record has correct producer/consumer setup
///
/// Rules: Must have exactly one producer and at least one consumer.
Expand All @@ -219,18 +230,53 @@ pub trait AnyRecord: Send + Sync {
/// Returns self as mutable Any for downcasting
fn as_any_mut(&mut self) -> &mut dyn Any;

/// Drains the configuration mistakes recorded during registration.
///
/// Called by `AimDbBuilder::build()`, which fills in the record key and
/// reports every finding via
/// [`DbError::InvalidConfiguration`](crate::DbError::InvalidConfiguration).
fn drain_config_errors(&mut self) -> Vec<crate::error::ConfigError>;

/// Sets the writable flag for this record (type-erased)
///
/// Used internally by the builder to apply security policy to records.
fn set_writable_erased(&self, writable: bool);

/// Returns the record's JSON remote-access surface, if it has one.
///
/// This accessor is the single place the `remote-access` cfg-gate lives
/// for consumers: they query the capability here instead of cfg-gating
/// every call site. `TypedRecord` always returns `Some`; the runtime
/// "configured with `.with_remote_access()`" checks stay inside the
/// [`JsonRecordAccess`] methods.
#[cfg(feature = "remote-access")]
fn json_access(&self) -> Option<&dyn JsonRecordAccess> {
None
}
}

/// Graph and metadata introspection for type-erased records.
///
/// Supertrait of [`AnyRecord`], so every stored record exposes it and a
/// `dyn AnyRecord` can be upcast to `&dyn RecordIntrospect` where only
/// introspection is needed.
///
/// Consumers: `AimDbBuilder::build()` (link validation and the dependency
/// graph fed to [`crate::graph`]), the builder's inbound/outbound route
/// collection, and `AimDbInner::list_records` (remote introspection
/// metadata).
pub trait RecordIntrospect {
/// Returns the number of registered outbound connectors
fn outbound_connector_count(&self) -> usize;

/// Returns the outbound connector URLs as strings
#[cfg(feature = "std")]
fn outbound_connector_urls(&self) -> Vec<String>;

/// Gets the outbound connector links
///
/// Returns outbound connector configuration list for spawning logic.
fn outbound_connectors(&self) -> &[crate::connector::ConnectorLink];

/// Get the inbound connector links for this record
fn inbound_connectors(&self) -> &[crate::connector::InboundConnectorLink];

/// Returns the number of registered consumers (tap observers)
fn consumer_count(&self) -> usize;

Expand All @@ -240,13 +286,6 @@ pub trait AnyRecord: Send + Sync {
/// Returns whether a buffer is configured
fn has_buffer(&self) -> bool;

/// Drains the configuration mistakes recorded during registration.
///
/// Called by `AimDbBuilder::build()`, which fills in the record key and
/// reports every finding via
/// [`DbError::InvalidConfiguration`](crate::DbError::InvalidConfiguration).
fn drain_config_errors(&mut self) -> Vec<crate::error::ConfigError>;

/// Returns whether a transform is registered for this record
fn has_transform(&self) -> bool;

Expand All @@ -261,11 +300,6 @@ pub trait AnyRecord: Send + Sync {
/// Returns the transform input keys (if a transform is registered)
fn transform_input_keys(&self) -> Option<Vec<String>>;

/// Sets the writable flag for this record (type-erased)
///
/// Used internally by the builder to apply security policy to records.
fn set_writable_erased(&self, writable: bool);

/// Collects metadata for this record
#[cfg(feature = "remote-access")]
fn collect_metadata(
Expand All @@ -274,12 +308,23 @@ pub trait AnyRecord: Send + Sync {
key: crate::record_id::StringKey,
id: crate::record_id::RecordId,
) -> crate::remote::RecordMetadata;
}

/// Internal: Returns JSON for type-erased remote access
/// Type-erased JSON read/subscribe/write for remote access.
///
/// Internal to the remote-access protocol — application code reads values
/// via `record.latest()?.as_json()` instead. Obtained from a record through
/// [`AnyRecord::json_access`], which is where the `remote-access` cfg-gate
/// lives for consumers.
///
/// Consumers: `AimDbInner::try_latest_as_json` / `set_record_from_json`
/// (`record.get` / `record.set`), the AimX session dispatch (`record.subscribe`
/// value drain), and `remote::stream::stream_record_updates`.
#[cfg(feature = "remote-access")]
pub trait JsonRecordAccess {
/// Returns JSON for type-erased remote access
///
/// Used internally by remote access protocol. **Users should use `record.latest()?.as_json()`.**
#[doc(hidden)]
#[cfg(feature = "remote-access")]
fn latest_json(&self) -> Option<serde_json::Value>;

/// Subscribe to record updates as JSON stream
Expand All @@ -301,16 +346,13 @@ pub trait AnyRecord: Send + Sync {
///
/// # Example (internal use)
/// ```rust,ignore
/// let type_id = TypeId::of::<Temperature>();
/// let record: &Box<dyn AnyRecord> = db.records.get(&type_id)?;
/// let mut json_reader = record.subscribe_json()?;
/// let record: &Box<dyn AnyRecord> = db.storage(id)?;
/// let mut json_reader = record.json_access().unwrap().subscribe_json()?;
///
/// while let Ok(json_val) = json_reader.recv_json().await {
/// // Forward to remote client...
/// }
/// ```
#[doc(hidden)]
#[cfg(feature = "remote-access")]
fn subscribe_json(&self) -> crate::DbResult<Box<dyn crate::buffer::JsonBufferReader + Send>>;

/// Sets a record value from JSON
Expand Down Expand Up @@ -339,18 +381,24 @@ pub trait AnyRecord: Send + Sync {
///
/// # Example (internal use)
/// ```rust,ignore
/// let type_id = TypeId::of::<AppConfig>();
/// let record: &Box<dyn AnyRecord> = db.records.get(&type_id)?;
/// let record: &Box<dyn AnyRecord> = db.storage(id)?;
/// let json_val = serde_json::json!({"log_level": "debug"});
/// record.set_from_json(json_val)?; // Only works if producer_count == 0
/// // Only works if producer_count == 0
/// record.json_access().unwrap().set_from_json(json_val)?;
/// ```
#[doc(hidden)]
#[cfg(feature = "remote-access")]
fn set_from_json(&self, json_value: serde_json::Value) -> crate::DbResult<()>;
}

/// Get the inbound connector links for this record
fn inbound_connectors(&self) -> &[crate::connector::InboundConnectorLink];

/// Observability counter resets (features `profiling` / `metrics`).
///
/// Supertrait of [`AnyRecord`] with no-op defaults, so the cfg-gated reset
/// methods stay off the core storage contract while remaining callable on
/// every stored record.
///
/// Consumers: `AimDb::reset_profiling` / `AimDb::reset_buffer_metrics`,
/// driven by the AimX `control.reset_buffer_metrics` RPC and the MCP
/// buffer-metrics tool.
pub trait RecordMetricsReset {
/// Resets this record's stage profiling counters (feature `profiling`).
///
/// Default implementation is a no-op; `TypedRecord` overrides it.
Expand Down Expand Up @@ -1134,16 +1182,31 @@ impl<T: Send + Sync + 'static + Debug + Clone> AnyRecord for TypedRecord<T> {
self
}

fn outbound_connector_count(&self) -> usize {
self.outbound_connectors.len()
fn drain_config_errors(&mut self) -> Vec<crate::error::ConfigError> {
core::mem::take(&mut self.config_errors)
}

#[cfg(feature = "std")]
fn outbound_connector_urls(&self) -> Vec<String> {
self.outbound_connectors
.iter()
.map(|link| format!("{}", link.url))
.collect()
fn set_writable_erased(&self, writable: bool) {
#[cfg(feature = "remote-access")]
{
self.writable
.store(writable, portable_atomic::Ordering::SeqCst);
}
#[cfg(not(feature = "remote-access"))]
{
let _ = writable; // Suppress unused warning
}
}

#[cfg(feature = "remote-access")]
fn json_access(&self) -> Option<&dyn JsonRecordAccess> {
Some(self)
}
}

impl<T: Send + Sync + 'static + Debug + Clone> RecordIntrospect for TypedRecord<T> {
fn outbound_connector_count(&self) -> usize {
self.outbound_connectors.len()
}

fn outbound_connectors(&self) -> &[crate::connector::ConnectorLink] {
Expand All @@ -1162,10 +1225,6 @@ impl<T: Send + Sync + 'static + Debug + Clone> AnyRecord for TypedRecord<T> {
TypedRecord::has_buffer(self)
}

fn drain_config_errors(&mut self) -> Vec<crate::error::ConfigError> {
core::mem::take(&mut self.config_errors)
}

fn has_transform(&self) -> bool {
TypedRecord::has_transform(self)
}
Expand All @@ -1182,16 +1241,8 @@ impl<T: Send + Sync + 'static + Debug + Clone> AnyRecord for TypedRecord<T> {
TypedRecord::transform_input_keys(self)
}

fn set_writable_erased(&self, writable: bool) {
#[cfg(feature = "remote-access")]
{
self.writable
.store(writable, portable_atomic::Ordering::SeqCst);
}
#[cfg(not(feature = "remote-access"))]
{
let _ = writable; // Suppress unused warning
}
fn inbound_connectors(&self) -> &[crate::connector::InboundConnectorLink] {
&self.inbound_connectors
}

#[cfg(feature = "remote-access")]
Expand Down Expand Up @@ -1247,9 +1298,10 @@ impl<T: Send + Sync + 'static + Debug + Clone> AnyRecord for TypedRecord<T> {

metadata
}
}

#[doc(hidden)]
#[cfg(feature = "remote-access")]
#[cfg(feature = "remote-access")]
impl<T: Send + Sync + 'static + Debug + Clone> JsonRecordAccess for TypedRecord<T> {
fn latest_json(&self) -> Option<serde_json::Value> {
log_debug!(
"latest_json called for type: {}",
Expand All @@ -1266,8 +1318,6 @@ impl<T: Send + Sync + 'static + Debug + Clone> AnyRecord for TypedRecord<T> {
result
}

#[doc(hidden)]
#[cfg(feature = "remote-access")]
fn subscribe_json(&self) -> crate::DbResult<Box<dyn crate::buffer::JsonBufferReader + Send>> {
use crate::DbError;

Expand Down Expand Up @@ -1302,8 +1352,6 @@ impl<T: Send + Sync + 'static + Debug + Clone> AnyRecord for TypedRecord<T> {
Ok(Box::new(json_reader))
}

#[doc(hidden)]
#[cfg(feature = "remote-access")]
fn set_from_json(&self, json_value: serde_json::Value) -> crate::DbResult<()> {
use crate::DbError;

Expand Down Expand Up @@ -1370,11 +1418,9 @@ impl<T: Send + Sync + 'static + Debug + Clone> AnyRecord for TypedRecord<T> {

Ok(())
}
}

fn inbound_connectors(&self) -> &[crate::connector::InboundConnectorLink] {
&self.inbound_connectors
}

impl<T: Send + Sync + 'static + Debug + Clone> RecordMetricsReset for TypedRecord<T> {
#[cfg(feature = "profiling")]
fn reset_profiling(&self) {
self.profiling.reset_all();
Expand Down
4 changes: 2 additions & 2 deletions docs/design/036-followup-refactoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ The registry keeps storing `Box<dyn AnyRecord>`; consumers upcast to the capabil

**Payoff:** the core storage contract stops churning every time remote access or profiling evolves, and each consumer's dependency is visible in its signature. **Acceptance:** `AnyRecord` ≤ ~8 methods; no behavior change; rustdoc for each trait states its consumer.

**Size:** M. Mechanical but wide. **File together with W1** (it touches the same files; do W2 first or in the same series — W2 shrinks the surface W1 has to move).
**Size:** M. Mechanical but wide. **File together with W1** (it touches the same files; do W2 first or in the same series — W2 shrinks the surface W1 has to move). **Status:** implemented in PR [#142](https://github.com/aimdb-dev/aimdb/pull/142), stacked on #141 (W1 had already landed, so the "W2 first" ordering note was moot). Deviations from the sketch: the JSON trait's gate is `remote-access` — the actual gate on those methods — not `json-serialize`; the resets live on a `RecordMetricsReset` supertrait (default no-ops, so the supertrait list needs no cfg); the dead `outbound_connector_urls` (cfg `std`, zero callers in-tree and in aimdb-pro) was dropped rather than moved. Result: `AnyRecord` has 6 methods.

### W3 — Execute the KNX hardware validation matrix (035 §3)

Expand Down Expand Up @@ -141,7 +141,7 @@ Both protocols now ride the session engine (the hard part), but two subscribe/wr
| Item | Issue | When to file |
|---|---|---|
| W1 data-plane de-`Any` | PR [#141](https://github.com/aimdb-dev/aimdb/pull/141) | done — no separate issue, direct PR |
| W2 `AnyRecord` split | — | on #140 merge (same series as W1) |
| W2 `AnyRecord` split | PR [#142](https://github.com/aimdb-dev/aimdb/pull/142) | done — stacked on #141, no separate issue |
| W3 hardware matrix | — | none if run with #140; else a validation task |
| W4 ACK-retransmit knob | — | on #140 merge |
| W5 `StringKey` interner | — | opportunistic; file if not done by next release |
Expand Down