Motivation
AimX (the AimDB remote-access protocol over Unix domain sockets) is currently #[cfg(feature = "std")]-gated. The long-term goal is to make it portable
across Tokio, Embassy, and WASM so it can be used over any transport, but the
project intentionally avoids that work until there is concrete demand. This
issue does not lift the gate. It removes the first blocker — the
hard-coded tokio::spawn calls — so that when the gate-removal work is
scheduled, it does not also require rewriting the supervisor's
concurrency model under time pressure.
A secondary motivation: even within std-only AimX, dropping tokio::spawn
fixes two real bugs.
Cancellation is currently dual-pathed. A subscription has both a oneshot::Sender<()> and an implicit "receiver dropped" exit. With nested FuturesUnordered, cancellation collapses to a single mechanism: dropping the future.
Unbounded handler / subscription spawn. A misbehaving client can today open N connections × M subscriptions and consume N×M heap allocations as independent Tokio tasks. With nested sets the heap cost is the same, but the work is observable through the supervisor future's own poll behaviour, so backpressure surfaces in the read loop as latency instead of silent OOM.
Current state (sites to fix)
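Three call sites, plus one internal helper that's the wrong shape:

| Site | Location | What |
|---|---|---|
| 1 | aimdb-core/src/remote/supervisor.rs:122 | tokio::spawn(handle_connection(...)) per accepted client |
| 2 | aimdb-core/src/remote/handler.rs:1042 | tokio::spawn(stream_subscription_events(...)) per record.subscribe |
| 3 | aimdb-core/src/builder.rs:1409 | AimDb::subscribe_record_updates, an internal helper that spawns a JSON forwarder task and returns (Receiver, cancel_tx) |

Bonus (closely related, same pattern):

| Site | Location | What |
|---|---|---|
| 4 | aimdb-websocket-connector/src/client/connector.rs:505,510 | tokio::spawn of read/write loops on every WS-client reconnect |

Site 4 lives outside aimdb-core but uses the same architectural pattern. Bundling it into this issue keeps the "nested FuturesUnordered migration" in one place.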
Proposed approach
Pattern: every dynamic fan-out point owns its own FuturesUnordered<BoxFuture<'static, ()>> and drives it via futures::select_biased!. Cancellation of a child = drop the child future.
No Spawn trait, no tokio::spawn, no oneshot cancel channels.
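"Cancellation = drop" composes across levels because each fan-out point owns its children. The dependency-free sketch below illustrates this under stated assumptions: a Vec<Pin<Box<dyn Future>>> stands in for FuturesUnordered (which owns its futures the same way), and Guard, subscription, connection and drop_first_connection are hypothetical helpers, not AimDB code. Dropping one connection future tears down exactly that connection's subscriptions and leaves its sibling alone.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};

/// Stand-in for BoxFuture<'static, ()>.
type BoxFut = Pin<Box<dyn Future<Output = ()> + Send>>;
type DropLog = Arc<Mutex<Vec<&'static str>>>;

/// Records its label in a shared log when dropped (hypothetical test helper).
struct Guard {
    label: &'static str,
    log: DropLog,
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.log.lock().unwrap().push(self.label);
    }
}

/// A subscription that never finishes on its own: it ends only by being dropped.
fn subscription(label: &'static str, log: DropLog) -> BoxFut {
    let guard = Guard { label, log };
    Box::pin(async move {
        let _guard = guard; // lives in the future's state until the future is dropped
        std::future::pending::<()>().await;
    })
}

/// A connection that owns its own set of subscription futures.
fn connection(subs: Vec<BoxFut>) -> BoxFut {
    Box::pin(async move {
        let _subs = subs; // dropping the connection drops every child
        std::future::pending::<()>().await;
    })
}

/// Builds two connections, cancels the first by dropping it, and returns the
/// labels of the subscriptions that were torn down as a result.
fn drop_first_connection() -> Vec<&'static str> {
    let log: DropLog = Arc::new(Mutex::new(Vec::new()));
    let mut connections: Vec<BoxFut> = vec![
        connection(vec![
            subscription("c1/s1", log.clone()),
            subscription("c1/s2", log.clone()),
        ]),
        connection(vec![subscription("c2/s1", log.clone())]),
    ];
    drop(connections.remove(0)); // cancel connection 1: no flag, no channel
    let torn_down = log.lock().unwrap().clone();
    torn_down
}
```

drop_first_connection() returns ["c1/s1", "c1/s2"]. "c2/s1" is only dropped later, when the remaining set goes out of scope.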
Supervisor (site 1)
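```rust
use std::sync::Arc;

use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::net::UnixListener;

async fn supervisor_loop<R: RuntimeAdapter>(
    db: Arc<AimDb<R>>,
    listener: UnixListener,
    config: AimxConfig,
) {
    // One set per fan-out point: every accepted connection lives here.
    let mut connections: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();

    loop {
        futures::select_biased! {
            // Polled first: a ready accept wins over driving existing connections.
            accept_res = listener.accept().fuse() => match accept_res {
                Ok((stream, _addr)) => {
                    let fut = handle_connection(db.clone(), stream, config.clone()).boxed();
                    connections.push(fut);
                }
                Err(_e) => { /* log; backoff */ }
            },
            // Polling the set drives every live connection; finished ones are
            // removed as they complete, so the set doesn't grow forever.
            _ = connections.select_next_some() => {}
        }
    }
}
```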
Handler (sites 2 + 3)
subscribe_record_updates becomes a Stream-returning helper (stream_record_updates, Step 1 below), and the handler owns its own subscription set:
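```rust
use std::sync::Arc;

use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::net::UnixStream;

async fn handle_connection<R: RuntimeAdapter>(
    db: Arc<AimDb<R>>,
    mut stream: UnixStream,
    config: AimxConfig,
) {
    // This connection's subscriptions: dropping the handler drops them all.
    let mut subs: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();
    // event_tx: this connection's outbound event sender (setup elided).

    loop {
        futures::select_biased! {
            // This read future is dropped whenever a subscription completes first,
            // so read_request must be cancel-safe (or be hoisted out of the loop).
            req = read_request(&mut stream).fuse() => match req {
                Ok(Request::Subscribe { id, key }) => {
                    subs.push(run_subscription(db.clone(), id, key, event_tx.clone()).boxed());
                }
                Ok(Request::Unsubscribe { id }) => {
                    // Mark the matching subscription via a cancel flag, or
                    // index subs by id in a separate map (see "Decisions").
                }
                // ...
            },
            // Polling the set drives every live subscription.
            _ = subs.select_next_some() => {}
        }
    }
}
```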
WS client reconnect (site 4)
Same shape: a FuturesUnordered on the client connector that holds the
current read+write loops; reconnect = drop the old futures and push two
new ones.
Decisions to make during implementation
These are not blockers but each has two reasonable answers; pick one before
coding.
Unsubscribe semantics. Today a subscription holds a oneshot::Sender<()>
that the handler fires on Unsubscribe. With FuturesUnordered there are
two options:
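(a) Indexed map. Keep a HashMap<SubId, AbortHandle> alongside the
set; Unsubscribe calls abort(). Cleanest semantics, but as written it reintroduces
a tokio::task::AbortHandle dependency (Tokio-specific). Note also that tokio::task::AbortHandle only aborts spawned tasks; for futures held in a FuturesUnordered the applicable type is futures::future::AbortHandle, returned by futures::future::abortable alongside the wrapped future.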
(b) Per-sub cancel flag. Each subscription future checks an Arc<AtomicBool> between Stream::poll_next calls; Unsubscribe sets
the flag. Fully runtime-agnostic. Adds one atomic per subscription.
Recommendation: (b). Keeps the path runtime-agnostic, which is the
whole point. Cost is negligible (one atomic per active subscription).
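A minimal, runtime-agnostic sketch of option (b). It assumes a per-connection map from subscription id to flag; SubId, CancelFlags, Subscription and poll_once are hypothetical names. Subscription is a hand-written future standing in for a real subscription that drives a record-update stream. It checks the flag before pulling each item, which is where Decision (b) places the check.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical subscription id; the real SubId type may differ.
type SubId = u32;

/// Per-connection map from subscription id to its cancel flag.
#[derive(Default)]
struct CancelFlags {
    flags: HashMap<SubId, Arc<AtomicBool>>,
}

impl CancelFlags {
    /// Called on Subscribe: creates the flag the new subscription will check.
    fn register(&mut self, id: SubId) -> Arc<AtomicBool> {
        let flag = Arc::new(AtomicBool::new(false));
        self.flags.insert(id, flag.clone());
        flag
    }

    /// Called on Unsubscribe: sets the flag and forgets it; false for unknown ids.
    fn unsubscribe(&mut self, id: SubId) -> bool {
        match self.flags.remove(&id) {
            Some(flag) => {
                flag.store(true, Ordering::Release);
                true
            }
            None => false,
        }
    }
}

/// Stand-in subscription: `updates` plays the role of the record-update stream.
struct Subscription {
    cancel: Arc<AtomicBool>,
    updates: std::vec::IntoIter<u32>,
    sent: Vec<u32>,
}

impl Future for Subscription {
    type Output = Vec<u32>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<u32>> {
        // Checked between items: an Unsubscribe takes effect on the next poll.
        if self.cancel.load(Ordering::Acquire) {
            return Poll::Ready(std::mem::take(&mut self.sent));
        }
        match self.updates.next() {
            Some(v) => {
                self.sent.push(v); // "forward" the update to the client
                cx.waker().wake_by_ref(); // pretend another update is on its way
                Poll::Pending
            }
            None => Poll::Ready(std::mem::take(&mut self.sent)),
        }
    }
}

/// Waker that does nothing; enough to poll by hand in this example.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn poll_once(sub: &mut Subscription) -> Poll<Vec<u32>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(sub).poll(&mut cx)
}
```

After two polls and an Unsubscribe, the next poll returns Ready with the two updates already forwarded. Entries are removed on Unsubscribe. A subscription that ends on its own should also have its entry removed, or the map accumulates stale flags.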
select_biased! vs hand-rolled poll_fn. futures::select_biased!
requires default-features = false, features = ["async-await"] on futures-util, which was already added by #88 (Replace Spawn trait with FuturesUnordered) for the top-level runner. No new
dependency.
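The ordering guarantee that select_biased! gives the supervisor (the accept arm is always polled before the drain arm) can also be expressed with the hand-rolled poll_fn alternative. This sketch uses only std::future::poll_fn. select_biased2, Either and poll_now are hypothetical names. Unlike futures::select_biased!, it neither skips terminated branches nor supports more than two arms.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Which branch of the two-way select completed.
#[derive(Debug, PartialEq)]
enum Either<L, R> {
    Left(L),
    Right(R),
}

/// Biased two-way select: `first` is always polled before `second`,
/// so when both are ready, `first` wins.
async fn select_biased2<A: Future, B: Future>(
    mut first: Pin<&mut A>,
    mut second: Pin<&mut B>,
) -> Either<A::Output, B::Output> {
    poll_fn(|cx: &mut Context<'_>| -> Poll<Either<A::Output, B::Output>> {
        if let Poll::Ready(v) = first.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(v));
        }
        if let Poll::Ready(v) = second.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(v));
        }
        Poll::Pending
    })
    .await
}

/// Waker that does nothing; enough for a single manual poll.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once; returns None if it is still pending.
fn poll_now<F: Future>(fut: F) -> Option<F::Output> {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}
```

When both inputs are ready, the first always wins, and the second is reached only while the first is pending. The flip side for supervisor_loop is that a steady stream of ready accepts is served before existing connections are driven.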
WS client (site 4) — same PR or separate? Same PR keeps the pattern in
one review; separate PR keeps the PR size sane. The connector is
self-contained so either works. Recommendation: same PR — the shape
is identical, and reviewers benefit from seeing both applications of
the pattern together.
Out of scope (explicitly)
Removing the #[cfg(feature = "std")] gate on AimX. That requires
also porting the Unix-domain-socket transport to an embedded equivalent,
serialising security policy without std::path::PathBuf, etc. — none of
which this issue does. The goal here is only the concurrency-model
refactor.
Changing the AimX wire protocol. Unchanged.
Touching anything outside aimdb-core/src/remote/ and (optionally) the
WS client connector. The handler signatures, request/response types,
serialisation, security policy — all untouched.
Acceptance criteria
git grep -nE 'tokio::spawn|runtime\.spawn' aimdb-core/src/remote/ returns no hits.
AimDb::subscribe_record_updates is removed; replaced by a Stream-returning helper used only by handler.rs.
No new unsafe impl blocks.
All existing AimX integration tests in aimdb-core/tests/ pass with no test changes (except where a test was asserting on the implementation — see migration notes).
Connection-count and subscription-count have soft upper bounds enforced somewhere (today: implicit via tokio::spawn accepting until OOM; after: a configurable max_connections / max_subs_per_connection in AimxConfig, or documented as "still unbounded — operator's responsibility"). Pick one and document.
CHANGELOG entry under "Internal refactors" — no user-facing semver bump implied.
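If the configurable option is chosen for the soft upper bounds, the check can be a comparison against the live set size before each push, since FuturesUnordered::len() reports how many futures the set holds. The sketch assumes hypothetical max_connections / max_subs_per_connection fields, grouped here in an AimxLimits struct that could live inside AimxConfig. Neither the struct nor the Admission enum exists in AimDB today.

```rust
/// Hypothetical limits (assumed fields, not existing AimDB configuration).
#[derive(Clone, Debug)]
struct AimxLimits {
    max_connections: usize,
    max_subs_per_connection: usize,
}

/// Hypothetical admission decision for a new connection or subscription.
#[derive(Debug, PartialEq)]
enum Admission {
    Accept,
    Reject,
}

impl AimxLimits {
    /// Checked before `connections.push(...)`, with `live` = the supervisor set's len().
    fn admit_connection(&self, live: usize) -> Admission {
        Self::admit(live, self.max_connections)
    }

    /// Checked before `subs.push(...)`, with `live` = the handler set's len().
    fn admit_subscription(&self, live: usize) -> Admission {
        Self::admit(live, self.max_subs_per_connection)
    }

    fn admit(live: usize, max: usize) -> Admission {
        if live < max {
            Admission::Accept
        } else {
            Admission::Reject
        }
    }
}
```

On Reject, the supervisor can drop the accepted UnixStream, which closes it. The handler can refuse the Subscribe request instead of pushing a future.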
Implementation outline
Suggested ordering; each step should pass make check.
Step 1 — stream_record_updates helper
Add a new pub(crate) fn stream_record_updates<R: RuntimeAdapter>(...) -> impl Stream<...> in builder.rs (or a new remote/stream.rs). It wraps the existing subscribe_json() path and yields serde_json::Value directly — no internal task, no channel.
Keep the old subscribe_record_updates temporarily for a single transition commit.
Step 2 — Migrate handler to nested FuturesUnordered
Rewrite handle_connection (the body invoked by tokio::spawn today) to own a FuturesUnordered<BoxFuture<'static, ()>> of subscription streams. Replace the spawn at handler.rs:1042 with subs.push(...). Replace the per-sub oneshot::Sender cancellation with Arc<AtomicBool> per Decision (b) above.
Step 3 — Migrate supervisor to nested FuturesUnordered
Rewrite supervisor_loop to own a FuturesUnordered of handler futures. Replace the spawn at supervisor.rs:122 with connections.push(...).
Step 4 — Delete legacy subscribe_record_updates
No more callers; remove the public method.
Step 5 — (optional, same PR) WS client reconnect
Apply the same pattern to client/connector.rs:505,510.
Step 6 — Tests
The existing integration tests should continue to pass unchanged. Add one new test that explicitly checks cancellation-on-drop: open a subscription, drop the client connection, assert the subscription's producer-side consumer is dropped within N ms.
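One way to make the Step 6 assertion concrete is to observe the drop from the producer side. If the subscription owns the receiving end of its feed, a send fails once the client connection's future has been dropped. This sketch uses std::sync::mpsc as a stand-in for the real record feed. The subscription, connection and producer_sees_consumer_drop helpers are hypothetical.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Receiver};

/// Stand-in for BoxFuture<'static, ()>.
type BoxFut = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Stand-in subscription future: owns the consumer end of its record feed.
fn subscription(feed: Receiver<u32>) -> BoxFut {
    Box::pin(async move {
        let _feed = feed; // a real subscription would read updates from here
        std::future::pending::<()>().await;
    })
}

/// Stand-in connection future: owns its subscription set.
fn connection(subs: Vec<BoxFut>) -> BoxFut {
    Box::pin(async move {
        let _subs = subs;
        std::future::pending::<()>().await;
    })
}

/// Returns whether the producer's send succeeded before and after the
/// connection future is dropped (simulating a client disconnect).
fn producer_sees_consumer_drop() -> (bool, bool) {
    let (tx, rx) = channel::<u32>();
    let conn = connection(vec![subscription(rx)]);
    let before = tx.send(1).is_ok();
    drop(conn); // client disconnects: the whole connection future goes away
    let after = tx.send(2).is_ok();
    (before, after)
}
```

In the real integration test the connection is dropped by closing the client socket. The producer-side check would then run under a timeout to express the "within N ms" bound.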
Step 7 — Docs
Update docs/design/028-M13-remove-spawn-trait.md — the "Bridge state" note in the Remote supervisor section becomes "Target state achieved." Add a short Mermaid diagram to AimX docs if user-facing docs cover the supervisor at all.
Risks
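| Risk | Likelihood | Mitigation |
|---|---|---|
| Subtle fairness regression: one busy connection starves siblings | Low | FuturesUnordered is fair within a level; select_biased! polls accept before drain on the outer loop, which is the right default. Integration test covering "N connections, each subscribed, all see updates within X ms." |
| Embassy stack overflow when the std gate eventually drops | Medium (later) | Document. When the gate-removal PR lands, place the supervisor in its own Embassy task with an explicit stack_size. Not this issue's problem. |
| oneshot-to-AtomicBool cancellation is observed on the subscription's next poll, not immediately | Low | Acceptable for AimX semantics: clients expect Unsubscribe to be "soon," not "synchronous." Setting the flag does not wake the subscription, so in practice it exits when the next update for that record arrives; pairing the flag with futures::task::AtomicWaker would bound the delay if that ever matters. |
| select_biased! macro expansion adds compile-time / binary-size overhead | | |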
Notes for the reviewer
This issue intentionally does not introduce an AimDbSpawner channel
handle or any other new public API. The whole point is that nested FuturesUnordered makes such a primitive unnecessary for the only call
sites that would have used it.
If a future use case demands post-build dynamic spawning at a level above
the supervisor (e.g. "a connector wants to add an outbound route at runtime"),
that should be filed as its own issue — and the answer is likely "expose a
hook on the connector trait whose body is a future the connector itself
fans out from," not "reintroduce Spawn."