Skip to content

Commit 4d8d07e

Browse files
lxsaahclaude
andcommitted
feat(session): port AimX server onto serve/run_session (UdsListener + AimxDispatch)
Phase 3 server half (issue #39). Replaces the test-local UdsListener + TestDispatch with production server code: - UdsListener: Listener over tokio UnixListener (the accepting transport half, deferred from the client port; reuses the role-neutral UdsConnection). - AimxDispatch<R> (shared) + AimxSession<R> (per-connection): method bodies ported from remote/handler.rs onto the Session seam. record.drain's lazy per-record cursors live in AimxSession. subscribe reuses stream_record_updates; write/record.set reuse set_record_from_json (single-writer-per-key intact). SecurityPolicy enforced per-call (ReadOnly denies; writable_records membership). v2 client param shapes: record.get {name}, record.set {name,value}, write {value}. - build_aimx_server: supervisor.rs's socket setup (remove-stale/bind/chmod) then the spawn-free serve() engine; max_connections/max_subs -> SessionLimits. The aimx_session exit test now stands up a real AimDb and drives the production server end-to-end (hello/get/set/subscribe/write) over a real UDS socket. Legacy handler.rs/supervisor.rs remain untouched (main green); retired in a later stage. record.query is a sync fn returning the 'static handler future (an async fn(&self) would force AimxSession: Sync, which the drain_readers box is not). Welcome's writable_records is derived from the policy directly so build_aimx_server is self-contained standalone. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent f108bc5 commit 4d8d07e

6 files changed

Lines changed: 563 additions & 178 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aimdb-client/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,7 @@ thiserror = "1"
3838
tokio = { version = "1", features = ["rt-multi-thread", "test-util"] }
3939
tokio-test = "0.4"
4040
tempfile = "3"
41+
# The aimx_session exit test stands up a real AimDb + production AimX server
42+
# (`build_aimx_server`) and drives it with the engine-based `AimxConnection`.
43+
aimdb-tokio-adapter = { version = "0.6.0", path = "../aimdb-tokio-adapter" }
44+
serde = { version = "1", features = ["derive"] }

aimdb-client/tests/aimx_session.rs

Lines changed: 93 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -1,197 +1,126 @@
1-
//! Phase 3 client-first exit criterion: the engine-based [`AimxConnection`]
2-
//! round-trips the AimX-v2 wire — `hello` handshake, RPC (`record.get`/`set`),
3-
//! a streaming subscription, and a fire-and-forget write — against a `serve`
4-
//! engine test-server over a **real Unix-domain socket**.
1+
//! Phase 3 **server-port** exit criterion: the engine-based [`AimxConnection`]
2+
//! round-trips the reshaped AimX-v2 wire — `hello` handshake, RPC
3+
//! (`record.get`/`record.set`), a streaming subscription, and a
4+
//! fire-and-forget write — against the **production** server
5+
//! ([`build_aimx_server`] → `serve`/`run_session` + `AimxDispatch`) over a real
6+
//! Unix-domain socket.
57
//!
6-
//! The server side uses the production [`AimxCodec`] + [`UdsConnection`]; the
7-
//! only test-local pieces are a `UdsListener` (the accepting transport half,
8-
//! deferred from core to the server port) and a small echo-ish `Dispatch`. This
9-
//! proves the reshaped wire end-to-end before the real server `Dispatch` exists.
8+
//! This swaps the client-half milestone's test-local `UdsListener` +
9+
//! `TestDispatch` for real server code standing up an actual `AimDb`, proving
10+
//! the reshaped wire end-to-end through the shared session engine.
1011
11-
use std::sync::{Arc, Mutex};
12+
use std::sync::Arc;
13+
use std::time::Duration;
1214

1315
use aimdb_client::AimxConnection;
14-
use aimdb_core::session::aimx::{AimxCodec, UdsConnection};
15-
use aimdb_core::session::{
16-
serve, AuthError, BoxFut, BoxStream, Connection, Dispatch, Listener, Payload, PeerInfo,
17-
RpcError, Session, SessionConfig, SessionCtx, TransportError, TransportResult,
18-
};
16+
use aimdb_core::buffer::BufferCfg;
17+
use aimdb_core::remote::{AimxConfig, SecurityPolicy};
18+
use aimdb_core::session::aimx::build_aimx_server;
19+
use aimdb_core::AimDbBuilder;
20+
use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt};
21+
use futures::StreamExt;
22+
use serde::{Deserialize, Serialize};
1923
use serde_json::json;
20-
use tokio::net::UnixListener;
2124

22-
// ---------------------------------------------------------------------------
23-
// Test-local accepting transport (the `UdsListener` core gains in the server port)
24-
// ---------------------------------------------------------------------------
25-
26-
struct TestUdsListener {
27-
inner: UnixListener,
28-
}
29-
30-
impl Listener for TestUdsListener {
31-
fn accept(&mut self) -> BoxFut<'_, TransportResult<Box<dyn Connection>>> {
32-
Box::pin(async move {
33-
let (stream, _addr) = self.inner.accept().await.map_err(|_| TransportError::Io)?;
34-
Ok(Box::new(UdsConnection::new(stream)) as Box<dyn Connection>)
35-
})
36-
}
37-
}
38-
39-
// ---------------------------------------------------------------------------
40-
// Minimal AimX dispatch (stand-in for the real server `Dispatch`, server port)
41-
// ---------------------------------------------------------------------------
42-
43-
type WriteLog = Arc<Mutex<Vec<(String, Vec<u8>)>>>;
44-
45-
struct TestDispatch {
46-
writes: WriteLog,
47-
}
48-
49-
fn payload(v: serde_json::Value) -> Payload {
50-
Payload::from(serde_json::to_vec(&v).unwrap().as_slice())
51-
}
52-
53-
impl Dispatch for TestDispatch {
54-
fn authenticate<'a>(
55-
&'a self,
56-
_peer: &'a PeerInfo,
57-
_first: Option<&'a [u8]>,
58-
) -> BoxFut<'a, Result<SessionCtx, AuthError>> {
59-
Box::pin(async { Ok(SessionCtx::default()) })
60-
}
61-
62-
fn open(&self, _ctx: &SessionCtx) -> Box<dyn Session> {
63-
Box::new(TestSession {
64-
writes: self.writes.clone(),
65-
})
66-
}
67-
}
68-
69-
/// Per-connection session for the test dispatch.
70-
struct TestSession {
71-
writes: WriteLog,
25+
/// A writable config-style record (SingleLatest, no producer → remotely settable).
26+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27+
struct Setting {
28+
level: u64,
7229
}
7330

74-
impl Session for TestSession {
75-
fn call<'a>(
76-
&'a mut self,
77-
method: &'a str,
78-
params: Payload,
79-
) -> BoxFut<'a, Result<Payload, RpcError>> {
80-
let method = method.to_string();
81-
Box::pin(async move {
82-
match method.as_str() {
83-
"hello" => Ok(payload(json!({
84-
"version": "2.0",
85-
"server": "test",
86-
"permissions": ["read", "write"],
87-
"writable_records": ["temp"],
88-
}))),
89-
"record.list" => Ok(payload(json!([{ "name": "temp", "writable": true }]))),
90-
"record.get" => {
91-
// Echo the requested name back as a fixed value.
92-
let v: serde_json::Value = serde_json::from_slice(&params).unwrap_or_default();
93-
let name = v.get("name").and_then(|n| n.as_str()).unwrap_or("");
94-
if name == "temp" {
95-
Ok(payload(json!(42)))
96-
} else {
97-
Err(RpcError::NotFound)
98-
}
99-
}
100-
"record.set" => Ok(payload(json!({ "ok": true }))),
101-
_ => Err(RpcError::NotFound),
102-
}
103-
})
104-
}
105-
106-
fn subscribe(&mut self, topic: &str) -> Result<BoxStream<'static, Payload>, RpcError> {
107-
// Three synthetic updates derived from the topic, then end.
108-
let items: Vec<Payload> = (1..=3)
109-
.map(|i| payload(json!({ "topic": topic, "n": i })))
110-
.collect();
111-
Ok(Box::pin(futures::stream::iter(items)))
112-
}
113-
114-
fn write<'a>(
115-
&'a mut self,
116-
topic: &'a str,
117-
payload: Payload,
118-
) -> BoxFut<'a, Result<(), RpcError>> {
119-
let writes = self.writes.clone();
120-
let topic = topic.to_string();
121-
Box::pin(async move {
122-
writes.lock().unwrap().push((topic, payload.to_vec()));
123-
Ok(())
124-
})
125-
}
31+
/// A streamed record (SpmcRing) fed by a producer in the test.
32+
#[derive(Debug, Clone, Serialize, Deserialize)]
33+
struct Reading {
34+
n: u64,
12635
}
12736

128-
// ---------------------------------------------------------------------------
129-
// The exit-criterion test
130-
// ---------------------------------------------------------------------------
131-
13237
#[tokio::test]
133-
async fn aimx_client_roundtrip_over_uds() {
134-
use futures::StreamExt;
135-
38+
async fn aimx_roundtrip_over_uds_production_server() {
13639
let dir = tempfile::tempdir().unwrap();
13740
let sock = dir.path().join("aimdb.sock");
13841

139-
// Bind before connecting so the client's dial always finds the socket.
140-
let listener = TestUdsListener {
141-
inner: UnixListener::bind(&sock).unwrap(),
142-
};
143-
let writes: WriteLog = Arc::new(Mutex::new(Vec::new()));
144-
let dispatch = Arc::new(TestDispatch {
145-
writes: writes.clone(),
42+
// Build a real AimDb with two remote-accessible records.
43+
let mut builder = AimDbBuilder::new().runtime(Arc::new(TokioAdapter));
44+
builder.configure::<Setting>("setting", |reg| {
45+
reg.buffer(BufferCfg::SingleLatest).with_remote_access();
14646
});
147-
let server = tokio::spawn(serve(
148-
listener,
149-
Arc::new(AimxCodec),
150-
dispatch,
151-
SessionConfig::default(),
152-
));
153-
154-
// Connect: this performs the `hello` handshake and captures the Welcome.
47+
builder.configure::<Reading>("events", |reg| {
48+
reg.buffer(BufferCfg::SpmcRing { capacity: 64 })
49+
.with_remote_access();
50+
});
51+
let (db, runner) = builder.build().await.expect("build db");
52+
let db = Arc::new(db);
53+
tokio::spawn(runner.run());
54+
55+
// Seed the writable record before connecting so `record.get` has a value.
56+
db.set_record_from_json("setting", json!({ "level": 1 }))
57+
.expect("seed setting");
58+
59+
// ReadWrite policy with `setting` writable; `events` stays read-only.
60+
let mut policy = SecurityPolicy::read_write();
61+
policy.allow_write_key("setting");
62+
let config = AimxConfig::uds_default()
63+
.socket_path(&sock)
64+
.security_policy(policy)
65+
.max_connections(8)
66+
.max_subs_per_connection(8);
67+
68+
// Production server future, driven on a task (the engine itself is spawn-free).
69+
let server = tokio::spawn(build_aimx_server(db.clone(), config).expect("bind server"));
70+
71+
// Connect: performs the `hello` handshake and captures the Welcome.
15572
let conn = AimxConnection::connect(&sock).await.expect("connect");
156-
assert_eq!(conn.server_info().server, "test");
73+
assert_eq!(conn.server_info().server, "aimdb");
15774
assert!(conn
15875
.server_info()
15976
.permissions
16077
.contains(&"write".to_string()));
78+
assert!(conn
79+
.server_info()
80+
.writable_records
81+
.contains(&"setting".to_string()));
16182

162-
// RPC: record.get
163-
let temp = conn.get_record("temp").await.expect("get temp");
164-
assert_eq!(temp, json!(42));
83+
// RPC: record.get on the seeded record.
84+
let got = conn.get_record("setting").await.expect("get setting");
85+
assert_eq!(got, json!({ "level": 1 }));
16586

16687
// RPC: record.get on a missing record maps to a server error.
16788
assert!(conn.get_record("missing").await.is_err());
16889

169-
// RPC: record.set
170-
let set = conn.set_record("temp", json!(7)).await.expect("set temp");
171-
assert_eq!(set, json!({ "ok": true }));
90+
// RPC: record.set (permission-checked) echoes the new value.
91+
let set = conn
92+
.set_record("setting", json!({ "level": 7 }))
93+
.await
94+
.expect("set setting");
95+
assert_eq!(set.get("value").unwrap(), &json!({ "level": 7 }));
96+
assert_eq!(
97+
conn.get_record("setting").await.unwrap(),
98+
json!({ "level": 7 })
99+
);
172100

173-
// Streaming subscription: three events routed back by the engine-owned id.
174-
let mut stream = conn.subscribe("temp").expect("subscribe");
175-
for n in 1..=3 {
176-
let ev = stream.next().await.expect("event");
177-
assert_eq!(ev, json!({ "topic": "temp", "n": n }));
101+
// Streaming: a producer feeds `events`; the subscription routes updates back.
102+
let producer = db.producer::<Reading>("events").expect("producer");
103+
tokio::spawn(async move {
104+
for n in 1..=50 {
105+
producer.produce(Reading { n });
106+
tokio::time::sleep(Duration::from_millis(10)).await;
107+
}
108+
});
109+
let mut stream = conn.subscribe("events").expect("subscribe");
110+
for _ in 0..3 {
111+
let ev = tokio::time::timeout(Duration::from_secs(2), stream.next())
112+
.await
113+
.expect("event within timeout")
114+
.expect("event");
115+
assert!(ev.get("n").is_some(), "event carries a Reading: {ev}");
178116
}
179117

180118
// Fire-and-forget write, then a follow-up RPC. FIFO over the single
181119
// connection guarantees the write is processed before the reply returns.
182-
conn.write_record("temp", json!("on")).expect("write");
183-
let _ = conn.get_record("temp").await.expect("get after write");
184-
let got = writes.lock().unwrap().clone();
185-
assert_eq!(
186-
got.len(),
187-
1,
188-
"server should have received exactly one write"
189-
);
190-
assert_eq!(got[0].0, "temp");
191-
assert_eq!(
192-
serde_json::from_slice::<serde_json::Value>(&got[0].1).unwrap(),
193-
json!({ "value": "on" })
194-
);
120+
conn.write_record("setting", json!({ "level": 9 }))
121+
.expect("write");
122+
let after = conn.get_record("setting").await.expect("get after write");
123+
assert_eq!(after, json!({ "level": 9 }));
195124

196125
drop(conn); // stops the client engine
197126
server.abort();

0 commit comments

Comments
 (0)