
Make AimX remote-access spawn-free #114

@lxsaah

Motivation

AimX (the AimDB remote-access protocol over Unix domain sockets) is currently
#[cfg(feature = "std")]-gated. The long-term goal is to make it portable
across Tokio, Embassy, and WASM so it can be used over any transport, but the
project intentionally avoids that work until there is concrete demand. This
issue does not lift the gate. It removes the first blocker, the hard-coded
tokio::spawn calls, so that when the gate-removal work is scheduled it does
not also require rewriting the supervisor's concurrency model under time
pressure.

A secondary motivation: even within std-only AimX, dropping tokio::spawn
fixes two real bugs.

  1. Cancellation is currently dual-pathed. A subscription has both a
    oneshot::Sender<()> and an implicit "receiver dropped" exit. With
    nested FuturesUnordered, cancellation collapses to a single mechanism:
    dropping the future.
  2. Unbounded handler / subscription spawn. A misbehaving client can today
    open N connections × M subscriptions and consume N×M heap allocations as
    independent Tokio tasks. With nested sets the cost is the same heap-wise
    but observable via the supervisor future's own poll behaviour, so
    backpressure surfaces in the read loop as latency instead of silent OOM.

Current state (sites to fix)

Three sites: two direct tokio::spawn calls, plus one internal helper that spawns a task internally and has the wrong shape:

| # | Site | What it does today |
|---|------|--------------------|
| 1 | aimdb-core/src/remote/supervisor.rs:122 | tokio::spawn(handle_connection(...)) per accepted client |
| 2 | aimdb-core/src/remote/handler.rs:1042 | tokio::spawn(stream_subscription_events(...)) per record.subscribe |
| 3 | aimdb-core/src/builder.rs:1409 | AimDb::subscribe_record_updates: internal helper that spawns a JSON forwarder task and returns (Receiver, cancel_tx) |

Bonus (closely related, same pattern):

| # | Site | What it does today |
|---|------|--------------------|
| 4 | aimdb-websocket-connector/src/client/connector.rs:505,510 | tokio::spawn of read/write loops on every WS-client reconnect |

Site 4 lives outside aimdb-core but uses the same architectural pattern.
Bundling it into this issue keeps the "nested FuturesUnordered migration"
in one place.


Proposed approach

Pattern: every dynamic fan-out point owns its own
FuturesUnordered<BoxFuture<'static, ()>> and drives it via
futures::select_biased!. Cancellation of a child = drop the child future.
No Spawn trait, no tokio::spawn, no oneshot cancel channels.
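The std-only sketch below shows "cancellation of a child = drop the child future" without any runtime. A plain Vec of boxed futures stands in for FuturesUnordered so the example builds without the futures crate. ChildGuard is a hypothetical stand-in for whatever a child owns, such as a socket half or a record consumer.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Same shape as futures::future::BoxFuture<'static, ()>.
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Hypothetical stand-in for resources a child owns (socket half, record
/// consumer, ...). Its Drop impl is the only cleanup path: no cancel channel.
struct ChildGuard {
    cancelled: Arc<AtomicUsize>,
}

impl Drop for ChildGuard {
    fn drop(&mut self) {
        self.cancelled.fetch_add(1, Ordering::SeqCst);
    }
}

/// A child that never completes on its own, like a long-lived subscription.
fn child(cancelled: Arc<AtomicUsize>) -> BoxFuture {
    let guard = ChildGuard { cancelled };
    Box::pin(async move {
        let _guard = guard; // owned by the future's state from creation on
        std::future::pending::<()>().await;
    })
}

/// Returns the cancellation count after dropping one child, then the rest.
fn demo() -> (usize, usize) {
    let cancelled = Arc::new(AtomicUsize::new(0));
    // Stand-in for FuturesUnordered: the parent owns its children directly.
    let mut children: Vec<BoxFuture> = (0..3).map(|_| child(cancelled.clone())).collect();

    // Cancel one child by dropping it; its guard runs immediately.
    drop(children.remove(0));
    let after_one = cancelled.load(Ordering::SeqCst);

    // Dropping the parent's set cancels every remaining child.
    drop(children);
    (after_one, cancelled.load(Ordering::SeqCst))
}

fn main() {
    let (after_one, after_all) = demo();
    assert_eq!((after_one, after_all), (1, 3));
    println!("after one drop: {after_one}, after dropping the set: {after_all}");
}
```

No poll is needed for this to work. Dropping a future, whether unstarted or suspended, drops everything it has captured, so no oneshot channel is required.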

Supervisor (site 1)

async fn supervisor_loop<R: RuntimeAdapter>(
    db: Arc<AimDb<R>>,
    listener: tokio::net::UnixListener,
    config: AimxConfig,
) {
    let mut connections: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();
    loop {
        futures::select_biased! {
            accept_res = listener.accept().fuse() => match accept_res {
                Ok((stream, _)) => {
                    let fut = handle_connection(db.clone(), stream, config.clone()).boxed();
                    connections.push(fut);
                }
                Err(e) => { /* log; backoff */ }
            },
            // drives every live connection (nothing else polls them) and drops
            // each one as it finishes, so the set doesn't grow forever
            _ = connections.select_next_some() => {}
        }
    }
}
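The select_next_some branch is the only place the child futures are polled at all. It therefore both drives live connections and releases finished ones. The std-only sketch below shows that behaviour. ChildSet is a hypothetical, simplified stand-in for FuturesUnordered: it re-polls every child on each call, whereas FuturesUnordered only polls children that were woken.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Same shape as futures::future::BoxFuture<'static, ()>.
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Hypothetical, simplified stand-in for FuturesUnordered: it re-polls every
/// child on each call, whereas FuturesUnordered only polls woken children.
#[derive(Default)]
struct ChildSet {
    children: Vec<BoxFuture>,
}

impl ChildSet {
    fn push(&mut self, fut: BoxFuture) {
        self.children.push(fut);
    }

    fn len(&self) -> usize {
        self.children.len()
    }

    /// Polls each child once (this is what drives them; nothing else does)
    /// and drops the ones that finished, returning how many were reaped.
    fn poll_drain(&mut self, cx: &mut Context<'_>) -> usize {
        let before = self.children.len();
        self.children.retain_mut(|child| child.as_mut().poll(cx).is_pending());
        before - self.children.len()
    }
}

/// Waker that ignores wake-ups; enough for a single manual poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (children reaped, children still running).
fn demo() -> (usize, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut connections = ChildSet::default();
    connections.push(Box::pin(async {})); // client that already disconnected
    connections.push(Box::pin(async {})); // another finished client
    connections.push(Box::pin(std::future::pending::<()>())); // live client

    let reaped = connections.poll_drain(&mut cx);
    (reaped, connections.len())
}

fn main() {
    let (reaped, remaining) = demo();
    assert_eq!((reaped, remaining), (2, 1));
    println!("reaped {reaped}, {remaining} still running");
}
```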

Handler (sites 2 + 3)

subscribe_record_updates becomes a Stream-returning helper:

fn stream_record_updates<R: RuntimeAdapter>(
    db: &AimDb<R>,
    record_key: &str,
) -> DbResult<impl Stream<Item = serde_json::Value> + Send + 'static>;

The handler owns its own subscription set:

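async fn handle_connection<R: RuntimeAdapter>(
    db: Arc<AimDb<R>>,
    mut stream: UnixStream,
    config: AimxConfig,
) {
    let mut subs: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();
    // event_tx (setup elided): the per-connection sender that subscription
    // futures use to forward events towards the client.
    loop {
        futures::select_biased! {
            // Rebuilt on every iteration and dropped whenever the subs branch
            // wins, so read_request must be cancel-safe.
            req = read_request(&mut stream).fuse() => match req {
                Ok(Request::Subscribe { id, key }) => {
                    subs.push(run_subscription(db.clone(), id, key, event_tx.clone()).boxed());
                }
                Ok(Request::Unsubscribe { id }) => {
                    // mark the matching subscription via a cancel flag, or
                    // index subs by id in a separate map (see "Decisions").
                }
                ...
            },
            // drives every active subscription and drops finished ones
            _ = subs.select_next_some() => {}
        }
    }
}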

WS client reconnect (site 4)

Same shape: a FuturesUnordered on the client connector that holds the
current read+write loops; reconnect = drop the old futures and push two
new ones.


Decisions to make during implementation

These are not blockers but each has two reasonable answers; pick one before
coding.

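  1. Unsubscribe semantics. Today a subscription holds a oneshot::Sender<()>
    that the handler fires on Unsubscribe. With FuturesUnordered there are
    two options:

    • (a) Indexed map. Keep a HashMap<SubId, AbortHandle> alongside the
      set, wrapping each pushed future with futures::future::abortable;
      Unsubscribe calls abort(). Cleanest semantics. Note that
      tokio::task::AbortHandle cannot be used here: it only controls
      spawned tasks, not futures held in a FuturesUnordered. The
      futures-util AbortHandle is runtime-agnostic.
    • (b) Per-sub cancel flag. Each subscription future checks an
      Arc<AtomicBool> between Stream::poll_next calls; Unsubscribe sets
      the flag. Fully runtime-agnostic. Adds one atomic per subscription.
      The flag must be paired with a stored waker that Unsubscribe wakes.
      Otherwise an idle subscription only sees the flag when its record
      next produces an update.

    Recommendation: (b), including the waker. It keeps the path
    runtime-agnostic, which is the whole point, and the cost is negligible
    (one atomic plus one waker slot per active subscription). If the waker
    bookkeeping feels like reinvention, (a) with futures::future::abortable
    is an equally runtime-agnostic alternative, since Abortable pairs an
    atomic flag with a waker internally.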

  2. select_biased! vs hand-rolled poll_fn. Unlike select!, which needs std,
    futures::select_biased! works without std. It needs the futures crate's
    async-await feature (default-features = false, features = ["async-await"]
    in no_std builds). Already added by #88 (Replace Spawn trait with
    FuturesUnordered) for the top-level runner. No new dep.

  3. WS client (site 4) — same PR or separate? Same PR keeps the pattern in
    one review; separate PR keeps the PR size sane. The connector is
    self-contained so either works. Recommendation: same PR — the shape
    is identical, and reviewers benefit from seeing both applications of
    the pattern together.
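Decision 1(b) can be sketched with std only. It makes one assumption beyond the text: a flag checked only between poll_next calls is not seen while the subscription is idle, so this version also stores the task's waker and wakes it on cancel. CancelFlag and Cancellable are hypothetical names. futures::future::Abortable, option (a) with futures-util's AbortHandle, is built on the same flag-plus-waker pairing.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical per-subscription cancel flag for Decision 1(b).
#[derive(Default)]
struct CancelFlag {
    cancelled: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

impl CancelFlag {
    /// What the handler would call on Unsubscribe.
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
        // Wake the subscription so it observes the flag even if its
        // stream has nothing new to yield.
        if let Some(w) = self.waker.lock().unwrap().take() {
            w.wake();
        }
    }
}

/// Wraps a subscription future and resolves early once the flag is set.
struct Cancellable<F> {
    inner: Pin<Box<F>>,
    flag: Arc<CancelFlag>,
}

impl<F: Future<Output = ()>> Future for Cancellable<F> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Register the waker before reading the flag, so a cancel() that
        // races with this poll is never missed.
        *self.flag.waker.lock().unwrap() = Some(cx.waker().clone());
        if self.flag.cancelled.load(Ordering::Acquire) {
            return Poll::Ready(()); // the set then drops `inner`: that is the cleanup
        }
        self.inner.as_mut().poll(cx)
    }
}

/// Waker that counts how often it is woken.
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns (pending before Unsubscribe, wake count, ready after Unsubscribe).
fn demo() -> (bool, usize, bool) {
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let flag = Arc::new(CancelFlag::default());
    // Idle subscription: the record never produces another update.
    let mut sub = Cancellable {
        inner: Box::pin(std::future::pending::<()>()),
        flag: flag.clone(),
    };

    let pending_before = Pin::new(&mut sub).poll(&mut cx).is_pending();
    flag.cancel(); // Unsubscribe arrives while the subscription is idle
    let wakes = counter.0.load(Ordering::SeqCst);
    let ready_after = Pin::new(&mut sub).poll(&mut cx).is_ready();
    (pending_before, wakes, ready_after)
}

fn main() {
    assert_eq!(demo(), (true, 1, true));
    println!("idle subscription cancelled after one wake");
}
```

In the handler, Subscribe would wrap run_subscription(...) in Cancellable and keep the Arc<CancelFlag> in a map keyed by subscription id. Unsubscribe would then look up the id and call cancel().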


Out of scope (explicitly)

  • Removing the #[cfg(feature = "std")] gate on AimX. That requires
    also porting the Unix-domain-socket transport to an embedded equivalent,
    serialising security policy without std::path::PathBuf, etc. — none of
    which this issue does. The goal here is only the concurrency-model
    refactor.
  • Changing the AimX wire protocol. Unchanged.
  • Touching anything outside aimdb-core/src/remote/, the site-3 helper in
    aimdb-core/src/builder.rs, and (optionally) the WS client connector.
    The handler signatures, request/response types, serialisation and
    security policy are all untouched.

Acceptance criteria

  • git grep -nE 'tokio::spawn|runtime\.spawn' aimdb-core/src/remote/ returns no hits.
  • AimDb::subscribe_record_updates is removed; replaced by a Stream-returning helper used only by handler.rs.
  • No new unsafe impl blocks.
  • All existing AimX integration tests in aimdb-core/tests/ pass with no test changes (except where a test was asserting on the implementation — see migration notes).
  • Connection-count and subscription-count have soft upper bounds enforced somewhere (today: implicit via tokio::spawn accepting until OOM; after: a configurable max_connections / max_subs_per_connection in AimxConfig, or documented as "still unbounded — operator's responsibility"). Pick one and document.
  • CHANGELOG entry under "Internal refactors" — no user-facing semver bump implied.
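If the "enforced" option is picked, the check itself is small. The sketch below uses hypothetical names: AimxLimits, Admission and admit are invented here, and max_connections / max_subs_per_connection follow the field names suggested above. None encodes the "still unbounded" choice.

```rust
/// Hypothetical limits that could live on AimxConfig; `None` means unbounded.
#[derive(Clone, Debug, Default)]
struct AimxLimits {
    max_connections: Option<usize>,
    max_subs_per_connection: Option<usize>,
}

#[derive(Debug, PartialEq)]
enum Admission {
    Accept,
    Reject,
}

/// `active` would be connections.len() in the supervisor, or subs.len()
/// in the handler (FuturesUnordered::len counts futures still in the set).
fn admit(active: usize, limit: Option<usize>) -> Admission {
    match limit {
        Some(max) if active >= max => Admission::Reject,
        _ => Admission::Accept,
    }
}

fn main() {
    let limits = AimxLimits { max_connections: Some(2), max_subs_per_connection: None };
    assert_eq!(admit(1, limits.max_connections), Admission::Accept);
    assert_eq!(admit(2, limits.max_connections), Admission::Reject);
    // "Still unbounded" option: no limit configured.
    assert_eq!(admit(10_000, limits.max_subs_per_connection), Admission::Accept);
    println!("{limits:?}");
}
```

In the supervisor, the check would sit in the Ok((stream, _)) arm before connections.push(...). Rejecting just drops stream, which closes the socket. The handler would apply the same check with subs.len() before subs.push(...).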

Implementation outline

Suggested ordering; each step should pass make check.

Step 1 — stream_record_updates helper

Add a new pub(crate) fn stream_record_updates<R: RuntimeAdapter>(...) -> DbResult<impl Stream<...>> in builder.rs (or a new remote/stream.rs), matching the signature sketched above. It wraps the existing subscribe_json() path and yields serde_json::Value directly: no internal task, no channel.

Keep the old subscribe_record_updates temporarily for a single transition commit.

Step 2 — Migrate handler to nested FuturesUnordered

Rewrite handle_connection (the body invoked by tokio::spawn today) to own a FuturesUnordered<BoxFuture<'static, ()>> of subscription futures, each driving one stream_record_updates stream. Replace the spawn at handler.rs:1042 with subs.push(...). Replace the per-sub oneshot::Sender cancellation with an Arc<AtomicBool> per Decision 1(b) above.

Step 3 — Migrate supervisor to nested FuturesUnordered

Rewrite supervisor_loop to own a FuturesUnordered of handler futures. Replace the spawn at supervisor.rs:122 with connections.push(...).

Step 4 — Delete legacy subscribe_record_updates

No more callers; remove the public method.

Step 5 — (optional, same PR) WS client reconnect

Apply the same pattern to client/connector.rs:505,510.

Step 6 — Tests

The existing integration tests should continue to pass unchanged. Add one new test that explicitly checks cancellation-on-drop: open a subscription, drop the client connection, assert the subscription's producer-side consumer is dropped within N ms.
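The new test checks a property that follows from ownership. The supervisor's set owns each connection future, which owns its subscription set, which owns each record consumer. The std-only sketch below shows that chain using hypothetical names (ConsumerProbe, subscription, connection). A real test would instead open a UnixStream to a running AimX server, close it, and wrap the check in a timeout for the "within N ms" part.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Same shape as futures::future::BoxFuture<'static, ()>.
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Hypothetical stand-in for the record-side consumer a subscription holds.
struct ConsumerProbe(Arc<AtomicBool>);

impl Drop for ConsumerProbe {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst); // "consumer released"
    }
}

/// Subscription future: owns its consumer, never completes on its own.
fn subscription(probe: ConsumerProbe) -> BoxFuture {
    Box::pin(async move {
        let _consumer = probe;
        std::future::pending::<()>().await;
    })
}

/// Connection future: owns its own nested set of subscriptions.
fn connection(subs: Vec<BoxFuture>) -> BoxFuture {
    Box::pin(async move {
        let _subs = subs; // stand-in for the handler's FuturesUnordered
        std::future::pending::<()>().await;
    })
}

/// Simulates "client disconnects": the supervisor drops the connection future.
fn demo() -> (bool, bool) {
    let released = Arc::new(AtomicBool::new(false));
    let conn = connection(vec![subscription(ConsumerProbe(released.clone()))]);
    let mut connections: Vec<BoxFuture> = vec![conn]; // supervisor's set

    let before = released.load(Ordering::SeqCst);
    connections.clear(); // dropping the connection drops its nested subs
    (before, released.load(Ordering::SeqCst))
}

fn main() {
    assert_eq!(demo(), (false, true));
    println!("consumer released on disconnect");
}
```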

Step 7 — Docs

Update docs/design/028-M13-remove-spawn-trait.md — the "Bridge state" note in the Remote supervisor section becomes "Target state achieved." Add a short Mermaid diagram to AimX docs if user-facing docs cover the supervisor at all.


Risks

| Risk | Likelihood | Mitigation |
|------|------------|------------|
| Subtle fairness regression: one busy connection starves siblings | Low | FuturesUnordered is fair within a level; select_biased! polls accept before drain on the outer loop, which is the right default. Integration test covering "N connections, each subscribed, all see updates within X ms." |
| Embassy stack overflow when the std gate eventually drops | Medium (later) | Document. When the gate-removal PR lands, place the supervisor in its own Embassy task with an explicit stack_size. Not this issue's problem. |
| oneshot-to-AtomicBool cancellation has a one-poll-cycle delay | Low | Acceptable for AimX semantics: clients expect Unsubscribe to be "soon," not "synchronous." The delay stays at one poll cycle only if setting the flag also wakes the subscription future. |
| select_biased! macro expansion adds compile-time / binary-size overhead | Negligible | Already on the dep tree from #88. |
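The "accept before drain" priority in the first row comes from branch order alone. The std-only sketch below builds a two-branch biased select on std::future::poll_fn, i.e. the hand-rolled alternative mentioned under Decision 2. The names select_biased2, Branch and poll_once are hypothetical.

```rust
use std::future::{poll_fn, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Which branch of the biased select won.
#[derive(Debug, PartialEq)]
enum Branch<A, B> {
    First(A),
    Second(B),
}

/// Two-branch biased select: `first` is always polled before `second`, so it
/// wins whenever both are ready. The loser is dropped with the select future.
fn select_biased2<A: Future, B: Future>(
    first: A,
    second: B,
) -> impl Future<Output = Branch<A::Output, B::Output>> {
    let mut first = Box::pin(first);
    let mut second = Box::pin(second);
    poll_fn(move |cx: &mut Context<'_>| {
        if let Poll::Ready(v) = first.as_mut().poll(cx) {
            return Poll::Ready(Branch::First(v));
        }
        if let Poll::Ready(v) = second.as_mut().poll(cx) {
            return Poll::Ready(Branch::Second(v));
        }
        Poll::Pending
    })
}

/// Waker that ignores wake-ups; enough for a single manual poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

fn main() {
    // Both ready: the first branch ("accept") wins.
    let both = poll_once(select_biased2(async { "accept" }, async { "drain" }));
    assert_eq!(both, Poll::Ready(Branch::First("accept")));

    // First pending: the second branch gets its turn.
    let second = poll_once(select_biased2(std::future::pending::<&str>(), async { "drain" }));
    assert_eq!(second, Poll::Ready(Branch::Second("drain")));
    println!("{both:?} / {second:?}");
}
```

The same ordering has a flip side worth covering in the fairness test. While the first branch stays ready, for example under a sustained accept flood, the second branch is never polled. The supervisor's connection futures then make no progress until accept returns Pending.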

Notes for the reviewer

This issue intentionally does not introduce an AimDbSpawner channel
handle or any other new public API. The whole point is that nested
FuturesUnordered makes such a primitive unnecessary for the only call
sites that would have used it.

If a future use case demands post-build dynamic spawning at a level above
the supervisor (e.g. "a connector wants to add an outbound route at runtime"),
that should be filed as its own issue — and the answer is likely "expose a
hook on the connector trait whose body is a future the connector itself
fans out from," not "reintroduce Spawn."

Metadata

Labels: enhancement (New feature or request)