Skip to content

Commit b9bb2e6

Browse files
lxsaahCopilot
andauthored
71 record history api (#72)
* add design document * feat: add record draining functionality to AimDB client and server - Implemented `drain_record` and `drain_record_with_limit` methods in AimxClient for draining values from records. - Added `DrainResponse` struct to encapsulate the response from drain calls. - Updated connection handler to manage drain readers for records. - Enhanced buffer traits with non-blocking `try_recv` method for various buffer types. - Modified existing record configuration methods to use `.with_remote_access()` instead of `.with_serialization()`. - Introduced `drain_record` tool in MCP server for remote access to drained values. - Updated documentation and examples to reflect new draining capabilities. * feat: add drain integration tests for AimX protocol and update dependencies * style: format assertions for clarity in buffer and drain integration tests * chore: update subproject commit for embassy dependency * fix: update stm32-metapac source URL and adjust button handler type for async mode * Refactor MCP server and remove subscription management - Removed the SubscriptionManager and related subscription handling from the MCP server. - Updated the MCP server initialization to eliminate notification directory and subscription manager dependencies. - Cleaned up the Notification struct by removing unused notification creation methods. - Removed subscription-related tools and tests, including subscribe_record, unsubscribe_record, and list_subscriptions. - Adjusted server capabilities to disable resource subscriptions. - Updated relevant documentation and comments to reflect the removal of subscription features. * feat: implement record drain functionality and enhance connection management * implement transform api * chore: update documentation for database builder and transform modules * feat(graph): introduce dependency graph module and introspection tools - Added a new `graph` module to the `aimdb-core` crate, implementing the `DependencyGraph` structure and related types for managing and introspecting the database topology. - Implemented methods for building and validating the dependency graph, including cycle detection and topological sorting. - Created new tools in the `aimdb-mcp` crate for graph introspection: `graph_nodes`, `graph_edges`, and `graph_topo_order`, allowing users to retrieve metadata about records and their relationships. - Extended the AimX protocol with new methods for graph introspection, including `graph.describe`, `graph.lineage`, and `graph.dot`, providing remote clients with visibility into data flow and dependencies. - Updated documentation to reflect the new graph features and their usage in the AimDB ecosystem. * feat(graph): add commands for dependency graph introspection and output formatting * feat: add record drain and graph introspection APIs, enhance changelog documentation * chore: update subproject commit for embassy * chore: update stm32-metapac source URL to latest tag * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * feat: enhance error handling in buffer reader and improve limit parameter conversion --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 5f835e0 commit b9bb2e6

53 files changed

Lines changed: 8006 additions & 2155 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.vscode/mcp.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"servers": {
33
"aimdb": {
44
"type": "stdio",
5-
"command": "/aimdb/target/debug/aimdb-mcp",
5+
"command": "/aimdb_ws/aimdb/target/release/aimdb-mcp",
66
"args": [],
77
"env": {
88
"RUST_LOG": "info"

Cargo.lock

Lines changed: 4 additions & 51 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

_external/embassy

Submodule embassy updated 152 files

aimdb-client/CHANGELOG.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10-
No changes yet.
10+
### Added
11+
12+
- **Record Drain API**: New methods for batch history access
13+
- `drain_record(name)`: Drain all pending values since last drain call
14+
- `drain_record_with_limit(name, limit)`: Drain with maximum count limit
15+
- `DrainResponse` struct with `record_name`, `values` (JSON array), and `count`
16+
- Cold-start semantics: first drain creates reader and returns empty
17+
- **Graph Introspection API**: New methods for dependency graph exploration
18+
- `graph_nodes()`: Get all nodes with origin, buffer type, tap count, outbound status
19+
- `graph_edges()`: Get all directed edges showing data flow
20+
- `graph_topo_order()`: Get record keys in topological (spawn) order
21+
22+
### Changed
23+
24+
- **Re-export**: `DrainResponse` now re-exported from crate root for convenience
1125

1226
## [0.4.0] - 2025-12-25
1327

aimdb-client/src/connection.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::protocol::{
77
cli_hello, parse_message, serialize_message, Event, EventMessage, RecordMetadata, Request,
88
RequestExt, Response, ResponseExt, WelcomeMessage,
99
};
10+
use serde::{Deserialize, Serialize};
1011
use serde_json::json;
1112
use std::path::{Path, PathBuf};
1213
use std::time::Duration;
@@ -174,6 +175,71 @@ impl AimxClient {
174175
Ok(event_msg.event)
175176
}
176177

178+
/// Drain all pending values from a record's drain reader.
179+
///
180+
/// Returns all values accumulated since the last drain call,
181+
/// in chronological order. This is a destructive read — drained
182+
/// values will not be returned again.
183+
///
184+
/// The first call for a given record creates the drain reader and
185+
/// returns empty (cold start). Subsequent calls return accumulated values.
186+
pub async fn drain_record(&mut self, name: &str) -> ClientResult<DrainResponse> {
187+
let params = json!({ "name": name });
188+
let result = self.send_request("record.drain", Some(params)).await?;
189+
let response: DrainResponse = serde_json::from_value(result)?;
190+
Ok(response)
191+
}
192+
193+
/// Drain with a limit on the number of values returned.
194+
pub async fn drain_record_with_limit(
195+
&mut self,
196+
name: &str,
197+
limit: u32,
198+
) -> ClientResult<DrainResponse> {
199+
let params = json!({
200+
"name": name,
201+
"limit": limit,
202+
});
203+
let result = self.send_request("record.drain", Some(params)).await?;
204+
let response: DrainResponse = serde_json::from_value(result)?;
205+
Ok(response)
206+
}
207+
208+
// ========================================================================
209+
// Graph Introspection Methods
210+
// ========================================================================
211+
212+
/// Get all nodes in the dependency graph.
213+
///
214+
/// Returns a list of GraphNode objects representing all records
215+
/// and their connections in the database.
216+
pub async fn graph_nodes(&mut self) -> ClientResult<Vec<serde_json::Value>> {
217+
let result = self.send_request("graph.nodes", None).await?;
218+
let nodes: Vec<serde_json::Value> = serde_json::from_value(result)?;
219+
Ok(nodes)
220+
}
221+
222+
/// Get all edges in the dependency graph.
223+
///
224+
/// Returns a list of GraphEdge objects representing data flow
225+
/// connections between records.
226+
pub async fn graph_edges(&mut self) -> ClientResult<Vec<serde_json::Value>> {
227+
let result = self.send_request("graph.edges", None).await?;
228+
let edges: Vec<serde_json::Value> = serde_json::from_value(result)?;
229+
Ok(edges)
230+
}
231+
232+
/// Get the topological ordering of records.
233+
///
234+
/// Returns the record keys in topological order, ensuring all
235+
/// dependencies are listed before dependents. Useful for understanding
236+
/// data flow and initialization order.
237+
pub async fn graph_topo_order(&mut self) -> ClientResult<Vec<String>> {
238+
let result = self.send_request("graph.topo_order", None).await?;
239+
let order: Vec<String> = serde_json::from_value(result)?;
240+
Ok(order)
241+
}
242+
177243
/// Write a message to the stream
178244
async fn write_message<T: serde::Serialize>(&mut self, msg: &T) -> ClientResult<()> {
179245
let data = serialize_message(msg)?;
@@ -197,3 +263,14 @@ impl AimxClient {
197263
parse_message(&line).map_err(|e| e.into())
198264
}
199265
}
266+
267+
/// Response from a record.drain call
268+
#[derive(Debug, Clone, Serialize, Deserialize)]
269+
pub struct DrainResponse {
270+
/// Echo of the queried record name
271+
pub record_name: String,
272+
/// Chronologically ordered values (raw JSON, as written by the producer)
273+
pub values: Vec<serde_json::Value>,
274+
/// Number of values returned
275+
pub count: usize,
276+
}

aimdb-client/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub mod error;
3939
pub mod protocol;
4040

4141
// Re-export main types for convenience
42-
pub use connection::AimxClient;
42+
pub use connection::{AimxClient, DrainResponse};
4343
pub use discovery::{discover_instances, find_instance, InstanceInfo};
4444
pub use error::{ClientError, ClientResult};
4545
pub use protocol::{

aimdb-core/CHANGELOG.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,39 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- **Transform API (Design 020)**: Reactive data transformations between records
13+
- **Single-Input Transforms**: `transform_raw()` method on `RecordRegistrar` for creating reactive derivations
14+
- **Multi-Input Joins**: `transform_join_raw()` for combining multiple input records with stateful handlers
15+
- **`TransformBuilder`**: Fluent API with `.with_state()` and `.map()` for transform configuration
16+
- **`JoinBuilder`**: Multi-input builder with `.input::<T>(key)` and `.with_state().on_trigger()` pattern
17+
- **`TransformDescriptor`**: Type-erased descriptor for storing transform configuration
18+
- **`JoinTrigger`**: Event type for multi-input join handlers with index and type-erased value
19+
- Transforms are spawned as tasks during `AimDb::build()` and subscribe to input buffers
20+
- Mutual exclusion enforced: a record cannot have both `.source()` and `.transform()`
21+
- Full tracing integration for transform lifecycle events
22+
- **Graph Introspection API (Design 021)**: Dependency graph visualization and introspection
23+
- **`RecordOrigin` enum**: Classifies record data sources (`Source`, `Link`, `Transform`, `TransformJoin`, `Passive`)
24+
- **`GraphNode` struct**: Node metadata including origin, buffer config, tap count, outbound status
25+
- **`GraphEdge` struct**: Directed edge with `from`, `to`, and edge type classification
26+
- **`DependencyGraph` struct**: Full graph with nodes, edges, and topological ordering
27+
- New `AnyRecord` trait methods: `has_transform()`, `record_origin()`, `buffer_info()`, `transform_input_keys()`
28+
- `RecordId::new()` now accepts `RecordOrigin` parameter for accurate metadata
29+
- Graph methods in `AimDbInner`: `build_dependency_graph()`, `graph_nodes()`, `graph_edges()`, `graph_topo_order()`
30+
- **Record Drain API (Design 019)**: Non-blocking batch history access
31+
- **`try_recv()` on BufferReader**: Non-blocking receive returning `Ok(T)`, `Err(BufferEmpty)`, or `Err(BufferLagged)`
32+
- **`try_recv_json()` on JsonBufferReader**: JSON-serialized non-blocking receive for remote access
33+
- **`record.drain` AimX protocol method**: Drain accumulated values since last call with optional limit
34+
- Cold-start semantics: first drain creates reader and returns empty
35+
- Supports `SpmcRing` (full history), `SingleLatest` (at most 1), `Mailbox` (at most 1)
36+
- Handler maintains per-connection drain readers via `ConnectionState`
37+
- **Extension Macro**: `impl_record_registrar_ext!` macro in `ext_macros.rs` for generating runtime adapter extension traits
38+
39+
### Changed
40+
41+
- **Renamed `.with_serialization()` to `.with_remote_access()`**: Clearer naming for JSON serialization configuration
42+
- **`RecordId` constructor**: Now requires `RecordOrigin` parameter for dependency graph support
43+
- **`set_from_json` protection**: Now also rejects writes on records with active transforms (in addition to sources)
44+
1245
- **Dynamic Topic/Destination Routing (Design 018)**: Complete support for dynamic topic resolution
1346
- **Outbound (`TopicProvider` trait)**: Dynamically determine MQTT topics or KNX group addresses based on data being published
1447
- New `TopicProvider<T>` trait for type-safe topic determination

aimdb-core/src/buffer/traits.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,16 @@ pub trait BufferReader<T: Clone + Send>: Send {
103103
/// - **SingleLatest**: Waits for value change, returns most recent
104104
/// - **Mailbox**: Waits for slot value, takes and clears it
105105
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>>;
106+
107+
/// Non-blocking receive — returns immediately.
108+
///
109+
/// Returns `Err(DbError::BufferEmpty)` if no pending values.
110+
///
111+
/// # Behavior by Buffer Type
112+
/// - **SPMC Ring**: Returns next buffered value, or `BufferEmpty` if caught up
113+
/// - **SingleLatest**: Returns value if changed since last read, or `BufferEmpty`
114+
/// - **Mailbox**: Takes and returns slot value, or `BufferEmpty` if empty
115+
fn try_recv(&mut self) -> Result<T, DbError>;
106116
}
107117

108118
/// Reader trait for consuming JSON-serialized values from a buffer (std only)
@@ -114,7 +124,7 @@ pub trait BufferReader<T: Clone + Send>: Send {
114124
/// at compile time, by serializing values to JSON on each `recv_json()` call.
115125
///
116126
/// # Requirements
117-
/// - Record must be configured with `.with_serialization()`
127+
/// - Record must be configured with `.with_remote_access()`
118128
/// - Only available with `std` feature (requires serde_json)
119129
///
120130
/// # Example
@@ -139,6 +149,11 @@ pub trait JsonBufferReader: Send {
139149
fn recv_json(
140150
&mut self,
141151
) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, DbError>> + Send + '_>>;
152+
153+
/// Non-blocking receive as JSON — returns immediately.
154+
///
155+
/// Returns `Err(DbError::BufferEmpty)` if no pending values.
156+
fn try_recv_json(&mut self) -> Result<serde_json::Value, DbError>;
142157
}
143158

144159
/// Snapshot of buffer metrics at a point in time
@@ -269,6 +284,10 @@ mod tests {
269284
})
270285
})
271286
}
287+
288+
fn try_recv(&mut self) -> Result<T, DbError> {
289+
Err(DbError::BufferEmpty)
290+
}
272291
}
273292

274293
#[test]

0 commit comments

Comments
 (0)