Make AimX remote access spawn free#118
Merged
Merged
Conversation
- Updated the WS client connector to eliminate the use of tokio::spawn for background tasks. - Introduced a nested FuturesUnordered to manage the read, write, keepalive, and reconnect loops. - The connect method now returns a tuple containing the connector and its associated future. - Enhanced the reconnect watcher to send new loop information to the outer future, allowing for seamless reconnections without spawning new tasks. - Updated documentation to reflect changes in the connector's architecture and task management.
6 tasks
Contributor
There was a problem hiding this comment.
Pull request overview
This PR completes the “spawn-free” refactor for AimX remote access and the WebSocket client connector by removing remaining tokio::spawn fan-out points and driving per-connection / per-subscription (and per-reconnect) loops via nested FuturesUnordered owned by a single infrastructure future, with cancellation via dropping futures.
Changes:
- Refactors AimX supervisor + per-connection handler to own nested
FuturesUnorderedsets (no per-connection / per-subtokio::spawn), and enforcesmax_connections/max_subs_per_connection. - Introduces
stream_record_updateshelper returning aStreamfor record updates, and removes the oldAimDb::subscribe_record_updatesspawn-based helper. - Refactors WS client connector to return an infrastructure future that owns its background loops and uses an mpsc (
NewLoops) to swap read/write loops on reconnect (no spawns).
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| examples/remote-access-demo/src/server.rs | Updates demo config API usage to max_subs_per_connection. |
| examples/remote-access-demo/src/client.rs | Updates demo client to new record.subscribe request/response shape (no queue_size). |
| docs/design/030-M13-aimx-remote-spawn-free.md | New design doc describing the spawn-free implementation and rationale. |
| docs/design/028-M13-remove-spawn-trait.md | Updates “Group 4” bridge-state discussion to point to design 030 as resolved. |
| aimdb-websocket-connector/src/client/connector.rs | Reworks WS client connector to spawn-free nested FuturesUnordered model + reconnect loop handoff via NewLoops. |
| aimdb-websocket-connector/src/client/builder.rs | Prepends the new connector infrastructure future to the returned futures list. |
| aimdb-websocket-connector/CHANGELOG.md | Documents the WS client spawn-free refactor and connect signature change. |
| aimdb-core/src/remote/supervisor.rs | Replaces per-connection tokio::spawn with a supervisor-owned FuturesUnordered and enforces max_connections. |
| aimdb-core/src/remote/stream.rs | Adds stream_record_updates (unfold-based) stream adapter + tests. |
| aimdb-core/src/remote/mod.rs | Updates module docs and exports stream module under std. |
| aimdb-core/src/remote/handler.rs | Replaces per-subscription spawning with per-connection FuturesUnordered of subscription futures; unsubscribe uses Notify. Adds tests. |
| aimdb-core/src/remote/config.rs | Removes subscription_queue_size, adds max_subs_per_connection, updates defaults/tests. |
| aimdb-core/src/builder.rs | Deletes spawn-based AimDb::subscribe_record_updates helper. |
| aimdb-core/CHANGELOG.md | Documents AimX spawn-free refactor + breaking config/protocol shape changes. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Make AimX remote-access and WS client connector spawn-free
Closes #114. Implements design 030; finishes the "Group 4" work design 028 deferred.
Why
After #88 every
runtime.spawn(...)was gone — except nine baretokio::spawncalls in AimX (3) and the WS client connector (6). They didn't depend on theSpawntrait (gated via#[cfg(feature = "std")]), so the trait could ship, but the dynamic-fan-out shape needed a nestedFuturesUnorderedrewrite that was out of scope for #88. This PR does that rewrite. Zerotokio::spawnremain inaimdb-coreor the connectors; cancellation collapses to dropping the future.What changed
AimX (
aimdb-core/src/remote/)supervisor.rs— accept loop owns aFuturesUnordered<BoxFuture>of per-connection handlers (select! { biased; }).max_connections(previously declared-but-unread) is now actually enforced: over-cap connections are refused pre-handshake.handler.rs— eachrecord.subscribepushes a future onto a per-connectionFuturesUnordered. The two-task chain per subscription collapses into onerun_subscriptionfuture.stream.rs(new) —stream_record_updatesadapts aJsonBufferReaderinto aStream<Item = serde_json::Value>viaunfold. Drop to cancel.AimDb::subscribe_record_updatesdeleted — only caller was the AimX handler.Cancellation:
Notify, notAtomicBoolrecord.unsubscribefires anArc<Notify>raced againststream.next()in a biasedselect!. Cancellation is immediate even on a quiet record — the previous atomic-flag design could pin a buffer reader indefinitely until the next event.WS client connector
All six
tokio::spawnsites collapse into one infrastructure future owning aFuturesUnordered. The reconnect watcher sendsNewLoops { write_sink, read_stream, write_rx }over an mpsc to the outer future instead of spawning.WsClientConnectorImpl::connect()now returnsResult<(Self, BoxFuture), String>; internal-only.Breaking changes
AimxConfig::subscription_queue_sizeremoved → replace with.max_subs_per_connection(n)(default 32) or delete the call.AimxConfig::max_connections(default 16) is now actually enforced.Welcome.max_subscriptionsnow reportsmax_subs_per_connection(32) instead of the deadsubscription_queue_size(100).record.subscriberesponse no longer carriesqueue_size; result is{ "subscription_id": "..." }.Tests
New
#[cfg(all(test, feature = "std"))]tests inhandler.rs/stream.rs:dropping_subs_set_drops_subscription_stream— drops theFuturesUnordered, verifies the underlying stream is dropped.dropping_subs_set_drops_inner_stream_state— stronger version with a real channel-backed stream.unsubscribe_terminates_subscription_immediately—notify_one()exits the future parked onstream.next()with no further values needed.unfold_skips_lag_and_terminates_on_closed— coversBufferLaggedskip andBufferClosedexit.make check/make allgreen.Docs
Out of scope
Lifting
#[cfg(feature = "std")]on AimX — shape is now runtime-agnostic, but transport (UDS, tokio I/O) is still std-only.