Skip to content

Commit 6f0ec89

Browse files
committed
feat: enable remote access for embedded no_std environments with tokio adapter integration
1 parent 85b23de commit 6f0ec89

9 files changed

Lines changed: 821 additions & 15 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aimdb-core/src/session/server.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ pub struct SessionConfig {
4343
pub acks_subscribe: bool,
4444
}
4545

46+
/// Bound for the per-connection event funnel — caps how many pending outbound
47+
/// updates a single connection may buffer before pumps start dropping (matches
48+
/// the hand-rolled WS server's default per-client channel capacity).
49+
const EVENT_BUFFER: usize = 256;
50+
4651
/// One subscription update on its way back to the connection's send half.
4752
struct SubEvent {
4853
sub: String,
@@ -91,8 +96,12 @@ pub async fn run_session<C, D>(
9196
let mut session = dispatch.open(&ctx);
9297

9398
// Event funnel: every per-subscription pump sends its updates here; the main
94-
// loop is the sole writer to the connection.
95-
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<SubEvent>();
99+
// loop is the sole writer to the connection. **Bounded** so a slow client
100+
// (one whose socket is backpressured, stalling the main loop) cannot grow the
101+
// funnel without limit — the pumps drop on overflow rather than accumulate
102+
// (events carry a monotonic `seq`, so a client can detect the gap). This
103+
// restores the bounded-buffer slow-client protection the hand-rolled loops had.
104+
let (event_tx, mut event_rx) = mpsc::channel::<SubEvent>(EVENT_BUFFER);
96105
// Per-connection subscription pumps; the engine future is their sole owner.
97106
let mut subs: FuturesUnordered<BoxFut<'static, ()>> = FuturesUnordered::new();
98107
// sub-id → cancel handle (dropping/sending the oneshot cancels the pump,
@@ -248,9 +257,10 @@ async fn send_reply_err<C: EnvelopeCodec + ?Sized>(
248257
async fn pump_subscription(
249258
sub_id: String,
250259
mut stream: BoxStream<'static, Payload>,
251-
tx: mpsc::UnboundedSender<SubEvent>,
260+
tx: mpsc::Sender<SubEvent>,
252261
mut cancel: oneshot::Receiver<()>,
253262
) {
263+
use tokio::sync::mpsc::error::TrySendError;
254264
let mut seq: u64 = 0;
255265
loop {
256266
tokio::select! {
@@ -260,8 +270,12 @@ async fn pump_subscription(
260270
next = stream.next() => match next {
261271
Some(data) => {
262272
seq += 1;
263-
if tx.send(SubEvent { sub: sub_id.clone(), seq, data }).is_err() {
264-
break; // funnel closed → connection gone
273+
// `try_send` keeps the pump non-blocking: a backpressured
274+
// funnel drops this update (slow-client protection) rather
275+
// than stalling the bus; only a closed funnel ends the pump.
276+
match tx.try_send(SubEvent { sub: sub_id.clone(), seq, data }) {
277+
Ok(()) | Err(TrySendError::Full(_)) => {}
278+
Err(TrySendError::Closed(_)) => break, // connection gone
265279
}
266280
}
267281
None => break, // stream exhausted

aimdb-websocket-connector/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,4 @@ tracing = { version = "0.1", optional = true }
7171
[dev-dependencies]
7272
tokio = { version = "1", features = ["full", "test-util"] }
7373
tokio-tungstenite = "0.26"
74+
aimdb-tokio-adapter = { version = "0.6.0", path = "../aimdb-tokio-adapter" }

aimdb-websocket-connector/src/client_manager.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,4 +215,21 @@ mod tests {
215215
mgr.broadcast("t", b"v").await;
216216
assert_eq!(mgr.subscription_count(), 0);
217217
}
218+
219+
// Layer 2.2 (#2): one broadcast → N subscribers all receive the *same*
220+
// pre-serialized bytes (a shared `Arc`), evidencing a single serialization
221+
// regardless of subscriber count (O(1) fan-out, not O(N)).
222+
#[tokio::test]
223+
async fn broadcast_serializes_once_and_shares_to_all() {
224+
let mgr = ClientManager::new(false);
225+
let mut streams: Vec<_> = (0..8).map(|_| mgr.subscribe("#").1).collect();
226+
mgr.broadcast("t", b"123").await;
227+
let mut frames = Vec::new();
228+
for s in &mut streams {
229+
frames.push(s.next().await.unwrap());
230+
}
231+
// Every subscriber got byte-identical content (the one serialized frame).
232+
let first = &frames[0];
233+
assert!(frames.iter().all(|f| f.as_ref() == first.as_ref()));
234+
}
218235
}

aimdb-websocket-connector/src/codec.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,27 @@ mod tests {
424424
}
425425
}
426426

427+
// Layer 2.3 (#1): decoding many *distinct*-topic Data frames must not
428+
// accumulate any process-lifetime state (the old `leak_topic` interner would
429+
// have grown one `&'static str` per topic here). The borrow is zero-copy.
430+
#[test]
431+
fn decode_outbound_high_cardinality_no_static_growth() {
432+
let codec = WsCodec::new();
433+
for i in 0..10_000 {
434+
let topic = format!("sensors/dev-{i}");
435+
let frame = serde_json::to_vec(&ServerMessage::Data {
436+
topic: topic.clone(),
437+
payload: Some(serde_json::json!(i)),
438+
ts: 0,
439+
})
440+
.unwrap();
441+
match codec.decode_outbound(&frame).unwrap() {
442+
Outbound::Event { sub, .. } => assert_eq!(sub, topic),
443+
_ => panic!("expected Event"),
444+
}
445+
}
446+
}
447+
427448
#[test]
428449
fn write_carries_payload() {
429450
let codec = WsCodec::new();

0 commit comments

Comments
 (0)