Skip to content

Commit 654cf23

Browse files
lxsaahclaude
andcommitted
feat(session): subscribe-ack + pump_client record mirroring (AimxClientConnector)
Phase 3 server port (issue #39), client refinements: - Subscribe-ack (run_client): a server subscribe-failure Reply carries an id the client never registered as a pending call. Recognize it and drop the matching event sink so a rejected subscribe ends the stream (None) instead of hanging forever. Contract: success is acknowledged implicitly by events flowing; the server replies only on failure. Covered by a new session_engine test. - pump_client(db, scheme, handle): mirrors records both directions over a running run_client engine — outbound routes stream local updates via ClientHandle::write; inbound routes subscribe and produce into local records through the Router (arbiter path; single-writer-per-key intact). Uses the type-erased consumer/serializer + producer/deserializer machinery (db.runtime_any() supplies the ctx for context-aware (de)serializers). - AimxClientConnector: a ConnectorBuilder for the `aimx://` scheme so records can .link_to/.link_from an AimX peer; on build it dials via run_client and returns the pump futures + engine future for the runner (spawn-free). The registerable wrapper around pump_client, mirroring build_aimx_server on the server side. New aimdb-client test mirrors a record client->server and server->client through the real connector path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 4d8d07e commit 654cf23

6 files changed

Lines changed: 347 additions & 1 deletion

File tree

aimdb-client/tests/pump_client.rs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
//! Phase 3 server-port exit criterion (§4): `pump_client` mirrors a record
2+
//! **both directions** between a local AimDb and a remote AimDb over the shared
3+
//! session engine.
4+
//!
5+
//! Topology: a server `AimDb` (served by `build_aimx_server`) and a client
6+
//! `AimDb` whose records carry `aimx://` connector links. `run_client` opens the
7+
//! connection; `pump_client` wires the client's outbound/inbound routes to the
8+
//! `ClientHandle`:
9+
//! - **client → server**: producing the client's `cfg` record streams it to the
10+
//! server via `ClientHandle::write` → the server's `record.set` path.
11+
//! - **server → client**: updating the server's `tele` record streams it back
12+
//! through a subscription → the client's inbound producer (arbiter path).
13+
14+
use std::sync::Arc;
15+
use std::time::Duration;
16+
17+
use aimdb_core::buffer::BufferCfg;
18+
use aimdb_core::remote::{AimxConfig, SecurityPolicy};
19+
use aimdb_core::session::aimx::{build_aimx_server, AimxClientConnector};
20+
use aimdb_core::session::ClientConfig;
21+
use aimdb_core::{AimDb, AimDbBuilder};
22+
use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt};
23+
use serde::{Deserialize, Serialize};
24+
use serde_json::json;
25+
26+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27+
struct Msg {
28+
v: u64,
29+
}
30+
31+
/// Re-assert `db.<key>` reaches `want`, re-driving `push` each tick so the test
32+
/// is robust against subscription-registration timing (a fresh subscriber may
33+
/// only see values produced after it attaches).
34+
async fn mirror_reaches(
35+
db: &Arc<AimDb<TokioAdapter>>,
36+
key: &str,
37+
want: &serde_json::Value,
38+
mut push: impl FnMut(),
39+
) -> bool {
40+
for _ in 0..100 {
41+
push();
42+
tokio::time::sleep(Duration::from_millis(20)).await;
43+
if db.try_latest_as_json(key).as_ref() == Some(want) {
44+
return true;
45+
}
46+
}
47+
false
48+
}
49+
50+
#[tokio::test]
51+
async fn pump_client_mirrors_record_both_directions() {
52+
let dir = tempfile::tempdir().unwrap();
53+
let sock = dir.path().join("aimdb.sock");
54+
55+
// --- server: cfg (writable target) + tele (streamed source) ------------
56+
let mut sb = AimDbBuilder::new().runtime(Arc::new(TokioAdapter));
57+
sb.configure::<Msg>("cfg", |reg| {
58+
reg.buffer(BufferCfg::SingleLatest).with_remote_access();
59+
});
60+
sb.configure::<Msg>("tele", |reg| {
61+
reg.buffer(BufferCfg::SingleLatest).with_remote_access();
62+
});
63+
let (server_db, server_runner) = sb.build().await.expect("build server db");
64+
let server_db = Arc::new(server_db);
65+
tokio::spawn(server_runner.run());
66+
67+
let mut policy = SecurityPolicy::read_write();
68+
policy.allow_write_key("cfg");
69+
let config = AimxConfig::uds_default()
70+
.socket_path(&sock)
71+
.security_policy(policy);
72+
let server = tokio::spawn(build_aimx_server(server_db.clone(), config).expect("bind server"));
73+
74+
// --- client: cfg links *to* the server, tele links *from* it -----------
75+
// The AimxClientConnector registers the `aimx://` scheme (so the links
76+
// validate) and, on build, dials the server + drives the mirroring pumps.
77+
let mut cb = AimDbBuilder::new()
78+
.runtime(Arc::new(TokioAdapter))
79+
.with_connector(AimxClientConnector::new(&sock).with_config(ClientConfig {
80+
reconnect: true,
81+
reconnect_delay: Duration::from_millis(50),
82+
sends_hello: false,
83+
}));
84+
cb.configure::<Msg>("cfg", |reg| {
85+
reg.buffer(BufferCfg::SingleLatest)
86+
.with_remote_access()
87+
.link_to("aimx://cfg")
88+
.with_serializer_raw(|m: &Msg| Ok(serde_json::to_vec(m).expect("serialize")))
89+
.finish();
90+
});
91+
cb.configure::<Msg>("tele", |reg| {
92+
reg.buffer(BufferCfg::SingleLatest)
93+
.with_remote_access()
94+
.link_from("aimx://tele")
95+
.with_deserializer_raw(|d: &[u8]| {
96+
serde_json::from_slice::<Msg>(d).map_err(|e| e.to_string())
97+
})
98+
.finish();
99+
});
100+
// build() collects the connector's engine + pump futures; the runner drives
101+
// them (spawn-free engine, driven on a task here).
102+
let (client_db, client_runner) = cb.build().await.expect("build client db");
103+
let client_db = Arc::new(client_db);
104+
tokio::spawn(client_runner.run());
105+
106+
// client → server: producing client `cfg` mirrors to server `cfg`.
107+
let want_cfg = json!({ "v": 7 });
108+
let mirrored_out = mirror_reaches(&server_db, "cfg", &want_cfg, || {
109+
client_db
110+
.set_record_from_json("cfg", json!({ "v": 7 }))
111+
.expect("set client cfg");
112+
})
113+
.await;
114+
assert!(
115+
mirrored_out,
116+
"client→server mirror did not reach the server"
117+
);
118+
119+
// server → client: updating server `tele` mirrors to client `tele`.
120+
let want_tele = json!({ "v": 9 });
121+
let mirrored_in = mirror_reaches(&client_db, "tele", &want_tele, || {
122+
server_db
123+
.set_record_from_json("tele", json!({ "v": 9 }))
124+
.expect("set server tele");
125+
})
126+
.await;
127+
assert!(mirrored_in, "server→client mirror did not reach the client");
128+
129+
server.abort();
130+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
//! AimX **client** connector (Phase 3 server port, std-only) — registers the
2+
//! `aimx://` scheme so records can `.link_to`/`.link_from` an AimX peer, and on
3+
//! build dials that peer and drives the mirroring pumps.
4+
//!
5+
//! This is the registerable wrapper around [`pump_client`](crate::session::pump_client):
6+
//! `build` opens the connection with [`run_client`](crate::session::run_client)
7+
//! and returns one spawn-free future per route (plus the engine future) for the
8+
//! runner to drive — collapsing the AimX *client* onto the shared session engine
9+
//! the same way [`build_aimx_server`](super::build_aimx_server) does the server.
10+
11+
use std::future::Future;
12+
use std::path::PathBuf;
13+
use std::pin::Pin;
14+
15+
use crate::builder::AimDb;
16+
use crate::connector::ConnectorBuilder;
17+
use crate::session::{pump_client, run_client, ClientConfig};
18+
use crate::{DbResult, RuntimeAdapter};
19+
20+
use super::{AimxCodec, UdsDialer};
21+
22+
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
23+
24+
/// A connector that mirrors records to/from an AimX peer over a Unix-domain
25+
/// socket. Register it with [`AimDbBuilder::with_connector`](crate::AimDbBuilder::with_connector)
26+
/// so `aimx://<record>` links validate; its `build` wires every collected route
27+
/// to the connection.
28+
pub struct AimxClientConnector {
29+
socket_path: PathBuf,
30+
config: ClientConfig,
31+
}
32+
33+
impl AimxClientConnector {
34+
/// Mirror records over the AimX peer listening at `socket_path`.
35+
pub fn new(socket_path: impl Into<PathBuf>) -> Self {
36+
Self {
37+
socket_path: socket_path.into(),
38+
config: ClientConfig::default(),
39+
}
40+
}
41+
42+
/// Override the client engine config (reconnect policy, etc.).
43+
pub fn with_config(mut self, config: ClientConfig) -> Self {
44+
self.config = config;
45+
self
46+
}
47+
}
48+
49+
impl<R> ConnectorBuilder<R> for AimxClientConnector
50+
where
51+
R: RuntimeAdapter + 'static,
52+
{
53+
fn build<'a>(
54+
&'a self,
55+
db: &'a AimDb<R>,
56+
) -> Pin<Box<dyn Future<Output = DbResult<Vec<BoxFuture>>> + Send + 'a>> {
57+
Box::pin(async move {
58+
let (handle, engine_fut) = run_client(
59+
UdsDialer::new(self.socket_path.clone()),
60+
AimxCodec,
61+
self.config.clone(),
62+
);
63+
// One pump future per route; they hold `ClientHandle` clones, so the
64+
// engine stays alive as long as any mirror runs. `handle` drops here.
65+
let mut futures = pump_client(db, "aimx", &handle);
66+
futures.push(engine_fut);
67+
Ok(futures)
68+
})
69+
}
70+
71+
fn scheme(&self) -> &str {
72+
"aimx"
73+
}
74+
}

aimdb-core/src/session/aimx/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
//! and the accepting transport ([`UdsListener`]) + server [`AimxDispatch`] that
77
//! [`build_aimx_server`] drives via `serve`.
88
9+
mod client_connector;
910
mod codec;
1011
mod dispatch;
1112
mod transport;
1213

14+
pub use client_connector::AimxClientConnector;
1315
pub use codec::AimxCodec;
1416
pub use dispatch::{build_aimx_server, AimxDispatch};
1517
pub use transport::{UdsConnection, UdsDialer, UdsListener};

aimdb-core/src/session/client.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
//! it never spawns.
1717
1818
use std::collections::HashMap;
19+
use std::sync::Arc;
1920
use std::time::Duration;
2021

2122
use tokio::sync::{mpsc, oneshot};
2223

2324
use super::{
2425
BoxFut, BoxStream, Connection, Dialer, EnvelopeCodec, Inbound, Outbound, Payload, RpcError,
2526
};
27+
use crate::connector::SerializerKind;
28+
use crate::router::RouterBuilder;
29+
use crate::{AimDb, RuntimeAdapter};
2630

2731
/// Client engine knobs.
2832
#[derive(Debug, Clone)]
@@ -239,6 +243,15 @@ where
239243
Ok(Outbound::Reply { id, result }) => {
240244
if let Some(tx) = pending.remove(&id) {
241245
let _ = tx.send(result);
246+
} else if result.is_err() {
247+
// Subscribe-ack contract: a successful subscribe is
248+
// acknowledged implicitly by its events flowing; the
249+
// server replies only on *failure* (unknown record,
250+
// sub cap). Such a Reply carries the subscribe `id`,
251+
// which was never registered as a pending call — so
252+
// drop the matching event sink to end the stream
253+
// (`None`) instead of leaving it hanging forever.
254+
subs.remove(&id.to_string());
242255
}
243256
}
244257
Ok(Outbound::Event { sub, seq: _, data }) => {
@@ -311,3 +324,87 @@ where
311324
}
312325
}
313326
}
327+
328+
/// Mirror records between a local [`AimDb`] and a remote peer over a running
329+
/// [`run_client`] engine — the connector-link half of the client capability.
330+
///
331+
/// For the given connector `scheme` (e.g. `"aimx"`):
332+
/// - **outbound** routes (`db.collect_outbound_routes`) stream local record
333+
/// updates to the remote via [`ClientHandle::write`];
334+
/// - **inbound** routes (`db.collect_inbound_routes`) subscribe to the remote and
335+
/// produce each update into the local record through the producer/arbiter path
336+
/// — single-writer-per-key stays intact (a mirrored-in record is produced
337+
/// through its inbound producer, never a direct co-writer).
338+
///
339+
/// Returns one spawn-free pump future per route for the runner to drive
340+
/// (mirroring the `ConnectorBuilder::build -> Vec<BoxFuture>` spine); it drives
341+
/// the **same** engine as [`run_client`], never a second one.
342+
pub fn pump_client<R>(
343+
db: &AimDb<R>,
344+
scheme: &str,
345+
handle: &ClientHandle,
346+
) -> Vec<BoxFut<'static, ()>>
347+
where
348+
R: RuntimeAdapter + 'static,
349+
{
350+
use futures_util::StreamExt;
351+
352+
// The type-erased runtime context for context-aware (de)serializers.
353+
let ctx = db.runtime_any();
354+
let mut pumps: Vec<BoxFut<'static, ()>> = Vec::new();
355+
356+
// --- outbound: local record updates -> remote `write` ------------------
357+
for (destination, consumer, serializer, _config, topic_provider) in
358+
db.collect_outbound_routes(scheme)
359+
{
360+
let handle = handle.clone();
361+
let ctx = ctx.clone();
362+
pumps.push(Box::pin(async move {
363+
let mut reader = match consumer.subscribe_any().await {
364+
Ok(r) => r,
365+
Err(_e) => return,
366+
};
367+
while let Ok(value) = reader.recv_any().await {
368+
// Dynamic destination (topic provider) or the static link target.
369+
let dest = topic_provider
370+
.as_ref()
371+
.and_then(|p| p.topic_any(&*value))
372+
.unwrap_or_else(|| destination.clone());
373+
let bytes = match &serializer {
374+
SerializerKind::Raw(ser) => match ser(&*value) {
375+
Ok(b) => b,
376+
Err(_e) => continue,
377+
},
378+
SerializerKind::Context(ser) => match ser(ctx.clone(), &*value) {
379+
Ok(b) => b,
380+
Err(_e) => continue,
381+
},
382+
};
383+
if handle.write(dest, Payload::from(bytes.as_slice())).is_err() {
384+
break; // engine stopped — all handles dropped
385+
}
386+
}
387+
}));
388+
}
389+
390+
// --- inbound: remote events -> local producer (via the Router) ---------
391+
// The Router applies each route's deserializer and produces the value; one
392+
// subscription per unique remote topic feeds it.
393+
let router = Arc::new(RouterBuilder::from_routes(db.collect_inbound_routes(scheme)).build());
394+
for id in router.resource_ids() {
395+
let handle = handle.clone();
396+
let router = router.clone();
397+
let ctx = ctx.clone();
398+
pumps.push(Box::pin(async move {
399+
let mut stream = match handle.subscribe(id.as_ref()) {
400+
Ok(s) => s,
401+
Err(_e) => return,
402+
};
403+
while let Some(payload) = stream.next().await {
404+
let _ = router.route(id.as_ref(), &payload, Some(&ctx)).await;
405+
}
406+
}));
407+
}
408+
409+
pumps
410+
}

aimdb-core/src/session/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ mod server;
4646
pub mod aimx;
4747

4848
#[cfg(feature = "std")]
49-
pub use client::{run_client, ClientConfig, ClientHandle};
49+
pub use client::{pump_client, run_client, ClientConfig, ClientHandle};
5050
#[cfg(feature = "std")]
5151
pub use server::{run_session, serve, SessionConfig};
5252

0 commit comments

Comments
 (0)