refactor(core)!: data-plane de-Any — fuse typed pipelines at registration (036 W1)#141
Open
lxsaah wants to merge 5 commits into
Open
refactor(core)!: data-plane de-Any — fuse typed pipelines at registration (036 W1)#141lxsaah wants to merge 5 commits into
lxsaah wants to merge 5 commits into
Conversation
…6 W1 inbound) Replace the per-message Box<dyn Any> inbound path (DeserializerKind -> produce_any downcast) with a fused IngestFn built in InboundConnectorBuilder::finish() where T is known: deserialize + produce in one typed closure, no erasure crossing and no boxed future per message (Producer::produce is sync + infallible, design 029). - IngestFn / IngestFactoryFn replace DeserializerFn/ContextDeserializerFn/ DeserializerKind and ProducerTrait/ProducerFactoryFn (all deleted) - InboundConnectorLink carries the ingest factory (non-optional; finish() validates the deserializer before registering, unchanged error strings) - Router::route is now a sync fn taking &RuntimeContext (every production caller already passed Some(&ctx); the context-skip branch is unrepresentable with fused closures, its test removed) - collect_inbound_routes returns Vec<(String, IngestFn)> - pump_source / pump_client inbound / ws dispatch drop one .await - delete dead TypedRecord::create_producer_trait Registrar API (with_deserializer/_raw, link_from) is source-compatible; MQTT/KNX/WS builders pass routes opaquely and compile unchanged. Part of design 036 W1 (data-plane de-Any). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ion (036 W1 outbound)
Replace the per-message Box<dyn Any> outbound path (subscribe_any ->
recv_any -> SerializerFn(&dyn Any) downcast, plus topic_any(&dyn Any))
with a fused SerializedSource built in OutboundConnectorBuilder::finish()
where T is known: its readers yield destination + serialized payload
directly (subscribe -> recv -> resolve topic -> serialize, all typed).
- SerializedSource / SerializedReader / SerializedValue / SourceFactoryFn
replace ConsumerTrait/AnyReader/SerializerKind/SerializerFn/
ContextSerializerFn/ConsumerFactoryFn and the erased TopicProviderAny/
TopicProviderWrapper/TopicProviderFn (all deleted; the typed
TopicProvider<T> trait is now stored as-is)
- OutboundRoute is { topic, source, config }; ConnectorLink carries the
source factory (non-optional; finish() validates the serializer before
registering, unchanged error strings; the "skip links without
serializer" branch in collect_outbound_routes is gone)
- RuntimeContext is threaded into SerializedReader::recv per call (026
context serializers), not captured; raw serializers skip the ctx clone
- Buffer errors propagate unchanged (BufferLagged => pump continues,
other => pump stops); serialize failures are logged and skipped inside
the reader, observably identical to the old pump-side continue
- pump_sink / pump_client outbound collapse to recv + publish
What disappears per message: the Box<dyn Any> allocation, two downcasts,
the topic_any erasure crossing, and the subscribe_any boxed future. The
one remaining Box::pin per recv is the object-safe-async cost that
already existed.
Registrar API (with_serializer/_raw, with_topic_provider, link_to) is
source-compatible; the KNX fake-gateway and session smoke tests pass
unmodified.
Part of design 036 W1 (data-plane de-Any).
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…asure (036 W1 wrap-up) - delete SerializeError::TypeMismatch — both constructors died with the W1 fusion (the downcasts are gone); DbError::TypeMismatch is unrelated and stays - delete dead ConnectorClient (held Arc<dyn Any>, zero users) and OutboundConnectorLink (zero users) - document on JoinTrigger why the join fan-in deliberately keeps its Box<dyn Any + Send> (the erasure is the multi-type join API) - CHANGELOG entries (global + aimdb-core + websocket-connector) - check in design docs 034/035/036 and amend the 036 W1 acceptance grep: DynBuffer::as_any and the session auth ext slots are setup-time hits the original literal grep missed Part of design 036 W1 (data-plane de-Any). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This was referenced Jun 12, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implements W1 — data-plane de-
Anyfrom design 036: #131 removed the control-plane erasure; this removes the per-message kind. The typed pipeline is fused inside closures at registration time (finish(), whereTis known), so noBox<dyn Any>is constructed on any per-message path — connectors, session pumps, AimX client mirroring.Box<dyn Any>→produce_anydowncast, boxed futureIngestFn(deserialize + produce, typed; no box, noBox::pin)subscribe_any→recv_any→Box<dyn Any>→SerializerFn(&dyn Any)downcastSerializedReader::recv(ctx)→SerializedValue { dest, payload }topic_any(&dyn Any)downcastTopicProvider<T>called inside the fused readerBox<dyn Any>per inputJoinTrigger)Design decisions
Producer::produceis sync + infallible (design 029), soIngestFn = Arc<dyn Fn(&RuntimeContext, &[u8]) -> Result<(), String>>andRouter::routebecomes a sync fn. The 036 sketch'sConsumeContextdoesn't exist in core —RuntimeContextis threaded per call (raw variants skip the per-message ctx clone).Router::routectx is mandatory — every production caller already passedSome(&ctx); the context-skip branch is unrepresentable inside fused closures (its test removed with the semantics).BufferLagged→ skip, other → stop); serialize failures are logged + skipped inside the reader (log line moves — see CHANGELOG).ProducerTrait/ConsumerTrait/AnyReader,SerializerKind/DeserializerKind+ theirFnaliases,TopicProviderAny/Wrapper/Fn,SerializeError::TypeMismatch, and deadConnectorClient/OutboundConnectorLink/create_producer_trait.Compatibility
with_serializer/_raw,with_deserializer/_raw,with_topic_provider,link_to/link_fromkeep identical signatures. Examples, codegen output, and aimdb-pro compile with zero source changes (verifiedcargo check --workspace).ConfigErrorstrings and order; existing*_records_errortests pass unmodified.RouterBuilder::from_routes/pump_*— untouched. Websocket connector: internal-only (WsDispatchctx non-optional, sync route call).Acceptance criteria (036 §W1)
grep -rnE "dyn (core::any::)?Any\b" aimdb-core/srchits only setup-time uses:ExtensionMap,AnyRecord::as_any/as_any_mut,DynBuffer::as_any, the per-connection session authextslots, and the documented join exception. (036 acceptance line amended — the literal"dyn Any"grep had missed the latter two.)tunnel_roundtrip_against_fake_gateway) and the session smoke suites (session_smoke.rs,connectors_smoke.rs,embassy_smoke.rs) pass unmodified.make build,make test,make test-embedded,make clippy(incl. thumbv7em),make fmt-checkgreen.New tests
Ok, Lagged, Ok, Closed), serialize-skip, dynamic-topic resolution.AimDbBuilder::build(): outbound (collect → subscribe → recv yields dest + payload; context-over-raw override) and inbound (ingest produces into the buffer; bad bytes propagateErr, nothing produced; both override directions).MockProducer).🤖 Generated with Claude Code