Skip to content

Commit f108bc5

Browse files
lxsaahclaude
andcommitted
feat(session): split Dispatch into shared Dispatch + per-connection Session
Phase 3 server-port prep (issue #39). The AimX wire reshape dissolved every seam except one: record.drain needs lazy per-connection cursors, but Dispatch is a shared Arc<D> with &self — nowhere for mutable per-connection state. Split the dispatch role: - Dispatch (Send + Sync, one Arc per server): authenticate + open() factory. - Session (Send, one Box per accepted connection): call/subscribe/write on &mut self. run_session owns the Box<dyn Session> and threads &mut into it. subscribe is defaulted to NotFound (its 'static stream is side-neutral). Additive + object-safe (mirrors the Phase-2 encode_inbound/decode_outbound precedent); recorded in 037. Engine round-trip + AimX client tests still green; contracts still cross-compile to thumbv7em. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 9204692 commit f108bc5

5 files changed

Lines changed: 211 additions & 50 deletions

File tree

aimdb-client/tests/aimx_session.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use aimdb_client::AimxConnection;
1414
use aimdb_core::session::aimx::{AimxCodec, UdsConnection};
1515
use aimdb_core::session::{
1616
serve, AuthError, BoxFut, BoxStream, Connection, Dispatch, Listener, Payload, PeerInfo,
17-
RpcError, SessionConfig, SessionCtx, TransportError, TransportResult,
17+
RpcError, Session, SessionConfig, SessionCtx, TransportError, TransportResult,
1818
};
1919
use serde_json::json;
2020
use tokio::net::UnixListener;
@@ -59,9 +59,21 @@ impl Dispatch for TestDispatch {
5959
Box::pin(async { Ok(SessionCtx::default()) })
6060
}
6161

62+
fn open(&self, _ctx: &SessionCtx) -> Box<dyn Session> {
63+
Box::new(TestSession {
64+
writes: self.writes.clone(),
65+
})
66+
}
67+
}
68+
69+
/// Per-connection session for the test dispatch.
70+
struct TestSession {
71+
writes: WriteLog,
72+
}
73+
74+
impl Session for TestSession {
6275
fn call<'a>(
63-
&'a self,
64-
_ctx: &'a SessionCtx,
76+
&'a mut self,
6577
method: &'a str,
6678
params: Payload,
6779
) -> BoxFut<'a, Result<Payload, RpcError>> {
@@ -91,11 +103,7 @@ impl Dispatch for TestDispatch {
91103
})
92104
}
93105

94-
fn subscribe(
95-
&self,
96-
_ctx: &SessionCtx,
97-
topic: &str,
98-
) -> Result<BoxStream<'static, Payload>, RpcError> {
106+
fn subscribe(&mut self, topic: &str) -> Result<BoxStream<'static, Payload>, RpcError> {
99107
// Three synthetic updates derived from the topic, then end.
100108
let items: Vec<Payload> = (1..=3)
101109
.map(|i| payload(json!({ "topic": topic, "n": i })))
@@ -104,8 +112,7 @@ impl Dispatch for TestDispatch {
104112
}
105113

106114
fn write<'a>(
107-
&'a self,
108-
_ctx: &'a SessionCtx,
115+
&'a mut self,
109116
topic: &'a str,
110117
payload: Payload,
111118
) -> BoxFut<'a, Result<(), RpcError>> {

aimdb-core/src/session/mod.rs

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -260,12 +260,24 @@ pub trait Dialer: Send {
260260

261261
// ===========================================================================
262262
// Layer 3 — dispatch (the semantics). Doc 034 § Layer 3 + § EnvelopeCodec.
263-
// RPC and streaming unify in ONE trait (Decision 2): three reply cardinalities
264-
// — `call` (one) / `subscribe` (many) / `write` (none).
263+
// RPC and streaming unify in ONE per-connection role (Decision 2): three reply
264+
// cardinalities — `call` (one) / `subscribe` (many) / `write` (none).
265+
//
266+
// The role is split across two traits so the shared, immutable half (one
267+
// `Arc<dyn Dispatch>` per server) and the per-connection mutable half (one
268+
// `Box<dyn Session>` per accepted connection) each own what they need:
269+
//
270+
// - [`Dispatch`] — `Send + Sync`, shared: `authenticate` + an `open` factory.
271+
// - [`Session`] — `Send`, per-connection: `call` / `subscribe` / `write` on
272+
// `&mut self`, so a connection can hold mutable state (e.g. `record.drain`'s
273+
// lazy per-record cursors — the one seam the AimX wire reshape did not
274+
// dissolve) without a lock. See doc 037 (the additive server-port refinement,
275+
// mirroring the Phase-2 `encode_inbound`/`decode_outbound` precedent).
265276
// ===========================================================================
266277

267-
/// The application dispatch: authenticate a session, then serve calls,
268-
/// subscriptions, and writes against a [`SessionCtx`].
278+
/// The shared application dispatch: authenticate a connection, then open a
279+
/// per-connection [`Session`]. One `Arc<dyn Dispatch>` is shared across every
280+
/// connection a server accepts, so it stays `Sync` and behind `&self`.
269281
pub trait Dispatch: Send + Sync {
270282
/// Resolve a [`SessionCtx`] from peer metadata and/or the first frame
271283
/// (WS supplies pre-resolved identity via [`PeerInfo`]; UDS reads a Hello).
@@ -275,28 +287,42 @@ pub trait Dispatch: Send + Sync {
275287
first: Option<&'a [u8]>,
276288
) -> BoxFut<'a, Result<SessionCtx, AuthError>>;
277289

290+
/// Open the per-connection [`Session`] once, after [`authenticate`]. The
291+
/// returned session owns the connection's mutable dispatch state (drain
292+
/// cursors today, per-session auth identity in Phase 4) that the shared
293+
/// `Arc<Self>` cannot hold behind `&self`; the engine threads `&mut` into it.
294+
fn open(&self, ctx: &SessionCtx) -> Box<dyn Session>;
295+
}
296+
297+
/// The per-connection session: serves calls, subscriptions, and writes for one
298+
/// accepted [`Connection`]. The engine ([`run_session`]) owns the
299+
/// `Box<dyn Session>` and threads `&mut self` into each method, so a session can
300+
/// hold per-connection mutable state without a lock — while the shared,
301+
/// immutable role stays on [`Dispatch`].
302+
pub trait Session: Send {
278303
/// One-shot RPC: one request → one reply.
279304
fn call<'a>(
280-
&'a self,
281-
ctx: &'a SessionCtx,
305+
&'a mut self,
282306
method: &'a str,
283307
params: Payload,
284308
) -> BoxFut<'a, Result<Payload, RpcError>>;
285309

286310
/// Streaming: open a subscription that yields many payloads. The stream is
287-
/// `'static` so it can hold its own buffer reader inside the engine's
288-
/// `FuturesUnordered` (doc 034 risk list).
289-
fn subscribe(
290-
&self,
291-
ctx: &SessionCtx,
292-
topic: &str,
293-
) -> Result<BoxStream<'static, Payload>, RpcError>;
311+
/// `'static` (it captures cloned handles), so it outlives the `&mut` borrow
312+
/// and lives in the engine's `FuturesUnordered` (doc 034 risk list).
313+
///
314+
/// Defaulted to [`RpcError::NotFound`] so a dispatch with no streaming
315+
/// surface need not implement it (doc 037 § the server-port refinement —
316+
/// the stream is side-neutral, so it is defaulted here for symmetry).
317+
fn subscribe(&mut self, topic: &str) -> Result<BoxStream<'static, Payload>, RpcError> {
318+
let _ = topic;
319+
Err(RpcError::NotFound)
320+
}
294321

295322
/// Fire-and-forget write: no reply. Routes through the existing
296323
/// producer/arbiter path (single-writer-per-key stays intact).
297324
fn write<'a>(
298-
&'a self,
299-
ctx: &'a SessionCtx,
325+
&'a mut self,
300326
topic: &'a str,
301327
payload: Payload,
302328
) -> BoxFut<'a, Result<(), RpcError>>;
@@ -368,12 +394,13 @@ pub trait Source: Send {
368394
// `Box<dyn Trait>` from a mock per the acceptance criteria.
369395
// ===========================================================================
370396

371-
#[allow(dead_code)]
397+
#[allow(dead_code, clippy::too_many_arguments)]
372398
fn _assert_object_safe(
373399
_connection: &dyn Connection,
374400
_listener: &dyn Listener,
375401
_dialer: &dyn Dialer,
376402
_dispatch: &dyn Dispatch,
403+
_session: &dyn Session,
377404
_codec: &dyn EnvelopeCodec,
378405
_sink: &dyn Sink,
379406
_source: &dyn Source,
@@ -420,24 +447,25 @@ mod tests {
420447
) -> BoxFut<'a, Result<SessionCtx, AuthError>> {
421448
unimplemented!()
422449
}
450+
fn open(&self, _ctx: &SessionCtx) -> Box<dyn Session> {
451+
unimplemented!()
452+
}
453+
}
454+
455+
struct MockSession;
456+
impl Session for MockSession {
423457
fn call<'a>(
424-
&'a self,
425-
_ctx: &'a SessionCtx,
458+
&'a mut self,
426459
_method: &'a str,
427460
_params: Payload,
428461
) -> BoxFut<'a, Result<Payload, RpcError>> {
429462
unimplemented!()
430463
}
431-
fn subscribe(
432-
&self,
433-
_ctx: &SessionCtx,
434-
_topic: &str,
435-
) -> Result<BoxStream<'static, Payload>, RpcError> {
464+
fn subscribe(&mut self, _topic: &str) -> Result<BoxStream<'static, Payload>, RpcError> {
436465
unimplemented!()
437466
}
438467
fn write<'a>(
439-
&'a self,
440-
_ctx: &'a SessionCtx,
468+
&'a mut self,
441469
_topic: &'a str,
442470
_payload: Payload,
443471
) -> BoxFut<'a, Result<(), RpcError>> {
@@ -487,6 +515,7 @@ mod tests {
487515
let _listener: Box<dyn Listener> = Box::new(MockListener);
488516
let _dialer: Box<dyn Dialer> = Box::new(MockDialer);
489517
let _dispatch: Box<dyn Dispatch> = Box::new(MockDispatch);
518+
let _session: Box<dyn Session> = Box::new(MockSession);
490519
let _codec: Box<dyn EnvelopeCodec> = Box::new(MockCodec);
491520
let _sink: Box<dyn Sink> = Box::new(MockSink);
492521
let _source: Box<dyn Source> = Box::new(MockSource);

aimdb-core/src/session/server.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ pub async fn run_session<C, D>(
7979
}
8080
};
8181

82+
// Open the per-connection session once. It owns the connection's mutable
83+
// dispatch state (e.g. `record.drain` cursors); the loop below threads
84+
// `&mut` into its `call` / `subscribe` / `write`.
85+
let mut session = dispatch.open(&ctx);
86+
8287
// Event funnel: every per-subscription pump sends its updates here; the main
8388
// loop is the sole writer to the connection.
8489
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<SubEvent>();
@@ -107,7 +112,7 @@ pub async fn run_session<C, D>(
107112
};
108113
match msg {
109114
Inbound::Request { id, method, params } => {
110-
let result = dispatch.call(&ctx, &method, params).await;
115+
let result = session.call(&method, params).await;
111116
out.clear();
112117
if codec.encode(Outbound::Reply { id, result }, &mut out).is_err() {
113118
continue;
@@ -124,7 +129,7 @@ pub async fn run_session<C, D>(
124129
send_reply_err(&mut conn, codec, &mut out, id, RpcError::Denied).await;
125130
continue;
126131
}
127-
match dispatch.subscribe(&ctx, &topic) {
132+
match session.subscribe(&topic) {
128133
Ok(stream) => {
129134
let (cancel_tx, cancel_rx) = oneshot::channel();
130135
cancels.insert(sub_id.clone(), cancel_tx);
@@ -145,8 +150,8 @@ pub async fn run_session<C, D>(
145150
cancels.remove(&sub);
146151
}
147152
Inbound::Write { topic, payload } => {
148-
// Fire-and-forget; routes through Dispatch (single-writer-per-key intact).
149-
let _ = dispatch.write(&ctx, &topic, payload).await;
153+
// Fire-and-forget; routes through the session (single-writer-per-key intact).
154+
let _ = session.write(&topic, payload).await;
150155
}
151156
Inbound::Ping => {
152157
out.clear();
@@ -208,7 +213,7 @@ async fn send_reply_err<C: EnvelopeCodec + ?Sized>(
208213
}
209214
}
210215

211-
/// Pump one `Dispatch::subscribe` stream into the connection's event funnel,
216+
/// Pump one `Session::subscribe` stream into the connection's event funnel,
212217
/// tagging each update with a monotonic `seq`. Ends when the stream finishes or
213218
/// the cancel handle is dropped/fired (Unsubscribe or connection teardown).
214219
async fn pump_subscription(

aimdb-core/tests/session_engine.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use futures::StreamExt;
1818

1919
use aimdb_core::session::{
2020
run_client, serve, AuthError, BoxFut, BoxStream, ClientConfig, CodecError, Connection, Dialer,
21-
Dispatch, EnvelopeCodec, Inbound, Listener, Outbound, Payload, PeerInfo, RpcError,
21+
Dispatch, EnvelopeCodec, Inbound, Listener, Outbound, Payload, PeerInfo, RpcError, Session,
2222
SessionConfig, SessionCtx, TransportError, TransportResult,
2323
};
2424

@@ -257,21 +257,30 @@ impl Dispatch for EchoDispatch {
257257
Box::pin(async { Ok(SessionCtx::default()) })
258258
}
259259

260+
fn open(&self, _ctx: &SessionCtx) -> Box<dyn Session> {
261+
Box::new(EchoSession {
262+
writes: self.writes.clone(),
263+
})
264+
}
265+
}
266+
267+
/// Per-connection echo session — the shared `EchoDispatch` clones its write log
268+
/// into each one at `open` time.
269+
struct EchoSession {
270+
writes: WriteLog,
271+
}
272+
273+
impl Session for EchoSession {
260274
fn call<'a>(
261-
&'a self,
262-
_ctx: &'a SessionCtx,
275+
&'a mut self,
263276
_method: &'a str,
264277
params: Payload,
265278
) -> BoxFut<'a, Result<Payload, RpcError>> {
266279
// Echo the params straight back.
267280
Box::pin(async move { Ok(params) })
268281
}
269282

270-
fn subscribe(
271-
&self,
272-
_ctx: &SessionCtx,
273-
topic: &str,
274-
) -> Result<BoxStream<'static, Payload>, RpcError> {
283+
fn subscribe(&mut self, topic: &str) -> Result<BoxStream<'static, Payload>, RpcError> {
275284
// Three synthetic updates derived from the topic, then end.
276285
let items: Vec<Payload> = (1..=3)
277286
.map(|i| payload_from(&format!("{topic}#{i}")))
@@ -280,8 +289,7 @@ impl Dispatch for EchoDispatch {
280289
}
281290

282291
fn write<'a>(
283-
&'a self,
284-
_ctx: &'a SessionCtx,
292+
&'a mut self,
285293
topic: &'a str,
286294
payload: Payload,
287295
) -> BoxFut<'a, Result<(), RpcError>> {

0 commit comments

Comments
 (0)