Skip to content

Commit cf969b5

Browse files
lxsaahclaude
andcommitted
feat(client): grow AimxConnection to the full tool surface
Adds the typed wrappers tools/aimdb-cli + tools/aimdb-mcp need before they can migrate off the legacy AimxClient: drain_record(+_with_limit) -> DrainResponse, query, graph_nodes/edges/topo_order, reset_stage_profiling, reset_buffer_metrics (all over the cheap-clone ClientHandle). DrainResponse now lives in engine.rs so it survives connection.rs's removal in the next stage. Additive only — no caller switches yet, so main stays green. Covered by the aimx_session production-server test (graph + drain). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 40ba95c commit cf969b5

1 file changed

Lines changed: 75 additions & 1 deletion

File tree

aimdb-client/src/engine.rs

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use std::path::Path;
1717

1818
use futures::StreamExt;
19-
use serde::Serialize;
19+
use serde::{Deserialize, Serialize};
2020
use serde_json::json;
2121
use tokio::task::JoinHandle;
2222

@@ -26,6 +26,18 @@ use aimdb_core::session::{run_client, BoxStream, ClientConfig, ClientHandle, Pay
2626
use crate::error::{ClientError, ClientResult};
2727
use crate::protocol::{RecordMetadata, WelcomeMessage};
2828

29+
/// Response from a `record.drain` call: the values accumulated since the
30+
/// previous drain for this connection's per-record cursor.
31+
#[derive(Debug, Clone, Serialize, Deserialize)]
32+
pub struct DrainResponse {
33+
/// Echo of the queried record name.
34+
pub record_name: String,
35+
/// Chronologically ordered values (raw JSON, as written by the producer).
36+
pub values: Vec<serde_json::Value>,
37+
/// Number of values returned.
38+
pub count: usize,
39+
}
40+
2941
/// A live connection to an AimDB instance over the shared session engine.
3042
///
3143
/// Holds the cheap-clone [`ClientHandle`] (use [`handle`](Self::handle) to issue
@@ -132,6 +144,68 @@ impl AimxConnection {
132144
.map_err(rpc_err)
133145
}
134146

147+
/// Drain all values accumulated since the previous drain of `name` (a
148+
/// destructive read against this connection's per-record cursor).
149+
pub async fn drain_record(&self, name: &str) -> ClientResult<DrainResponse> {
150+
let reply = self
151+
.call("record.drain", to_payload(&json!({ "name": name }))?)
152+
.await?;
153+
Ok(serde_json::from_slice(&reply)?)
154+
}
155+
156+
/// Drain at most `limit` values from `name`.
157+
pub async fn drain_record_with_limit(
158+
&self,
159+
name: &str,
160+
limit: u32,
161+
) -> ClientResult<DrainResponse> {
162+
let reply = self
163+
.call(
164+
"record.drain",
165+
to_payload(&json!({ "name": name, "limit": limit }))?,
166+
)
167+
.await?;
168+
Ok(serde_json::from_slice(&reply)?)
169+
}
170+
171+
/// Run a persistence query (requires the server's `with_persistence()`).
172+
pub async fn query(&self, params: serde_json::Value) -> ClientResult<serde_json::Value> {
173+
let reply = self.call("record.query", to_payload(&params)?).await?;
174+
Ok(serde_json::from_slice(&reply)?)
175+
}
176+
177+
/// All nodes in the dependency graph.
178+
pub async fn graph_nodes(&self) -> ClientResult<Vec<serde_json::Value>> {
179+
let reply = self.call("graph.nodes", null_payload()).await?;
180+
Ok(serde_json::from_slice(&reply)?)
181+
}
182+
183+
/// All edges in the dependency graph.
184+
pub async fn graph_edges(&self) -> ClientResult<Vec<serde_json::Value>> {
185+
let reply = self.call("graph.edges", null_payload()).await?;
186+
Ok(serde_json::from_slice(&reply)?)
187+
}
188+
189+
/// Record keys in topological order.
190+
pub async fn graph_topo_order(&self) -> ClientResult<Vec<String>> {
191+
let reply = self.call("graph.topo_order", null_payload()).await?;
192+
Ok(serde_json::from_slice(&reply)?)
193+
}
194+
195+
/// Reset stage-profiling counters (server built with `profiling`; needs write
196+
/// permission).
197+
pub async fn reset_stage_profiling(&self) -> ClientResult<serde_json::Value> {
198+
let reply = self.call("profiling.reset", null_payload()).await?;
199+
Ok(serde_json::from_slice(&reply)?)
200+
}
201+
202+
/// Reset buffer-metrics counters (server built with `metrics`; needs write
203+
/// permission).
204+
pub async fn reset_buffer_metrics(&self) -> ClientResult<serde_json::Value> {
205+
let reply = self.call("buffer_metrics.reset", null_payload()).await?;
206+
Ok(serde_json::from_slice(&reply)?)
207+
}
208+
135209
/// Issue a raw RPC and map a transport/engine failure to [`ClientError`].
136210
async fn call(&self, method: &str, params: Payload) -> ClientResult<Payload> {
137211
self.handle.call(method, params).await.map_err(rpc_err)

0 commit comments

Comments
 (0)