Skip to content

refactor(core)!: data-plane de-Any — fuse typed pipelines at registration (036 W1)#141

Open
lxsaah wants to merge 5 commits into
mainfrom
refactor/036-w1-data-plane-de-any
Open

refactor(core)!: data-plane de-Any — fuse typed pipelines at registration (036 W1)#141
lxsaah wants to merge 5 commits into
mainfrom
refactor/036-w1-data-plane-de-any

Conversation

@lxsaah

@lxsaah lxsaah commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Summary

Implements W1 — data-plane de-Any from design 036: #131 removed the control-plane erasure; this removes the per-message kind. The typed pipeline is fused inside closures at registration time (finish(), where T is known), so no Box<dyn Any> is constructed on any per-message path — connectors, session pumps, AimX client mirroring.

Path Before (per message) After
Inbound deserialize → Box<dyn Any>produce_any downcast, boxed future sync IngestFn (deserialize + produce, typed; no box, no Box::pin)
Outbound subscribe_anyrecv_anyBox<dyn Any>SerializerFn(&dyn Any) downcast fused SerializedReader::recv(ctx)SerializedValue { dest, payload }
Dynamic topic topic_any(&dyn Any) downcast typed TopicProvider<T> called inside the fused reader
Join fan-in Box<dyn Any> per input kept deliberately — it is the multi-type join API (documented on JoinTrigger)

Design decisions

  • Sync ingest (decision 2026-06-11): Producer::produce is sync + infallible (design 029), so IngestFn = Arc<dyn Fn(&RuntimeContext, &[u8]) -> Result<(), String>> and Router::route becomes a sync fn. The 036 sketch's ConsumeContext doesn't exist in core — RuntimeContext is threaded per call (raw variants skip the per-message ctx clone).
  • Router::route ctx is mandatory — every production caller already passed Some(&ctx); the context-skip branch is unrepresentable inside fused closures (its test removed with the semantics).
  • Reader error contract pins pump behavior: buffer errors propagate unchanged (BufferLagged → skip, other → stop); serialize failures are logged + skipped inside the reader (log line moves — see CHANGELOG).
  • Deleted with the downcasts: ProducerTrait/ConsumerTrait/AnyReader, SerializerKind/DeserializerKind + their Fn aliases, TopicProviderAny/Wrapper/Fn, SerializeError::TypeMismatch, and dead ConnectorClient/OutboundConnectorLink/create_producer_trait.

Compatibility

  • Registrar API is source-compatible: with_serializer/_raw, with_deserializer/_raw, with_topic_provider, link_to/link_from keep identical signatures. Examples, codegen output, and aimdb-pro compile with zero source changes (verified cargo check --workspace).
  • Builder validation (refactor(core): replace panic-based builder validation with collected configuration errors from build() #133 contract) keeps the exact ConfigError strings and order; existing *_records_error tests pass unmodified.
  • MQTT/KNX connectors pass routes opaquely into RouterBuilder::from_routes/pump_* — untouched. Websocket connector: internal-only (WsDispatch ctx non-optional, sync route call).

Acceptance criteria (036 §W1)

  • grep -rnE "dyn (core::any::)?Any\b" aimdb-core/src hits only setup-time uses: ExtensionMap, AnyRecord::as_any/as_any_mut, DynBuffer::as_any, the per-connection session auth ext slots, and the documented join exception. (036 acceptance line amended — the literal "dyn Any" grep had missed the latter two.)
  • ✅ KNX fake-gateway test (tunnel_roundtrip_against_fake_gateway) and the session smoke suites (session_smoke.rs, connectors_smoke.rs, embassy_smoke.rs) pass unmodified.
  • make build, make test, make test-embedded, make clippy (incl. thumbv7em), make fmt-check green.

New tests

  • Fused-reader unit tests: buffer-error propagation script (Ok, Lagged, Ok, Closed), serialize-skip, dynamic-topic resolution.
  • End-to-end roundtrips through AimDbBuilder::build(): outbound (collect → subscribe → recv yields dest + payload; context-over-raw override) and inbound (ingest produces into the buffer; bad bytes propagate Err, nothing produced; both override directions).
  • Router tests rewritten to counting ingest closures (simpler than the old MockProducer).

🤖 Generated with Claude Code

lxsaah and others added 5 commits June 11, 2026 21:12
…6 W1 inbound)

Replace the per-message Box<dyn Any> inbound path (DeserializerKind ->
produce_any downcast) with a fused IngestFn built in
InboundConnectorBuilder::finish() where T is known: deserialize + produce
in one typed closure, no erasure crossing and no boxed future per message
(Producer::produce is sync + infallible, design 029).

- IngestFn / IngestFactoryFn replace DeserializerFn/ContextDeserializerFn/
  DeserializerKind and ProducerTrait/ProducerFactoryFn (all deleted)
- InboundConnectorLink carries the ingest factory (non-optional; finish()
  validates the deserializer before registering, unchanged error strings)
- Router::route is now a sync fn taking &RuntimeContext (every production
  caller already passed Some(&ctx); the context-skip branch is
  unrepresentable with fused closures, its test removed)
- collect_inbound_routes returns Vec<(String, IngestFn)>
- pump_source / pump_client inbound / ws dispatch drop one .await
- delete dead TypedRecord::create_producer_trait

Registrar API (with_deserializer/_raw, link_from) is source-compatible;
MQTT/KNX/WS builders pass routes opaquely and compile unchanged.

Part of design 036 W1 (data-plane de-Any).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ion (036 W1 outbound)

Replace the per-message Box<dyn Any> outbound path (subscribe_any ->
recv_any -> SerializerFn(&dyn Any) downcast, plus topic_any(&dyn Any))
with a fused SerializedSource built in OutboundConnectorBuilder::finish()
where T is known: its readers yield destination + serialized payload
directly (subscribe -> recv -> resolve topic -> serialize, all typed).

- SerializedSource / SerializedReader / SerializedValue / SourceFactoryFn
  replace ConsumerTrait/AnyReader/SerializerKind/SerializerFn/
  ContextSerializerFn/ConsumerFactoryFn and the erased TopicProviderAny/
  TopicProviderWrapper/TopicProviderFn (all deleted; the typed
  TopicProvider<T> trait is now stored as-is)
- OutboundRoute is { topic, source, config }; ConnectorLink carries the
  source factory (non-optional; finish() validates the serializer before
  registering, unchanged error strings; the "skip links without
  serializer" branch in collect_outbound_routes is gone)
- RuntimeContext is threaded into SerializedReader::recv per call (026
  context serializers), not captured; raw serializers skip the ctx clone
- Buffer errors propagate unchanged (BufferLagged => pump continues,
  other => pump stops); serialize failures are logged and skipped inside
  the reader, observably identical to the old pump-side continue
- pump_sink / pump_client outbound collapse to recv + publish

What disappears per message: the Box<dyn Any> allocation, two downcasts,
the topic_any erasure crossing, and the subscribe_any boxed future. The
one remaining Box::pin per recv is the object-safe-async cost that
already existed.

Registrar API (with_serializer/_raw, with_topic_provider, link_to) is
source-compatible; the KNX fake-gateway and session smoke tests pass
unmodified.

Part of design 036 W1 (data-plane de-Any).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…asure (036 W1 wrap-up)

- delete SerializeError::TypeMismatch — both constructors died with the
  W1 fusion (the downcasts are gone); DbError::TypeMismatch is unrelated
  and stays
- delete dead ConnectorClient (held Arc<dyn Any>, zero users) and
  OutboundConnectorLink (zero users)
- document on JoinTrigger why the join fan-in deliberately keeps its
  Box<dyn Any + Send> (the erasure is the multi-type join API)
- CHANGELOG entries (global + aimdb-core + websocket-connector)
- check in design docs 034/035/036 and amend the 036 W1 acceptance grep:
  DynBuffer::as_any and the session auth ext slots are setup-time hits
  the original literal grep missed

Part of design 036 W1 (data-plane de-Any).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant