Skip to content

Commit 370bd37

Browse files
committed
Refactor WebSocket connector and session management
- Simplified `WebSocketConnectorImpl` by removing the `raw_payload` field and related logic, delegating payload framing to the `WsCodec`.
- Introduced `WsDispatch` and `WsSession` for handling WebSocket connections, separating concerns for dispatching messages and managing session state.
- Added `WsServerConnection` to manage WebSocket connections on the server side, including multi-topic subscription handling.
- Retired the previous session management logic in favor of a more modular approach using `run_session`.
- Implemented a new `SnapshotProvider` trait for late-join functionality, allowing clients to receive the current state of topics upon subscription.
- Enhanced the transport layer with `WsDialer` and `WsClientConnection` for client-side WebSocket handling.
- Updated server state management to include shared dispatch and client manager.
- Added tests for multi-topic subscription handling and ensured compatibility with existing message protocols.
1 parent 13d9063 commit 370bd37

19 files changed

Lines changed: 1683 additions & 1548 deletions

File tree

aimdb-client/tests/pump_client.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ async fn pump_client_mirrors_record_both_directions() {
7979
.with_connector(AimxClientConnector::new(&sock).with_config(ClientConfig {
8080
reconnect: true,
8181
reconnect_delay: Duration::from_millis(50),
82+
max_reconnect_delay: Duration::from_millis(50),
83+
max_reconnect_attempts: 0,
84+
keepalive_interval: None,
85+
max_offline_queue: usize::MAX,
86+
topic_routed_subs: false,
8287
sends_hello: false,
8388
}));
8489
cb.configure::<Msg>("cfg", |reg| {

aimdb-core/src/session/aimx/codec.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,11 @@ impl EnvelopeCodec for AimxCodec {
176176
write_frame(out, &frame)
177177
}
178178
Outbound::Pong => write_frame(out, &Frame::tagged("pong")),
179+
// AimX has no explicit subscribe ack (the client owns the id; events
180+
// carry it back). `run_session` only emits this when `acks_subscribe`
181+
// is set, which the AimX server leaves off — so this is unreachable on
182+
// the AimX wire.
183+
Outbound::Subscribed { .. } => Err(CodecError::Malformed),
179184
}
180185
}
181186

aimdb-core/src/session/aimx/dispatch.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,19 @@ where
102102
})
103103
}
104104

105-
fn subscribe(&mut self, topic: &str) -> Result<BoxStream<'static, Payload>, RpcError> {
105+
fn subscribe<'a>(
106+
&'a mut self,
107+
topic: &'a str,
108+
) -> BoxFut<'a, Result<BoxStream<'static, Payload>, RpcError>> {
106109
// The engine owns the subscription lifecycle (keyed by request id) and
107110
// the per-connection cap (SessionLimits); no `generate_subscription_id`
108-
// / `max_subs` bookkeeping here.
109-
let stream =
110-
crate::remote::stream::stream_record_updates(&self.db, topic).map_err(map_db_err)?;
111-
Ok(Box::pin(stream.map(|v| to_payload(&v))))
111+
// / `max_subs` bookkeeping here. AimX has no async authorization, so this
112+
// is a trivial wrapper.
113+
Box::pin(async move {
114+
let stream = crate::remote::stream::stream_record_updates(&self.db, topic)
115+
.map_err(map_db_err)?;
116+
Ok(Box::pin(stream.map(|v| to_payload(&v))) as BoxStream<'static, Payload>)
117+
})
112118
}
113119

114120
fn write<'a>(
@@ -410,6 +416,8 @@ where
410416
max_subs_per_connection: config.max_subs_per_connection,
411417
},
412418
reads_hello: false,
419+
// AimX's subscribe ack stays implicit (events flow); no explicit ack frame.
420+
acks_subscribe: false,
413421
};
414422
let dispatch = Arc::new(AimxDispatch::new(db, config));
415423
let listener = UdsListener::new(listener);

aimdb-core/src/session/client.rs

Lines changed: 112 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,29 @@ use crate::{AimDb, RuntimeAdapter};
3333
pub struct ClientConfig {
3434
/// Redial after a dropped/failed connection instead of ending the engine.
3535
pub reconnect: bool,
36-
/// Delay before each redial when `reconnect` is set.
36+
/// Base delay before the first redial when `reconnect` is set. Subsequent
37+
/// redials grow this exponentially, capped at [`max_reconnect_delay`](Self::max_reconnect_delay).
3738
pub reconnect_delay: Duration,
39+
/// Upper bound for the exponential reconnect backoff. Defaults to
40+
/// [`reconnect_delay`](Self::reconnect_delay) (i.e. no escalation — a fixed
41+
/// delay, preserving the pre-Phase-4 behavior).
42+
pub max_reconnect_delay: Duration,
43+
/// Maximum redial attempts before the engine gives up. `0` = unlimited
44+
/// (the default).
45+
pub max_reconnect_attempts: usize,
46+
/// If set, send a keepalive `Ping` on every tick of this interval while a
47+
/// connection is up, whether or not there is other traffic. `None` (default) disables keepalive.
48+
pub keepalive_interval: Option<Duration>,
49+
/// Cap on caller commands buffered while disconnected; the oldest are dropped
50+
/// past this bound. Defaults to `usize::MAX` (effectively unbounded — the
51+
/// pre-Phase-4 behavior).
52+
pub max_offline_queue: usize,
53+
/// Key the subscription demux by **topic** instead of the engine request id.
54+
/// `false` (default, AimX-style) — events carry the request id back, demux by
55+
/// id. `true` (WS-style) — the wire pushes data keyed by topic with no id, so
56+
/// the codec's `decode_outbound` returns the topic as `Event.sub` and the
57+
/// engine routes by topic.
58+
pub topic_routed_subs: bool,
3859
/// Send a Ping handshake on connect and wait for the Pong before accepting
3960
/// caller commands (the proactive "handshake-as-caller"). Mirrors the
4061
/// server's `reads_hello`; a real protocol swaps Ping/Pong for its Hello.
@@ -46,11 +67,31 @@ impl Default for ClientConfig {
4667
Self {
4768
reconnect: true,
4869
reconnect_delay: Duration::from_millis(200),
70+
max_reconnect_delay: Duration::from_millis(200),
71+
max_reconnect_attempts: 0,
72+
keepalive_interval: None,
73+
max_offline_queue: usize::MAX,
74+
topic_routed_subs: false,
4975
sends_hello: false,
5076
}
5177
}
5278
}
5379

80+
/// Exponential backoff for the `attempt`-th redial (1-based), capped at
81+
/// [`ClientConfig::max_reconnect_delay`]. Defaults collapse this to a fixed
82+
/// `reconnect_delay` (max == base), preserving pre-Phase-4 behavior.
83+
fn backoff_delay(config: &ClientConfig, attempt: usize) -> Duration {
84+
let base = config.reconnect_delay;
85+
let cap = config.max_reconnect_delay.max(base);
86+
let shift = attempt.saturating_sub(1).min(16) as u32;
87+
base.saturating_mul(1u32 << shift).min(cap)
88+
}
89+
90+
/// Bound the offline backlog: drop the oldest buffered commands beyond `cap`.
91+
fn bound_offline_queue(cmd_rx: &mut mpsc::UnboundedReceiver<ClientCmd>, cap: usize) {
92+
while cmd_rx.len() > cap && cmd_rx.try_recv().is_ok() {}
93+
}
94+
5495
/// A cheap-clone handle to a running [`run_client`] engine — the caller-facing
5596
/// RPC surface. Every method funnels a command to the engine, which owns the
5697
/// pending-call map and the wire.
@@ -165,33 +206,62 @@ async fn client_loop<D, C>(
165206
D: Dialer,
166207
C: EnvelopeCodec,
167208
{
209+
// Consecutive failed attempts since the last successful connection; drives
210+
// exponential backoff and the optional attempt cap.
211+
let mut attempt: usize = 0;
168212
loop {
169213
let conn = match dialer.connect().await {
170-
Ok(conn) => conn,
214+
Ok(conn) => {
215+
attempt = 0;
216+
conn
217+
}
171218
Err(_e) => {
172219
#[cfg(feature = "tracing")]
173220
tracing::warn!("client dial failed: {:?}", _e);
174-
if config.reconnect {
175-
tokio::time::sleep(config.reconnect_delay).await;
176-
continue;
221+
match reconnect_after(&mut attempt, &config, &mut cmd_rx).await {
222+
true => continue,
223+
false => return,
177224
}
178-
return;
179225
}
180226
};
181227

182228
match drive_connection(conn, &codec, &mut cmd_rx, &config).await {
183229
Ended::HandlesDropped => return,
184230
Ended::Disconnected => {
185-
if config.reconnect {
186-
tokio::time::sleep(config.reconnect_delay).await;
187-
continue;
231+
match reconnect_after(&mut attempt, &config, &mut cmd_rx).await {
232+
true => continue,
233+
false => return,
188234
}
189-
return;
190235
}
191236
}
192237
}
193238
}
194239

240+
/// Decide whether to redial: honor `reconnect`, the attempt cap, the offline-queue
241+
/// bound, and the exponential backoff sleep. Returns `true` to retry, `false` to
242+
/// stop the engine.
243+
async fn reconnect_after(
244+
attempt: &mut usize,
245+
config: &ClientConfig,
246+
cmd_rx: &mut mpsc::UnboundedReceiver<ClientCmd>,
247+
) -> bool {
248+
if !config.reconnect {
249+
return false;
250+
}
251+
*attempt += 1;
252+
if config.max_reconnect_attempts != 0 && *attempt >= config.max_reconnect_attempts {
253+
#[cfg(feature = "tracing")]
254+
tracing::warn!(
255+
"client giving up after {} reconnect attempts",
256+
config.max_reconnect_attempts
257+
);
258+
return false;
259+
}
260+
bound_offline_queue(cmd_rx, config.max_offline_queue);
261+
tokio::time::sleep(backoff_delay(config, *attempt)).await;
262+
true
263+
}
264+
195265
/// Drive one dialed [`Connection`]: optional handshake, then `biased` demux of
196266
/// server frames (resolve `Reply` by `id`, route `Event`/`Snapshot` to their
197267
/// subscription channels) interleaved with caller commands. Pending state is
@@ -229,6 +299,9 @@ where
229299
}
230300
}
231301

302+
// Optional keepalive ticker — `None` parks the arm forever (see below).
303+
let mut keepalive = config.keepalive_interval.map(tokio::time::interval);
304+
232305
loop {
233306
tokio::select! {
234307
biased;
@@ -269,10 +342,31 @@ where
269342
}
270343
}
271344
Ok(Outbound::Pong) => {}
345+
// Explicit subscribe ack (WS). Informational — the local
346+
// event sink already exists from the Subscribe command, so
347+
// there is nothing to route; just confirm liveness.
348+
Ok(Outbound::Subscribed { .. }) => {}
272349
Err(_e) => continue, // skip a malformed frame, keep the connection
273350
}
274351
}
275352

353+
// ---- keepalive: send a Ping when the ticker fires --------------
354+
// With no interval configured the arm parks on `pending()` forever,
355+
// so it never wins the `select!`.
356+
_ = async {
357+
match keepalive.as_mut() {
358+
Some(i) => { i.tick().await; }
359+
None => std::future::pending::<()>().await,
360+
}
361+
} => {
362+
out.clear();
363+
if codec.encode_inbound(Inbound::Ping, &mut out).is_ok()
364+
&& conn.send(&out).await.is_err()
365+
{
366+
return Ended::Disconnected;
367+
}
368+
}
369+
276370
// ---- caller commands from ClientHandle -------------------------
277371
cmd = cmd_rx.recv() => {
278372
let cmd = match cmd {
@@ -299,7 +393,14 @@ where
299393
ClientCmd::Subscribe { topic, events } => {
300394
let id = next_id;
301395
next_id += 1;
302-
subs.insert(id.to_string(), events);
396+
// Topic-routed (WS): the wire pushes data keyed by topic,
397+
// so demux by topic; id-routed (AimX): events echo the id.
398+
let key = if config.topic_routed_subs {
399+
topic.clone()
400+
} else {
401+
id.to_string()
402+
};
403+
subs.insert(key, events);
303404
out.clear();
304405
let sent = codec
305406
.encode_inbound(Inbound::Subscribe { id, topic }, &mut out)

aimdb-core/src/session/mod.rs

Lines changed: 98 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -80,22 +80,78 @@ pub type TransportResult<T> = Result<T, TransportError>;
8080
// Supporting types (stubs — sufficient for the signatures to compile)
8181
// ===========================================================================
8282

83-
/// Remote-peer metadata carried by a [`Connection`] (remote addr, pre-resolved
/// auth).
///
/// **Phase 4 (resolved — doc 037 auth-context gate).** A single shape serves
/// both connectors: a neutral [`peer_addr`](Self::peer_addr) plus a
/// type-erased [`ext`](Self::ext) slot that each connector fills with its own
/// resolved identity (WS stores `ClientInfo`/`Permissions` at the HTTP upgrade;
/// AimX stores its `SecurityPolicy`). Core stays connector-agnostic; each side
/// downcasts `ext` itself.
#[derive(Clone, Default)]
#[non_exhaustive]
pub struct PeerInfo {
    /// Remote address, when the transport exposes one.
    pub peer_addr: Option<String>,
    /// Connector-resolved identity, type-erased so core does not need to know
    /// the connector's auth types. Recover it with [`ext_as`](Self::ext_as).
    pub ext: Option<Arc<dyn core::any::Any + Send + Sync>>,
}

impl PeerInfo {
    /// Return `self` with `ext` set to the given connector-resolved identity
    /// (consumed by [`Dispatch::authenticate`]). Any previous `ext` is replaced.
    pub fn with_ext(self, ext: Arc<dyn core::any::Any + Send + Sync>) -> Self {
        Self {
            ext: Some(ext),
            ..self
        }
    }

    /// Downcast the [`ext`](Self::ext) identity to the concrete type `T`.
    ///
    /// Returns `None` when no identity is attached or when it is not a `T`.
    pub fn ext_as<T: core::any::Any + Send + Sync>(&self) -> Option<Arc<T>> {
        // `downcast` consumes the `Arc`, so work on a cheap clone of the handle.
        let erased = Arc::clone(self.ext.as_ref()?);
        erased.downcast::<T>().ok()
    }
}

impl core::fmt::Debug for PeerInfo {
    /// Print `peer_addr` as-is; the type-erased `ext` is shown only as an
    /// `<opaque>` marker when present.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let ext_marker = match self.ext {
            Some(_) => Some("<opaque>"),
            None => None,
        };
        let mut out = f.debug_struct("PeerInfo");
        out.field("peer_addr", &self.peer_addr);
        out.field("ext", &ext_marker);
        out.finish()
    }
}
91122

92123
/// The authenticated session context threaded through [`Dispatch`] calls.
///
/// **Phase 4 (resolved — doc 037 auth-context gate).** Holds the resolved
/// principal as a type-erased [`ext`](Self::ext), which [`Dispatch::open`]
/// threads into the per-connection [`Session`] for per-operation authorization
/// (`authorize_subscribe`/`authorize_write`). AimX leaves it `None`.
#[derive(Clone, Default)]
#[non_exhaustive]
pub struct SessionCtx {
    /// The resolved principal, type-erased. Recover it with
    /// [`ext_as`](Self::ext_as).
    pub ext: Option<Arc<dyn core::any::Any + Send + Sync>>,
}

impl SessionCtx {
    /// Build a context that carries the given connector-resolved principal.
    pub fn with_ext(ext: Arc<dyn core::any::Any + Send + Sync>) -> Self {
        let ext = Some(ext);
        Self { ext }
    }

    /// Downcast the [`ext`](Self::ext) principal to the concrete type `T`.
    ///
    /// Returns `None` when no principal is attached or when it is not a `T`.
    pub fn ext_as<T: core::any::Any + Send + Sync>(&self) -> Option<Arc<T>> {
        // `downcast` consumes the `Arc`, so work on a cheap clone of the handle.
        let erased = Arc::clone(self.ext.as_ref()?);
        erased.downcast::<T>().ok()
    }
}

impl core::fmt::Debug for SessionCtx {
    /// The type-erased principal is shown only as an `<opaque>` marker when
    /// present.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let ext_marker = match self.ext {
            Some(_) => Some("<opaque>"),
            None => None,
        };
        let mut out = f.debug_struct("SessionCtx");
        out.field("ext", &ext_marker);
        out.finish()
    }
}
99155

100156
/// Engine-local bounds for a session (consumed by the Phase 2 engines, not by
101157
/// the contracts here).
@@ -222,6 +278,16 @@ pub enum Outbound<'a> {
222278
/// Unparsed record value.
223279
data: Payload,
224280
},
281+
/// An explicit acknowledgement that a subscription opened. Emitted by
282+
/// [`run_session`](super::server::run_session) only when
283+
/// [`SessionConfig::acks_subscribe`](super::server::SessionConfig) is set
284+
/// (WS needs it; AimX's ack is implicit, so it leaves the flag off and never
285+
/// emits this). The `sub` is the subscription's routing id — the same value
286+
/// that tags its [`Event`](Outbound::Event)s.
287+
Subscribed {
288+
/// Subscription id that was opened.
289+
sub: &'a str,
290+
},
225291
/// Keepalive response.
226292
Pong,
227293
}
@@ -314,9 +380,25 @@ pub trait Session: Send {
314380
/// Defaulted to [`RpcError::NotFound`] so a dispatch with no streaming
315381
/// surface need not implement it (doc 037 § the server-port refinement —
316382
/// the stream is side-neutral, so it is defaulted here for symmetry).
317-
fn subscribe(&mut self, topic: &str) -> Result<BoxStream<'static, Payload>, RpcError> {
383+
///
384+
/// Async (Phase 4): opening a subscription may need to *await* per-operation
385+
/// authorization (e.g. WS `authorize_subscribe`); the engine awaits it.
386+
fn subscribe<'a>(
387+
&'a mut self,
388+
topic: &'a str,
389+
) -> BoxFut<'a, Result<BoxStream<'static, Payload>, RpcError>> {
318390
let _ = topic;
319-
Err(RpcError::NotFound)
391+
Box::pin(async { Err(RpcError::NotFound) })
392+
}
393+
394+
/// Late-join snapshot: the current value for `topic`, emitted by
395+
/// [`run_session`](super::server::run_session) as an [`Outbound::Snapshot`]
396+
/// right after a successful [`subscribe`](Session::subscribe) and before the
397+
/// first event. Defaulted to `None` (no snapshot) — AimX inherits this; WS
398+
/// overrides it from its `SnapshotProvider`.
399+
fn snapshot(&mut self, topic: &str) -> Option<Payload> {
400+
let _ = topic;
401+
None
320402
}
321403

322404
/// Fire-and-forget write: no reply. Routes through the existing
@@ -461,7 +543,10 @@ mod tests {
461543
) -> BoxFut<'a, Result<Payload, RpcError>> {
462544
unimplemented!()
463545
}
464-
fn subscribe(&mut self, _topic: &str) -> Result<BoxStream<'static, Payload>, RpcError> {
546+
fn subscribe<'a>(
547+
&'a mut self,
548+
_topic: &'a str,
549+
) -> BoxFut<'a, Result<BoxStream<'static, Payload>, RpcError>> {
465550
unimplemented!()
466551
}
467552
fn write<'a>(

0 commit comments

Comments
 (0)