Skip to content

Commit b44abab

Browse files
authored
56 context aware deserializers (#81)
1 parent 5f9e543 commit b44abab

37 files changed

Lines changed: 1844 additions & 270 deletions

File tree

CHANGELOG.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2727
2828
## [Unreleased]
2929

30-
No changes yet.
30+
### Added
31+
32+
- **Context-Aware Deserializers (Design 026)**: Inbound connector deserializers can now receive a `RuntimeContext<R>` for platform-independent timestamps and logging
33+
- New `.with_deserializer(|ctx, bytes| ...)` API on `InboundConnectorBuilder` provides `RuntimeContext<R>` to deserialization closures
34+
- New `.with_deserializer_raw(|bytes| ...)` for plain bytes-only deserialization when context is unnecessary
35+
- `DeserializerKind` enum enforces mutual exclusivity between raw and context-aware deserializers
36+
- `Router::route()` propagates optional runtime context to context-aware routes
37+
- **Context-Aware Serializers**: Outbound connector serializers can now receive a `RuntimeContext<R>`, symmetric with deserializers
38+
- New `.with_serializer(|ctx, value| ...)` API on `OutboundConnectorBuilder` provides `RuntimeContext<R>` to serialization closures
39+
- New `.with_serializer_raw(|value| ...)` for plain value-only serialization when context is unnecessary
40+
- `SerializerKind` enum (`Raw` / `Context`) enforces mutual exclusivity
41+
- All outbound connector publishers updated to propagate runtime context via `db.runtime_any()`
42+
- Design document: 026 (Context-Aware Deserializers)
43+
44+
### Changed
45+
46+
- **aimdb-core**: Breaking API changes to `InboundConnectorLink`, `Router`, and `RouterBuilder` to support `DeserializerKind` (see [aimdb-core/CHANGELOG.md](aimdb-core/CHANGELOG.md))
47+
- **aimdb-core**: Breaking API change — `ConnectorLink.serializer` now stores `SerializerKind` instead of `SerializerFn`
48+
- **aimdb-core**: `.with_serializer()` renamed to `.with_serializer_raw()` for the old single-argument pattern
49+
- **aimdb-mqtt-connector**: Updated router dispatch for new `route()` signature; outbound publishers dispatch via `SerializerKind`
50+
- **aimdb-knx-connector**: Updated router dispatch for new `route()` signature; outbound publishers dispatch via `SerializerKind`
51+
- **aimdb-websocket-connector**: Updated router dispatch for new `route()` signature; outbound publishers dispatch via `SerializerKind`
52+
- All connector examples updated to use new `.with_deserializer(|_ctx, bytes| ...)` and `.with_serializer_raw(|value| ...)` signatures
3153

3254
## [1.0.0] - 2026-03-16
3355

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

_external/embassy

Submodule embassy updated 112 files

aimdb-codegen/src/rust.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -905,7 +905,7 @@ fn emit_connector_chain(
905905
chain = quote! {
906906
#chain
907907
.link_to(#addr_var)
908-
.with_serializer(|v: &#value_type| {
908+
.with_serializer_raw(|v: &#value_type| {
909909
v.to_bytes()
910910
.map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
911911
})
@@ -1515,7 +1515,7 @@ fn emit_transform_configure_block(rec: &RecordDef, task: &TaskDef) -> TokenStrea
15151515
let outbound_chain = if has_outbound {
15161516
quote! {
15171517
.link_to(addr)
1518-
.with_serializer(|v: &#value_type| {
1518+
.with_serializer_raw(|v: &#value_type| {
15191519
v.to_bytes()
15201520
.map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
15211521
})

aimdb-core/CHANGELOG.md

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,32 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10-
No changes yet.
10+
### Added
11+
12+
- **Context-Aware Deserializers (Design 026)**: Inbound connector deserializers can now receive a `RuntimeContext<R>` for platform-independent timestamps and logging during deserialization
13+
- New `ContextDeserializerFn` type alias for context-aware type-erased deserializer callbacks
14+
- New `DeserializerKind` enum (`Raw` / `Context`) to enforce mutual exclusivity between plain and context-aware deserializers
15+
- `.with_deserializer(|ctx, bytes| ...)` now accepts a context-aware closure receiving `RuntimeContext<R>`
16+
- `.with_deserializer_raw(|bytes| ...)` added for plain bytes-only deserialization (no context needed)
17+
- `Router::route()` now accepts an optional type-erased runtime context (`Option<&Arc<dyn Any + Send + Sync>>`)
18+
- Context deserializer routes are gracefully skipped when no context is provided
19+
- **Context-Aware Serializers**: Outbound connector serializers can now receive a `RuntimeContext<R>`, symmetric with deserializers
20+
- New `ContextSerializerFn` type alias for context-aware type-erased serializer callbacks
21+
- New `SerializerKind` enum (`Raw` / `Context`) to enforce mutual exclusivity between plain and context-aware serializers
22+
- `.with_serializer(|ctx, value| ...)` now accepts a context-aware closure receiving `RuntimeContext<R>`
23+
- `.with_serializer_raw(|value| ...)` added for plain value-only serialization (no context needed)
24+
25+
### Changed
26+
27+
- **Breaking**: `InboundConnectorLink::deserializer` field type changed from `DeserializerFn` to `DeserializerKind`
28+
- **Breaking**: `InboundConnectorLink::new()` now takes `DeserializerKind` instead of `DeserializerFn`
29+
- **Breaking**: `Router::route()` signature changed to accept an additional `ctx` parameter
30+
- **Breaking**: `RouterBuilder::from_routes()` and `RouterBuilder::add_route()` now take `DeserializerKind` instead of `DeserializerFn`
31+
- **Breaking**: `ConnectorLink::serializer` field type changed from `Option<SerializerFn>` to `Option<SerializerKind>`
32+
- **Breaking**: `.with_serializer()` renamed to `.with_serializer_raw()` — old single-argument pattern
33+
- **Breaking**: `OutboundRoute` type alias updated to use `SerializerKind`
34+
- **Breaking**: `.with_deserializer()` on `InboundConnectorBuilder` now expects `Fn(RuntimeContext<R>, &[u8]) -> Result<T, String>` instead of `Fn(&[u8]) -> Result<T, String>` — use `.with_deserializer_raw()` for the previous bytes-only signature
35+
- `AimDb::collect_inbound_routes()` return type updated to use `DeserializerKind`
1136

1237
## [1.0.0] - 2026-03-11
1338

aimdb-core/src/builder.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ use crate::{DbError, DbResult};
5555
/// Each tuple contains:
5656
/// - `String` - Default topic/destination from the URL path
5757
/// - `Box<dyn ConsumerTrait>` - Consumer for subscribing to record values
58-
/// - `SerializerFn` - User-provided serializer for the record type
58+
/// - `SerializerKind` - User-provided serializer for the record type (raw or context-aware)
5959
/// - `Vec<(String, String)>` - Configuration options from the URL query
6060
/// - `Option<TopicProviderFn>` - Optional dynamic topic provider
6161
#[cfg(feature = "alloc")]
6262
pub type OutboundRoute = (
6363
String,
6464
Box<dyn crate::connector::ConsumerTrait>,
65-
crate::connector::SerializerFn,
65+
crate::connector::SerializerKind,
6666
Vec<(String, String)>,
6767
Option<crate::connector::TopicProviderFn>,
6868
);
@@ -1213,6 +1213,14 @@ impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
12131213
&self.runtime
12141214
}
12151215

1216+
/// Returns the runtime as a type-erased `Arc<dyn Any + Send + Sync>`
1217+
///
1218+
/// Used by connectors to provide `RuntimeContext` to context-aware
1219+
/// deserializers during inbound message routing.
1220+
pub fn runtime_any(&self) -> Arc<dyn core::any::Any + Send + Sync> {
1221+
self.runtime.clone()
1222+
}
1223+
12161224
/// Lists all registered records (std only)
12171225
///
12181226
/// Returns metadata for all registered records, useful for remote access introspection.
@@ -1450,7 +1458,7 @@ impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
14501458
) -> Vec<(
14511459
String,
14521460
Box<dyn crate::connector::ProducerTrait>,
1453-
crate::connector::DeserializerFn,
1461+
crate::connector::DeserializerKind,
14541462
)> {
14551463
let mut routes = Vec::new();
14561464

aimdb-core/src/connector.rs

Lines changed: 75 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,34 @@ impl std::error::Error for SerializeError {}
105105
pub type SerializerFn =
106106
Arc<dyn Fn(&dyn core::any::Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
107107

108+
/// Type alias for context-aware type-erased serializer callbacks
109+
///
110+
/// Like `SerializerFn`, but receives a type-erased runtime context
111+
/// for platform-independent timestamps and logging during serialization.
112+
///
113+
/// The first argument is the type-erased runtime (as `Arc<dyn Any + Send + Sync>`),
114+
/// which is downcast to the concrete runtime type via `RuntimeContext::extract_from_any`.
115+
pub type ContextSerializerFn = Arc<
116+
dyn Fn(
117+
Arc<dyn core::any::Any + Send + Sync>,
118+
&dyn core::any::Any,
119+
) -> Result<Vec<u8>, SerializeError>
120+
+ Send
121+
+ Sync,
122+
>;
123+
124+
/// Which serializer variant is registered for an outbound link
125+
///
126+
/// Enforces mutual exclusivity between raw value-only serializers
127+
/// and context-aware serializers.
128+
#[derive(Clone)]
129+
pub enum SerializerKind {
130+
/// Plain value-only serializer (from `.with_serializer_raw()`)
131+
Raw(SerializerFn),
132+
/// Context-aware serializer (from `.with_serializer()`)
133+
Context(ContextSerializerFn),
134+
}
135+
108136
// ============================================================================
109137
// TopicProvider - Dynamic topic/destination routing
110138
// ============================================================================
@@ -434,13 +462,14 @@ pub struct ConnectorLink {
434462

435463
/// Serialization callback that converts record values to bytes for publishing
436464
///
437-
/// This is a type-erased function that takes `&dyn Any` and returns `Result<Vec<u8>, String>`.
438-
/// The connector implementation will downcast to the concrete type and call the serializer.
465+
/// Either a plain value-only serializer (`Raw`) or a context-aware
466+
/// serializer (`Context`) that receives `RuntimeContext` for timestamps
467+
/// and logging.
439468
///
440469
/// If `None`, the connector must provide a default serialization mechanism or fail.
441470
///
442471
/// Available in both `std` and `no_std` (with `alloc` feature) environments.
443-
pub serializer: Option<SerializerFn>,
472+
pub serializer: Option<SerializerKind>,
444473

445474
/// Consumer factory callback (alloc feature)
446475
///
@@ -471,7 +500,10 @@ impl Debug for ConnectorLink {
471500
.field("config", &self.config)
472501
.field(
473502
"serializer",
474-
&self.serializer.as_ref().map(|_| "<function>"),
503+
&self.serializer.as_ref().map(|s| match s {
504+
SerializerKind::Raw(_) => "<raw>",
505+
SerializerKind::Context(_) => "<context>",
506+
}),
475507
)
476508
.field(
477509
"consumer_factory",
@@ -531,6 +563,34 @@ impl ConnectorLink {
531563
pub type DeserializerFn =
532564
Arc<dyn Fn(&[u8]) -> Result<Box<dyn core::any::Any + Send>, String> + Send + Sync>;
533565

566+
/// Type alias for context-aware type-erased deserializer callbacks
567+
///
568+
/// Like `DeserializerFn`, but receives a type-erased runtime context
569+
/// for platform-independent timestamps and logging during deserialization.
570+
///
571+
/// The first argument is the type-erased runtime (as `Arc<dyn Any + Send + Sync>`),
572+
/// which is downcast to the concrete runtime type via `RuntimeContext::extract_from_any`.
573+
pub type ContextDeserializerFn = Arc<
574+
dyn Fn(
575+
Arc<dyn core::any::Any + Send + Sync>,
576+
&[u8],
577+
) -> Result<Box<dyn core::any::Any + Send>, String>
578+
+ Send
579+
+ Sync,
580+
>;
581+
582+
/// Which deserializer variant is registered for an inbound link
583+
///
584+
/// Enforces mutual exclusivity between raw bytes-only deserializers
585+
/// and context-aware deserializers.
586+
#[derive(Clone)]
587+
pub enum DeserializerKind {
588+
/// Plain bytes-only deserializer (from `.with_deserializer_raw()`)
589+
Raw(DeserializerFn),
590+
/// Context-aware deserializer (from `.with_deserializer()`)
591+
Context(ContextDeserializerFn),
592+
}
593+
534594
/// Type alias for producer factory callback (alloc feature)
535595
///
536596
/// Takes Arc<dyn Any> (which contains AimDb<R>) and returns a boxed ProducerTrait.
@@ -646,12 +706,12 @@ pub struct InboundConnectorLink {
646706

647707
/// Deserialization callback that converts bytes to typed values
648708
///
649-
/// This is a type-erased function that takes `&[u8]` and returns
650-
/// `Result<Box<dyn Any + Send>, String>`. The spawned task will
651-
/// downcast to the concrete type before producing.
709+
/// Either a plain bytes-only deserializer (`Raw`) or a context-aware
710+
/// deserializer (`Context`) that receives `RuntimeContext` for timestamps
711+
/// and logging.
652712
///
653713
/// Available in both `std` and `no_std` (with `alloc` feature) environments.
654-
pub deserializer: DeserializerFn,
714+
pub deserializer: DeserializerKind,
655715

656716
/// Producer creation callback (alloc feature)
657717
///
@@ -700,7 +760,7 @@ impl Debug for InboundConnectorLink {
700760

701761
impl InboundConnectorLink {
702762
/// Creates a new inbound connector link from a URL and deserializer
703-
pub fn new(url: ConnectorUrl, deserializer: DeserializerFn) -> Self {
763+
pub fn new(url: ConnectorUrl, deserializer: DeserializerKind) -> Self {
704764
Self {
705765
url,
706766
config: Vec::new(),
@@ -1153,25 +1213,25 @@ mod tests {
11531213

11541214
#[test]
11551215
fn test_inbound_connector_link_resolve_topic_default() {
1156-
use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};
1216+
use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};
11571217

11581218
let url = ConnectorUrl::parse("mqtt://sensors/temperature").unwrap();
11591219
let deserializer: DeserializerFn =
11601220
Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
1161-
let link = InboundConnectorLink::new(url, deserializer);
1221+
let link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));
11621222

11631223
// No resolver configured, should return static topic from URL
11641224
assert_eq!(link.resolve_topic(), "sensors/temperature");
11651225
}
11661226

11671227
#[test]
11681228
fn test_inbound_connector_link_resolve_topic_dynamic() {
1169-
use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};
1229+
use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};
11701230

11711231
let url = ConnectorUrl::parse("mqtt://sensors/default").unwrap();
11721232
let deserializer: DeserializerFn =
11731233
Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
1174-
let mut link = InboundConnectorLink::new(url, deserializer);
1234+
let mut link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));
11751235

11761236
// Configure dynamic resolver
11771237
link.topic_resolver = Some(Arc::new(|| Some("sensors/dynamic/kitchen".into())));
@@ -1182,12 +1242,12 @@ mod tests {
11821242

11831243
#[test]
11841244
fn test_inbound_connector_link_resolve_topic_fallback() {
1185-
use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};
1245+
use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};
11861246

11871247
let url = ConnectorUrl::parse("mqtt://sensors/fallback").unwrap();
11881248
let deserializer: DeserializerFn =
11891249
Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
1190-
let mut link = InboundConnectorLink::new(url, deserializer);
1250+
let mut link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));
11911251

11921252
// Configure resolver that returns None
11931253
link.topic_resolver = Some(Arc::new(|| None));

aimdb-core/src/context.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ where
2424
#[cfg(feature = "std")]
2525
runtime: std::sync::Arc<R>,
2626
#[cfg(not(feature = "std"))]
27-
runtime: &'static R,
27+
runtime: alloc::sync::Arc<R>,
2828
}
2929

3030
#[cfg(feature = "std")]
@@ -64,8 +64,15 @@ impl<R> RuntimeContext<R>
6464
where
6565
R: Runtime,
6666
{
67-
/// Create a new RuntimeContext with static reference (no_std version)
68-
pub fn new(runtime: &'static R) -> Self {
67+
/// Create a new RuntimeContext (no_std version uses Arc internally)
68+
pub fn new(runtime: R) -> Self {
69+
Self {
70+
runtime: alloc::sync::Arc::new(runtime),
71+
}
72+
}
73+
74+
/// Create from an existing Arc to avoid double-wrapping
75+
pub fn from_arc(runtime: alloc::sync::Arc<R>) -> Self {
6976
Self { runtime }
7077
}
7178

@@ -74,21 +81,13 @@ where
7481
/// This is a helper for runtime adapters to convert the raw `Arc<dyn Any>`
7582
/// context passed to `.source_raw()` and `.tap_raw()` into a typed `RuntimeContext`.
7683
///
77-
/// For no_std, this leaks the Arc to obtain a `&'static` reference, which is safe
78-
/// because the runtime lives for the entire program lifetime in embedded contexts.
79-
///
8084
/// # Panics
8185
/// Panics if the runtime type doesn't match `R`.
8286
pub fn extract_from_any(ctx_any: alloc::sync::Arc<dyn core::any::Any + Send + Sync>) -> Self {
8387
let runtime = ctx_any
8488
.downcast::<R>()
8589
.expect("Runtime type mismatch - expected matching runtime adapter");
86-
87-
// Convert Arc<R> to &'static R by leaking it
88-
// This is safe because in embedded contexts, the runtime lives for the entire program
89-
let runtime_ref: &'static R = &*alloc::boxed::Box::leak(runtime.into());
90-
91-
Self::new(runtime_ref)
90+
Self::from_arc(runtime)
9291
}
9392
}
9493

@@ -117,8 +116,8 @@ where
117116
}
118117

119118
#[cfg(not(feature = "std"))]
120-
pub fn runtime(&self) -> &'static R {
121-
self.runtime
119+
pub fn runtime(&self) -> &R {
120+
&self.runtime
122121
}
123122
}
124123

aimdb-core/src/database.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ use core::fmt::Debug;
1212
extern crate alloc;
1313

1414
#[cfg(not(feature = "std"))]
15-
use alloc::boxed::Box;
15+
use alloc::{boxed::Box, sync::Arc};
1616

1717
#[cfg(feature = "std")]
18-
use std::boxed::Box;
18+
use std::{boxed::Box, sync::Arc};
1919

2020
/// AimDB Database implementation
2121
///
@@ -136,17 +136,7 @@ impl<A: RuntimeAdapter + aimdb_executor::Spawn + 'static> Database<A> {
136136
where
137137
A: aimdb_executor::Runtime + Clone,
138138
{
139-
#[cfg(feature = "std")]
140-
{
141-
RuntimeContext::from_arc(std::sync::Arc::new(self.adapter.clone()))
142-
}
143-
#[cfg(not(feature = "std"))]
144-
{
145-
// For no_std, we need a static reference - this would typically be handled
146-
// by the caller storing the adapter in a static cell first
147-
// For now, we'll document this limitation
148-
panic!("context() not supported in no_std without a static reference. To use context(), store your adapter in a static cell (e.g., StaticCell from portable-atomic or embassy-sync), or use adapter() directly.")
149-
}
139+
RuntimeContext::from_arc(Arc::new(self.adapter.clone()))
150140
}
151141
}
152142

0 commit comments

Comments
 (0)