Skip to content

Commit 9e44017

Browse files
lxsaahCopilot
andcommitted
feat(remote-access-demo): enhance profiling integration and update README
Co-authored-by: Copilot <copilot@github.com>
1 parent c691c73 commit 9e44017

7 files changed

Lines changed: 178 additions & 97 deletions

File tree

.vscode/mcp.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,18 @@
33
"aimdb-weather": {
44
"type": "http",
55
"url": "http://aimdb.dev/mcp"
6+
},
7+
"aimdb-local": {
8+
"type": "stdio",
9+
"command": "cargo",
10+
"args": [
11+
"run",
12+
"--manifest-path",
13+
"/aimdb_ws/aimdb/tools/aimdb-mcp/Cargo.toml",
14+
"--",
15+
"--socket",
16+
"/tmp/aimdb-demo.sock"
17+
]
618
}
719
}
820
}

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3030
### Added
3131

3232
- **Automatic stage profiling (Issue #58, RFC 014)**: New `profiling` feature on `aimdb-core` automatically measures wall-clock time per `.source()` / `.tap()` / `.link()` stage and exposes `call_count` / `total` / `avg` / `min` / `max` counters per stage. Name stages with `RecordRegistrar::with_name("...")`. Works on `no_std + alloc` via the runtime clock (`aimdb_executor::TimeOps`) and `portable-atomic` for 64-bit atomics on `thumbv7em`. New MCP tools `get_stage_profiling` (with bottleneck detection + recommendation) and `reset_stage_profiling`. Feature is off by default and zero-cost when disabled. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md), [aimdb-client](aimdb-client/CHANGELOG.md), [tools/aimdb-mcp](tools/aimdb-mcp/CHANGELOG.md))
33+
- **`remote-access-demo` exercises stage profiling**: The `Temperature` and `SystemStatus` records now run from in-AimDB `.source()` + `.tap()` tasks (with `.with_name(...)` stage labels) and the demo enables the `profiling` feature. `SystemStatus.slow_status_processor` deliberately sleeps 100 ms per value so `get_stage_profiling` flags it as the bottleneck. README documents how to query and reset profiling via MCP / `socat`.
3334
- **`hello-mailbox` example (Issue #94)**: New minimal example demonstrating the Mailbox buffer (latest-wins) semantics using the synchronous API. First community contribution from [@ggmaldo](https://github.com/ggmaldo) — see [examples/hello-mailbox/](examples/hello-mailbox/).
3435
- **Writer-exclusivity validation (Issue #89)**: Combining `.source()`, `.transform()`, and `.link_from()` on the same record now panics at configuration time with a clear message instead of silently producing a last-writer-wins race on the buffer. Multiple `.link_from()` inbound connectors (fan-in) remain allowed. ([aimdb-core](aimdb-core/CHANGELOG.md))
3536
- **`no_std` Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets — no longer Tokio-only. Multi-input join fan-in moved out of `aimdb-core` into the new `JoinFanInRuntime` traits in `aimdb-executor`, with implementations in the Tokio (`mpsc::channel`, capacity 64), Embassy (`embassy_sync::Channel`, capacity 8), and WASM (`futures_channel::mpsc`, capacity 64) adapters. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md))

Cargo.lock

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

_external/embassy

Submodule embassy updated 136 files

examples/remote-access-demo/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@ name = "client"
1313
path = "src/client.rs"
1414

1515
[dependencies]
16-
aimdb-core = { path = "../../aimdb-core", features = ["std", "tracing"] }
16+
aimdb-core = { path = "../../aimdb-core", features = ["std", "tracing", "profiling"] }
1717
aimdb-tokio-adapter = { path = "../../aimdb-tokio-adapter", features = [
1818
"tokio-runtime",
1919
"tracing",
20+
"profiling",
2021
] }
2122
tokio = { version = "1.48", features = ["full"] }
2223
tracing = "0.1"

examples/remote-access-demo/README.md

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ This example demonstrates the AimX v1 remote access protocol with `record.list`
55
## What It Does
66

77
**Server** (`server.rs`):
8-
- Creates an AimDB instance with 4 record types (Temperature, SystemStatus, UserEvent, Config)
8+
- Creates an AimDB instance with 5 record types (Temperature, SystemStatus, UserEvent, Config, AppSettings)
99
- Enables remote access on Unix domain socket `/tmp/aimdb-demo.sock`
10-
- Uses ReadOnly security policy
11-
- Populates some initial data
10+
- Uses ReadWrite security policy (`server::AppSettings` is the only writable key)
11+
- Drives `Temperature` and `SystemStatus` from in-AimDB `.source()` tasks with named `.tap()` consumers, so the `profiling` feature can time every stage automatically (see [Stage Profiling](#stage-profiling) below)
1212

1313
**Client** (`client.rs`):
1414
- Connects to the server via Unix domain socket
@@ -87,6 +87,35 @@ echo '{"version":"1.0","client":"test"}' | socat - UNIX-CONNECT:/tmp/aimdb-demo.
8787
- Write permissions
8888
- Timestamps
8989

90+
## Stage Profiling
91+
92+
The server is built with the `profiling` feature (see [Cargo.toml](Cargo.toml)) and
93+
registers both `Temperature` and `SystemStatus` via `.source()` + `.tap()` so AimDB
94+
owns the producer/consumer tasks and can time them automatically. Each stage is
95+
named via `.with_name("...")`:
96+
97+
| Record | Source stage | Tap stage |
98+
|----------------|----------------------|----------------------------------------------|
99+
| Temperature | `temp_simulator` | `temp_logger` (fast) |
100+
| SystemStatus | `status_simulator` | `slow_status_processor` (sleeps 100 ms each) |
101+
102+
Once the server has been running for a few seconds, query stage profiling via the
103+
`aimdb-mcp` server using the `get_stage_profiling` tool with `record_key="SystemStatus"`.
104+
The result is a list of `{call_count, avg_time_ns, min_time_ns, max_time_ns, name, ...}`
105+
entries plus a `bottleneck` field — for SystemStatus the bottleneck will point at
106+
`slow_status_processor` with an average around 100 ms. Reset the counters between
107+
windows with the `reset_stage_profiling` tool.
108+
109+
You can also call the raw RPC method directly:
110+
111+
```bash
112+
# reset the counters (requires write permission, ReadWrite policy already enabled)
113+
echo '{"id":1,"method":"profiling.reset"}' | socat - UNIX-CONNECT:/tmp/aimdb-demo.sock
114+
```
115+
116+
The per-stage snapshot is also embedded in each record's metadata via the
117+
`stage_profiling` field of `record.list`.
118+
90119
## Next Steps
91120

92121
Future enhancements will add:

examples/remote-access-demo/src/server.rs

Lines changed: 127 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
//! ```
1414
1515
use aimdb_core::remote::{AimxConfig, SecurityPolicy};
16-
use aimdb_core::{buffer::BufferCfg, AimDbBuilder};
16+
use aimdb_core::{buffer::BufferCfg, AimDbBuilder, Consumer, Producer, RuntimeContext};
1717
use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt};
1818
use serde::{Deserialize, Serialize};
1919
use std::sync::Arc;
@@ -96,15 +96,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9696
.with_remote_access(remote_config);
9797

9898
// Configure records
99-
// Use SpmcRing for Temperature and SystemStatus to support record.drain history
99+
// Use SpmcRing for Temperature and SystemStatus to support record.drain history.
100+
//
101+
// Temperature and SystemStatus use `.source()` + `.tap()` so AimDB owns the
102+
// producer/consumer task lifecycle — this also makes them eligible for
103+
// automatic stage profiling (feature `profiling`, query via the MCP
104+
// `get_stage_profiling` tool). `.with_name("...")` gives each stage a
105+
// human-readable label that shows up in the profiling output.
100106
builder.configure::<Temperature>("server::Temperature", |reg| {
101107
reg.buffer(BufferCfg::SpmcRing { capacity: 100 })
102-
.with_remote_access();
108+
.with_remote_access()
109+
.source(temperature_simulator)
110+
.with_name("temp_simulator")
111+
.tap(temperature_logger)
112+
.with_name("temp_logger");
103113
});
104114

105115
builder.configure::<SystemStatus>("server::SystemStatus", |reg| {
106116
reg.buffer(BufferCfg::SpmcRing { capacity: 50 })
107-
.with_remote_access();
117+
.with_remote_access()
118+
.source(system_status_simulator)
119+
.with_name("status_simulator")
120+
// Deliberately slow consumer — surfaces as the bottleneck in
121+
// `get_stage_profiling` for the SystemStatus record.
122+
.tap(slow_status_processor)
123+
.with_name("slow_status_processor");
108124
});
109125

110126
builder.configure::<UserEvent>("server::UserEvent", |reg| {
@@ -131,31 +147,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
131147

132148
info!("📝 Populating initial record data...");
133149

134-
// Produce some initial data
135-
let temp_producer = db.producer::<Temperature>("server::Temperature");
136-
let initial_duration = std::time::SystemTime::now()
137-
.duration_since(std::time::UNIX_EPOCH)
138-
.unwrap();
139-
let initial_timestamp = initial_duration.as_secs() as f64
140-
+ initial_duration.subsec_nanos() as f64 / 1_000_000_000.0;
141-
142-
temp_producer
143-
.produce(Temperature {
144-
sensor_id: "sensor-01".to_string(),
145-
celsius: 22.5,
146-
timestamp: initial_timestamp,
147-
})
148-
.await?;
149-
150-
let status_producer = db.producer::<SystemStatus>("server::SystemStatus");
151-
status_producer
152-
.produce(SystemStatus {
153-
uptime_seconds: 3600,
154-
cpu_usage: 15.3,
155-
memory_usage: 42.7,
156-
})
157-
.await?;
158-
159150
let config_producer = db.producer::<Config>("server::Config");
160151
config_producer
161152
.produce(Config {
@@ -177,68 +168,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
177168
)?;
178169

179170
info!("✅ Initial data populated");
180-
181-
// Spawn background task to continuously update Temperature
182-
info!("🌡️ Starting live temperature simulator...");
183-
let temp_producer_clone = temp_producer.clone();
184-
tokio::spawn(async move {
185-
let mut counter = 0u64;
186-
loop {
187-
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
188-
189-
counter += 1;
190-
let temp = 20.0 + (counter as f64 * 0.5) + (counter as f64 % 10.0);
191-
192-
let duration = std::time::SystemTime::now()
193-
.duration_since(std::time::UNIX_EPOCH)
194-
.unwrap();
195-
let timestamp =
196-
duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
197-
198-
let reading = Temperature {
199-
sensor_id: format!("sensor-{:02}", (counter % 3) + 1),
200-
celsius: temp,
201-
timestamp,
202-
};
203-
204-
if let Err(e) = temp_producer_clone.produce(reading.clone()).await {
205-
tracing::error!("Failed to produce temperature: {}", e);
206-
} else {
207-
tracing::debug!(
208-
"📊 Produced temperature: {} °C from {}",
209-
reading.celsius,
210-
reading.sensor_id
211-
);
212-
}
213-
}
214-
});
215-
216-
// Spawn background task to update SystemStatus
217-
info!("💻 Starting system status simulator...");
218-
let status_producer_clone = status_producer.clone();
219-
tokio::spawn(async move {
220-
let mut uptime = 3600u64;
221-
loop {
222-
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
223-
224-
uptime += 5;
225-
let status = SystemStatus {
226-
uptime_seconds: uptime,
227-
cpu_usage: 10.0 + (uptime as f64 % 30.0),
228-
memory_usage: 40.0 + ((uptime as f64 / 10.0) % 20.0),
229-
};
230-
231-
if let Err(e) = status_producer_clone.produce(status.clone()).await {
232-
tracing::error!("Failed to produce system status: {}", e);
233-
} else {
234-
tracing::debug!(
235-
"📊 Produced system status: CPU {:.1}%, MEM {:.1}%",
236-
status.cpu_usage,
237-
status.memory_usage
238-
);
239-
}
240-
}
241-
});
171+
info!("🌡️ Temperature simulator running via .source() (every 2s)");
172+
info!("💻 SystemStatus simulator running via .source() (every 5s)");
242173

243174
info!("");
244175
info!("🎯 Server ready! Connect with:");
@@ -254,6 +185,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
254185
info!(" Temperature: every 2 seconds");
255186
info!(" SystemStatus: every 5 seconds");
256187
info!("");
188+
info!("📈 Stage profiling is enabled. Query it via the aimdb-mcp tools:");
189+
info!(" get_stage_profiling record_key=\"Temperature\" → per-stage timing");
190+
info!(" reset_stage_profiling → clear counters");
191+
info!("");
257192
info!("Press Ctrl+C to stop the server");
258193

259194
// Keep server running
@@ -263,3 +198,104 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
263198

264199
Ok(())
265200
}
201+
202+
// ============================================================================
203+
// SOURCES & TAPS — owned by AimDB so the stage-profiling feature can time them
204+
// ============================================================================
205+
206+
/// Periodically produces Temperature readings.
207+
///
208+
/// Because this runs inside a `.source()` callback, every `produce()` call is
209+
/// timed by the `profiling` feature — see `get_stage_profiling`.
210+
async fn temperature_simulator(
211+
ctx: RuntimeContext<TokioAdapter>,
212+
temperature: Producer<Temperature, TokioAdapter>,
213+
) {
214+
let time = ctx.time();
215+
let mut counter = 0u64;
216+
loop {
217+
let celsius = 20.0 + (counter as f64 * 0.5) + (counter as f64 % 10.0);
218+
let duration = std::time::SystemTime::now()
219+
.duration_since(std::time::UNIX_EPOCH)
220+
.unwrap();
221+
let timestamp =
222+
duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
223+
224+
let reading = Temperature {
225+
sensor_id: format!("sensor-{:02}", (counter % 3) + 1),
226+
celsius,
227+
timestamp,
228+
};
229+
230+
if let Err(e) = temperature.produce(reading).await {
231+
tracing::error!("Failed to produce temperature: {}", e);
232+
}
233+
234+
counter += 1;
235+
time.sleep(time.secs(2)).await;
236+
}
237+
}
238+
239+
/// Fast tap on Temperature — just logs. Stage profiling will show it as
240+
/// substantially faster than `slow_status_processor` on SystemStatus.
241+
async fn temperature_logger(
242+
_ctx: RuntimeContext<TokioAdapter>,
243+
consumer: Consumer<Temperature, TokioAdapter>,
244+
) {
245+
let Ok(mut reader) = consumer.subscribe() else {
246+
tracing::error!("Failed to subscribe to Temperature");
247+
return;
248+
};
249+
while let Ok(reading) = reader.recv().await {
250+
tracing::debug!(
251+
"🌡️ Logged temperature: {:.1} °C from {}",
252+
reading.celsius,
253+
reading.sensor_id
254+
);
255+
}
256+
}
257+
258+
/// Periodically produces SystemStatus readings.
259+
async fn system_status_simulator(
260+
ctx: RuntimeContext<TokioAdapter>,
261+
status: Producer<SystemStatus, TokioAdapter>,
262+
) {
263+
let time = ctx.time();
264+
let mut uptime = 3600u64;
265+
loop {
266+
let snapshot = SystemStatus {
267+
uptime_seconds: uptime,
268+
cpu_usage: 10.0 + (uptime as f64 % 30.0),
269+
memory_usage: 40.0 + ((uptime as f64 / 10.0) % 20.0),
270+
};
271+
272+
if let Err(e) = status.produce(snapshot).await {
273+
tracing::error!("Failed to produce system status: {}", e);
274+
}
275+
276+
uptime += 5;
277+
time.sleep(time.secs(5)).await;
278+
}
279+
}
280+
281+
/// Intentionally slow tap on SystemStatus — sleeps 100ms per value to simulate
282+
/// expensive per-value processing. With profiling enabled, this stage shows up
283+
/// as the per-record bottleneck in `get_stage_profiling`.
284+
async fn slow_status_processor(
285+
ctx: RuntimeContext<TokioAdapter>,
286+
consumer: Consumer<SystemStatus, TokioAdapter>,
287+
) {
288+
let time = ctx.time();
289+
let Ok(mut reader) = consumer.subscribe() else {
290+
tracing::error!("Failed to subscribe to SystemStatus");
291+
return;
292+
};
293+
while let Ok(snapshot) = reader.recv().await {
294+
time.sleep(time.millis(100)).await;
295+
tracing::debug!(
296+
"💻 Processed status: CPU {:.1}%, MEM {:.1}%",
297+
snapshot.cpu_usage,
298+
snapshot.memory_usage
299+
);
300+
}
301+
}

0 commit comments

Comments
 (0)