Skip to content

Commit ab4abf6

Browse files
lxsaahCopilot
andauthored
Make AimX remote access spawn free (#118)
* Refactor WS Client Connector to Use Nested FuturesUnordered - Updated the WS client connector to eliminate the use of tokio::spawn for background tasks. - Introduced a nested FuturesUnordered to manage the read, write, keepalive, and reconnect loops. - The connect method now returns a tuple containing the connector and its associated future. - Enhanced the reconnect watcher to send new loop information to the outer future, allowing for seamless reconnections without spawning new tasks. - Updated documentation to reflect changes in the connector's architecture and task management. * refactor: make AimX remote-access and WS client connector spawn-free * chore: update embassy subproject to latest commit * refactor: remove subscription_queue_size and update AimxConfig for spawn-free design * refactor: replace AtomicBool with Notify for immediate subscription cancellation * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1 parent 9f1a060 commit ab4abf6

15 files changed

Lines changed: 1588 additions & 485 deletions

File tree

_external/embassy

Submodule embassy updated 55 files

aimdb-core/CHANGELOG.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Internal refactors
11+
12+
- **AimX remote-access path is now spawn-free (Issue #114, Design 030).** Every remaining `tokio::spawn` in `aimdb-core/src/remote/` was removed; the supervisor's accept loop and each connection handler now own their own `FuturesUnordered<BoxFuture>` driven by `tokio::select! { biased; }`. Cancellation collapsed to one mechanism — dropping the future.
13+
- New `aimdb-core/src/remote/stream.rs` exports a `pub(crate) stream_record_updates` helper that adapts a record's `JsonBufferReader` into a `Stream<Item = serde_json::Value>` via `futures_util::stream::unfold`. No task, no channel — drop the stream to cancel.
14+
- `AimDb::subscribe_record_updates` **deleted**. The method had no out-of-tree callers (the only caller was the AimX handler); replaced by `stream_record_updates` above.
15+
- Per-subscription `oneshot::Sender<()>` cancel channels and the `SubscriptionHandle` struct **deleted**. `ConnectionState::subscriptions` is now `HashMap<String, Arc<tokio::sync::Notify>>`; `record.unsubscribe` calls `notify_one()`, waking the per-sub future immediately (even when parked on `stream.next()`).
16+
- The two-task chain per subscription (buffer-reader task + JSON-event forwarder task) **collapsed** into one `run_subscription` future per subscription, held in the connection's `FuturesUnordered`.
17+
1018
### Changed (breaking)
1119

20+
- **`AimxConfig` lost `subscription_queue_size` (Issue #114, Design 030).** The field bounded a per-subscription mpsc channel that no longer exists — subscriptions are now one future in a `FuturesUnordered`. The builder method `.subscription_queue_size(n)` is removed; replace it with `.max_subs_per_connection(n)` if you were using the value as a soft cap on subscription count, or just delete the call.
21+
- **AimX `Welcome.max_subscriptions` now reports the real per-connection cap.** Previously it returned `subscription_queue_size` (default 100) while the actual cap was implicit; it now returns `max_subs_per_connection` (default 32). Clients that displayed this value will see the change.
22+
- **AimX `record.subscribe` response no longer carries `queue_size`.** Result object is now `{ "subscription_id": "..." }` — the previous `"queue_size"` reported a number that no longer corresponded to anything in the implementation.
23+
- **`AimxConfig` gains `max_subs_per_connection: usize` (default 32)** — the dedicated per-connection subscription cap. The existing `max_connections: usize` (previously declared but unread) is now actually enforced by the supervisor; over-cap connections are refused by closing the accepted `UnixStream` pre-handshake.
24+
1225
- **`Producer::produce` is now sync + infallible; `Consumer::subscribe` is now infallible (Design 029 follow-up, M14).** The pre-resolved `WriteHandle::push` cannot fail and the pre-resolved buffer Arc makes `subscribe()` infallible. Call sites collapse: `producer.produce(x).await?``producer.produce(x);` and `let Ok(reader) = consumer.subscribe() else { ... }``let reader = consumer.subscribe();`. The `ProducerTrait::produce_any` / `ConsumerTrait::subscribe_any` trait surfaces stay `Result`/`async` because the type-erasure downcast remains fallible.
1326
- `AimDb::produce<T>(key, value) -> DbResult<()>` is now sync; `.await` on the call site goes away. Only the key lookup can fail.
1427
- `Database::produce` likewise sync.
@@ -36,7 +49,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3649
- Join transforms now hoist their per-input forwarder construction to build time — `JoinPipeline::into_descriptor()` returns a `CollectedTransform { task_future, fanin_futures }` and the lazy `runtime.spawn(forwarder)` inside `run_join_transform` is gone.
3750
- `ConnectorBuilder::build()` now returns `Vec<BoxFuture<'static, ()>>` instead of `Arc<dyn Connector>` (which `AimDbBuilder` already discarded).
3851
- Unsafe `impl Send/Sync` blocks on `Producer<T, R>` / `Consumer<T, R>` deleted — they auto-derive now.
39-
- On the AimX remote-access path, three `runtime.spawn(...)` call sites bridge to `tokio::spawn` directly under `#[cfg(feature = "std")]`. These (per-connection handler, per-subscription event stream, `subscribe_record_updates`) are addressed in the AimX portability follow-up.
52+
- On the AimX remote-access path, three `runtime.spawn(...)` call sites were temporarily bridged to bare `tokio::spawn` under `#[cfg(feature = "std")]`. These have since been removed by the AimX spawn-free follow-up — see the "AimX remote-access path is now spawn-free" entry above.
4053
- `on_start` no_std bifurcation collapsed: a single `StartFnType<R>` alias replaces the byte-identical std/no_std pair.
4154

4255
## [1.1.0] - 2026-05-22

aimdb-core/src/builder.rs

Lines changed: 0 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -1327,155 +1327,6 @@ impl<R: aimdb_executor::RuntimeAdapter + 'static> AimDb<R> {
13271327
self.inner.set_record_from_json(record_name, json_value)
13281328
}
13291329

1330-
/// Subscribe to record updates as JSON stream (std only)
1331-
///
1332-
/// Creates a subscription to a record's buffer and forwards updates as JSON
1333-
/// to a bounded channel. This is used internally by the remote access protocol
1334-
/// for implementing `record.subscribe`.
1335-
///
1336-
/// # Architecture
1337-
///
1338-
/// Spawns a consumer task that:
1339-
/// 1. Subscribes to the record's buffer using the existing buffer API
1340-
/// 2. Reads values as they arrive
1341-
/// 3. Serializes each value to JSON
1342-
/// 4. Sends JSON values to a bounded channel (with backpressure handling)
1343-
/// 5. Terminates when either:
1344-
/// - The cancel signal is received (unsubscribe)
1345-
/// - The channel receiver is dropped (client disconnected)
1346-
///
1347-
/// # Arguments
1348-
/// * `record_key` - Key of the record to subscribe to
1349-
/// * `queue_size` - Size of the bounded channel for this subscription
1350-
///
1351-
/// # Returns
1352-
/// `Ok((receiver, cancel_tx))` where:
1353-
/// - `receiver`: Bounded channel receiver for JSON values
1354-
/// - `cancel_tx`: One-shot sender to cancel the subscription
1355-
///
1356-
/// `Err` if:
1357-
/// - Record not found for the given key
1358-
/// - Record not configured with `.with_remote_access()`
1359-
/// - Failed to subscribe to buffer
1360-
///
1361-
/// # Example (internal use)
1362-
///
1363-
/// ```rust,ignore
1364-
/// let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
1365-
///
1366-
/// // Read events
1367-
/// while let Some(json_value) = rx.recv().await {
1368-
/// // Forward to client...
1369-
/// }
1370-
///
1371-
/// // Cancel subscription
1372-
/// let _ = cancel_tx.send(());
1373-
/// ```
1374-
#[cfg(feature = "std")]
1375-
#[allow(unused_variables)] // Variables used only in tracing feature
1376-
pub fn subscribe_record_updates(
1377-
&self,
1378-
record_key: &str,
1379-
queue_size: usize,
1380-
) -> DbResult<(
1381-
tokio::sync::mpsc::Receiver<serde_json::Value>,
1382-
tokio::sync::oneshot::Sender<()>,
1383-
)> {
1384-
use tokio::sync::{mpsc, oneshot};
1385-
1386-
// Find the record by key
1387-
let id = self
1388-
.inner
1389-
.resolve_str(record_key)
1390-
.ok_or_else(|| DbError::RecordKeyNotFound {
1391-
key: record_key.to_string(),
1392-
})?;
1393-
1394-
let record = self
1395-
.inner
1396-
.storage(id)
1397-
.ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?;
1398-
1399-
// Subscribe to the record's buffer as JSON stream
1400-
// This will fail if record not configured with .with_remote_access()
1401-
let mut json_reader = record.subscribe_json()?;
1402-
1403-
// Create channels for the subscription
1404-
let (value_tx, value_rx) = mpsc::channel(queue_size);
1405-
let (cancel_tx, mut cancel_rx) = oneshot::channel();
1406-
1407-
// Get metadata for logging
1408-
let type_id = self.inner.types[id.index()];
1409-
let key = self.inner.keys[id.index()];
1410-
let record_metadata = record.collect_metadata(type_id, key, id);
1411-
1412-
// Bridge state (design 028 §"Remote supervisor"): drop into `tokio::spawn`
1413-
// directly. The nested-`FuturesUnordered` rewrite is the AimX follow-up.
1414-
tokio::spawn(async move {
1415-
#[cfg(feature = "tracing")]
1416-
tracing::debug!(
1417-
"Subscription consumer task started for {}",
1418-
record_metadata.name
1419-
);
1420-
1421-
// Main event loop: read from buffer and forward to channel
1422-
loop {
1423-
tokio::select! {
1424-
// Handle cancellation signal
1425-
_ = &mut cancel_rx => {
1426-
#[cfg(feature = "tracing")]
1427-
tracing::debug!("Subscription cancelled");
1428-
break;
1429-
}
1430-
// Read next JSON value from buffer
1431-
result = json_reader.recv_json() => {
1432-
match result {
1433-
Ok(json_val) => {
1434-
// Send JSON value to subscription channel
1435-
if value_tx.send(json_val).await.is_err() {
1436-
#[cfg(feature = "tracing")]
1437-
tracing::debug!("Subscription receiver dropped");
1438-
break;
1439-
}
1440-
}
1441-
Err(DbError::BufferLagged { lag_count, .. }) => {
1442-
// Consumer fell behind - log warning but continue
1443-
#[cfg(feature = "tracing")]
1444-
tracing::warn!(
1445-
"Subscription for {} lagged by {} messages",
1446-
record_metadata.name,
1447-
lag_count
1448-
);
1449-
// Continue reading - next recv will get latest
1450-
}
1451-
Err(DbError::BufferClosed { .. }) => {
1452-
// Buffer closed (shutdown) - exit gracefully
1453-
#[cfg(feature = "tracing")]
1454-
tracing::debug!("Buffer closed for {}", record_metadata.name);
1455-
break;
1456-
}
1457-
Err(e) => {
1458-
// Other error (shouldn't happen in practice)
1459-
#[cfg(feature = "tracing")]
1460-
tracing::error!(
1461-
"Subscription error for {}: {:?}",
1462-
record_metadata.name,
1463-
e
1464-
);
1465-
break;
1466-
}
1467-
}
1468-
}
1469-
}
1470-
}
1471-
1472-
#[cfg(feature = "tracing")]
1473-
tracing::debug!("Subscription consumer task terminated");
1474-
});
1475-
1476-
Ok((value_rx, cancel_tx))
1477-
}
1478-
14791330
/// Collects inbound connector routes for automatic router construction (std only)
14801331
///
14811332
/// Iterates all records, filters their inbound_connectors by scheme,

aimdb-core/src/remote/config.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::record_id::StringKey;
77
/// Configuration for AimX remote access
88
///
99
/// Defines how the remote access layer behaves, including socket path,
10-
/// security policy, connection limits, and subscription queue sizes.
10+
/// security policy and connection / subscription limits.
1111
#[derive(Debug, Clone)]
1212
pub struct AimxConfig {
1313
/// Path to Unix domain socket
@@ -16,11 +16,17 @@ pub struct AimxConfig {
1616
/// Security policy (read-only or read-write)
1717
pub security_policy: SecurityPolicy,
1818

19-
/// Maximum number of concurrent connections
19+
/// Maximum number of concurrent client connections accepted by the
20+
/// supervisor. When the in-flight connection count reaches this
21+
/// value, newly-accepted Unix sockets are closed immediately (the
22+
/// client sees a closed socket on connect).
2023
pub max_connections: usize,
2124

22-
/// Subscription queue size per client per subscription
23-
pub subscription_queue_size: usize,
25+
/// Maximum number of concurrent subscriptions allowed per connection.
26+
/// Once a connection holds this many subscriptions, further
27+
/// `record.subscribe` calls receive a `too_many_subscriptions` error
28+
/// until one is released via `record.unsubscribe`.
29+
pub max_subs_per_connection: usize,
2430

2531
/// Optional authentication token
2632
pub auth_token: Option<String>,
@@ -37,15 +43,15 @@ impl AimxConfig {
3743
/// - Socket path: `/tmp/aimdb.sock`
3844
/// - Security policy: Read-only
3945
/// - Max connections: 16
40-
/// - Subscription queue size: 100
46+
/// - Max subscriptions per connection: 32
4147
/// - No auth token
4248
/// - Socket permissions: 0o600 (owner-only)
4349
pub fn uds_default() -> Self {
4450
Self {
4551
socket_path: PathBuf::from("/tmp/aimdb.sock"),
4652
security_policy: SecurityPolicy::ReadOnly,
4753
max_connections: 16,
48-
subscription_queue_size: 100,
54+
max_subs_per_connection: 32,
4955
auth_token: None,
5056
socket_permissions: Some(0o600),
5157
}
@@ -69,9 +75,9 @@ impl AimxConfig {
6975
self
7076
}
7177

72-
/// Sets the subscription queue size per client
73-
pub fn subscription_queue_size(mut self, size: usize) -> Self {
74-
self.subscription_queue_size = size;
78+
/// Sets the maximum number of concurrent subscriptions per connection
79+
pub fn max_subs_per_connection(mut self, max: usize) -> Self {
80+
self.max_subs_per_connection = max;
7581
self
7682
}
7783

@@ -206,7 +212,7 @@ mod tests {
206212
let config = AimxConfig::uds_default();
207213
assert_eq!(config.socket_path, PathBuf::from("/tmp/aimdb.sock"));
208214
assert_eq!(config.max_connections, 16);
209-
assert_eq!(config.subscription_queue_size, 100);
215+
assert_eq!(config.max_subs_per_connection, 32);
210216
assert!(matches!(config.security_policy, SecurityPolicy::ReadOnly));
211217
assert!(config.auth_token.is_none());
212218
}
@@ -217,13 +223,13 @@ mod tests {
217223
let config = AimxConfig::uds_default()
218224
.socket_path("/var/run/aimdb.sock")
219225
.max_connections(32)
220-
.subscription_queue_size(200)
226+
.max_subs_per_connection(8)
221227
.auth_token("secret-token")
222228
.socket_permissions(0o660);
223229

224230
assert_eq!(config.socket_path, PathBuf::from("/var/run/aimdb.sock"));
225231
assert_eq!(config.max_connections, 32);
226-
assert_eq!(config.subscription_queue_size, 200);
232+
assert_eq!(config.max_subs_per_connection, 8);
227233
assert_eq!(config.auth_token, Some("secret-token".to_string()));
228234
assert_eq!(config.socket_permissions, Some(0o660));
229235
}

0 commit comments

Comments
 (0)