Skip to content

Commit 686d72b

Browse files
lxsaahCopilot
andauthored
Feat/stage profiling phase1 3 (#108)
* feat(profiling): add automatic stage profiling for source, tap, and link stages - Introduced `profiling` feature to enable automatic timing of execution stages. - Added `duration_as_nanos` method to `TimeOps` trait for runtime-agnostic duration representation. - Implemented `RecordProfilingMetrics` to track metrics for each stage, including call counts and timing statistics. - Enhanced `Producer` and `Consumer` structs to support profiling state. - Added `with_name` method to `RecordRegistrar` for naming stages in profiling output. - Created tests to validate profiling functionality and ensure metrics are recorded correctly. * feat(profiling): implement automatic stage profiling and reset functionality * feat(profiling): update profiling feature dependencies to include portable-atomic/fallback * feat(profiling): optimize avg_time_ns calculation using checked_div Co-authored-by: Copilot <copilot@github.com> * update changelogs * feat(remote-access-demo): enhance profiling integration and update README Co-authored-by: Copilot <copilot@github.com> * fix(profiling): improve error handling in reset_stage_profiling and update snapshot documentation Co-authored-by: Copilot <copilot@github.com> --------- Co-authored-by: Copilot <copilot@github.com>
1 parent 2d9d08e commit 686d72b

41 files changed

Lines changed: 1472 additions & 111 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.vscode/mcp.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,18 @@
33
"aimdb-weather": {
44
"type": "http",
55
"url": "http://aimdb.dev/mcp"
6+
},
7+
"aimdb-local": {
8+
"type": "stdio",
9+
"command": "cargo",
10+
"args": [
11+
"run",
12+
"--manifest-path",
13+
"${workspaceFolder}/tools/aimdb-mcp/Cargo.toml",
14+
"--",
15+
"--socket",
16+
"/tmp/aimdb-demo.sock"
17+
]
618
}
719
}
820
}

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2929

3030
### Added
3131

32+
- **Automatic stage profiling (Issue #58, RFC 014)**: New `profiling` feature on `aimdb-core` automatically measures wall-clock time per `.source()` / `.tap()` / `.link()` stage and exposes `call_count` / `total` / `avg` / `min` / `max` counters per stage. Name stages with `RecordRegistrar::with_name("...")`. Works on `no_std + alloc` via the runtime clock (`aimdb_executor::TimeOps`) and `portable-atomic` for 64-bit atomics on `thumbv7em`. New MCP tools `get_stage_profiling` (with bottleneck detection + recommendation) and `reset_stage_profiling`. Feature is off by default and zero-cost when disabled. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md), [aimdb-client](aimdb-client/CHANGELOG.md), [tools/aimdb-mcp](tools/aimdb-mcp/CHANGELOG.md))
33+
- **`remote-access-demo` exercises stage profiling**: The `Temperature` and `SystemStatus` records now run from in-AimDB `.source()` + `.tap()` tasks (with `.with_name(...)` stage labels) and the demo enables the `profiling` feature. `SystemStatus.slow_status_processor` deliberately sleeps 100 ms per value so `get_stage_profiling` flags it as the bottleneck. README documents how to query and reset profiling via MCP / `socat`.
3234
- **`hello-mailbox` example (Issue #94)**: New minimal example demonstrating the Mailbox buffer (latest-wins) semantics using the synchronous API. First community contribution from [@ggmaldo](https://github.com/ggmaldo) — see [examples/hello-mailbox/](examples/hello-mailbox/).
3335
- **Writer-exclusivity validation (Issue #89)**: Combining `.source()`, `.transform()`, and `.link_from()` on the same record now panics at configuration time with a clear message instead of silently producing a last-writer-wins race on the buffer. Multiple `.link_from()` inbound connectors (fan-in) remain allowed. ([aimdb-core](aimdb-core/CHANGELOG.md))
3436
- **`no_std` Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets — no longer Tokio-only. Multi-input join fan-in moved out of `aimdb-core` into the new `JoinFanInRuntime` traits in `aimdb-executor`, with implementations in the Tokio (`mpsc::channel`, capacity 64), Embassy (`embassy_sync::Channel`, capacity 8), and WASM (`futures_channel::mpsc`, capacity 64) adapters. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md))
@@ -51,6 +53,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5153

5254
### Changed
5355

56+
- **aimdb-executor**: `TimeOps` trait gained a required `duration_as_nanos(duration) -> u64` method (runtime-agnostic numeric representation of elapsed time, used by stage profiling). Implemented in tokio, embassy, and wasm adapters. ([aimdb-executor](aimdb-executor/CHANGELOG.md))
5457
- **aimdb-embassy-adapter**: `SpmcRing` subscriber-slot exhaustion now emits a `defmt::error!` with guidance to increase the `CONSUMERS` const generic. Counting rule: one slot per `.tap()`, `.link_to()`, and `transform_join` input.
5558
- **aimdb-codegen**: Generated join handler stubs updated to the new `on_triggers` task model (`async fn task_handler(JoinEventRx, Producer<...>)`).
5659
- **aimdb-core**: Breaking API changes to `InboundConnectorLink`, `Router`, and `RouterBuilder` to support `DeserializerKind` (see [aimdb-core/CHANGELOG.md](aimdb-core/CHANGELOG.md))

Cargo.lock

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,14 @@ build:
5757
cargo build --package aimdb-core --no-default-features --features alloc
5858
@printf "$(YELLOW) → Building aimdb-core (std platform)$(NC)\n"
5959
cargo build --package aimdb-core --features "std,tracing,metrics"
60+
@printf "$(YELLOW) → Building aimdb-core (no_std + alloc + profiling)$(NC)\n"
61+
cargo build --package aimdb-core --no-default-features --features "alloc,profiling"
62+
@printf "$(YELLOW) → Building aimdb-core (std + profiling)$(NC)\n"
63+
cargo build --package aimdb-core --features "std,tracing,profiling"
6064
@printf "$(YELLOW) → Building tokio adapter$(NC)\n"
6165
cargo build --package aimdb-tokio-adapter --features "tokio-runtime,tracing,metrics"
66+
@printf "$(YELLOW) → Building tokio adapter (with profiling)$(NC)\n"
67+
cargo build --package aimdb-tokio-adapter --features "tokio-runtime,tracing,profiling"
6268
@printf "$(YELLOW) → Building sync wrapper$(NC)\n"
6369
cargo build --package aimdb-sync
6470
@printf "$(YELLOW) → Building codegen library$(NC)\n"
@@ -90,12 +96,18 @@ test:
9096
cargo test --package aimdb-core --features "std,tracing"
9197
@printf "$(YELLOW) → Testing aimdb-core (std + metrics)$(NC)\n"
9298
cargo test --package aimdb-core --features "std,tracing,metrics"
99+
@printf "$(YELLOW) → Testing aimdb-core (std + profiling)$(NC)\n"
100+
cargo test --package aimdb-core --features "std,tracing,profiling"
101+
@printf "$(YELLOW) → Testing aimdb-core (no_std + alloc + profiling)$(NC)\n"
102+
cargo test --package aimdb-core --no-default-features --features "alloc,profiling"
93103
@printf "$(YELLOW) → Testing aimdb-core remote module$(NC)\n"
94104
cargo test --package aimdb-core --lib --features "std" remote::
95105
@printf "$(YELLOW) → Testing tokio adapter$(NC)\n"
96106
cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing"
97107
@printf "$(YELLOW) → Testing tokio adapter (with metrics)$(NC)\n"
98108
cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,metrics"
109+
@printf "$(YELLOW) → Testing tokio adapter (with profiling)$(NC)\n"
110+
cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,profiling"
99111
@printf "$(YELLOW) → Testing sync wrapper$(NC)\n"
100112
cargo test --package aimdb-sync
101113
@printf "$(YELLOW) → Testing codegen library$(NC)\n"
@@ -243,6 +255,8 @@ test-embedded:
243255
cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime"
244256
@printf "$(YELLOW) → Checking aimdb-embassy-adapter with network support on thumbv7em-none-eabihf target$(NC)\n"
245257
cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime,embassy-net-support"
258+
@printf "$(YELLOW) → Checking aimdb-embassy-adapter with profiling on thumbv7em-none-eabihf target$(NC)\n"
259+
cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime,profiling"
246260
@printf "$(YELLOW) → Checking aimdb-mqtt-connector (Embassy) on thumbv7em-none-eabihf target$(NC)\n"
247261
cargo check --package aimdb-mqtt-connector --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime"
248262
@printf "$(YELLOW) → Checking aimdb-mqtt-connector (Embassy + defmt) on thumbv7em-none-eabihf target$(NC)\n"

_external/embassy

Submodule embassy updated 136 files

aimdb-client/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10-
No changes yet.
10+
### Added
11+
12+
- **`AimxClient::reset_stage_profiling()`** (Issue #58): New method issuing the `profiling.reset` AimX request to clear stage profiling counters for every record on the server. Requires the server to be built with the `profiling` feature and the connection to have write permission.
1113

1214
## [0.5.0] - 2026-02-21
1315

aimdb-client/src/connection.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,14 @@ impl AimxClient {
124124
Ok(records)
125125
}
126126

127+
/// Reset stage profiling counters for every record on the server.
128+
///
129+
/// Requires the server to be built with the `profiling` feature and the
130+
/// connection to have write permission.
131+
pub async fn reset_stage_profiling(&mut self) -> ClientResult<serde_json::Value> {
132+
self.send_request("profiling.reset", None).await
133+
}
134+
127135
/// Get current value of a record
128136
pub async fn get_record(&mut self, name: &str) -> ClientResult<serde_json::Value> {
129137
let params = json!({ "record": name });

aimdb-core/CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- **Automatic stage profiling (Issue #58, RFC 014, feature `profiling`)**: AimDB now measures wall-clock time per `.source()`, `.tap()`, and `.link()` stage with no user instrumentation. Feature is off by default and adds zero overhead when disabled; `alloc` + a runtime clock is enough, so it works on `no_std + alloc` targets too.
13+
- New `profiling` module exporting `StageMetrics` (atomic `call_count` / `total_time_ns` / `avg_time_ns` / `min_time_ns` / `max_time_ns` counters), `RecordProfilingMetrics` per-record container, and serializable `StageProfilingInfo` snapshot.
14+
- Source-stage timing measures the interval between successive `Producer::produce()` calls via a new `ProducerProfilingState`. Tap- and link-stage timing wraps the `BufferReader` returned by `Consumer::subscribe()` in a new `ProfilingBufferReader` that times the interval between successive `recv()` yields. The whole-task closure shape of `.source()` / `.tap()` is preserved — no per-value handler changes.
15+
- `RecordRegistrar::with_name("...")` assigns a human-readable name to the most recently registered source/tap/link; surfaces in MCP output. Always callable — a no-op when the feature is disabled.
16+
- New `StageKind` enum (`Source` / `Tap` / `Link` / `Transform`); `.transform()` is stubbed for future instrumentation.
17+
- `RecordMetadata` gains an optional `stage_profiling: Vec<StageProfilingInfo>` field (feature-gated) attached automatically in `TypedRecord::collect_metadata`. New helper `RecordMetadata::with_stage_profiling`.
18+
- `AimDb::reset_stage_profiling()` clears every record's counters. New `profiling.reset` AimX RPC method (write-permission gated) wired through `remote::handler`.
19+
- New `RuntimeForProfiling` marker trait — blanket-implemented for every `R` when the feature is off, requires `aimdb_executor::TimeOps` when on. Surfaces on `AimDbBuilder::run` / `build` and `AimDb::build_with`. Public API is unchanged when the feature is disabled.
20+
- New `Time::duration_as_nanos` accessor on the context (delegates to `TimeOps`).
21+
- Dependency: `portable-atomic` (with the `fallback` + `critical-section` features enabled by the `profiling` feature) for 64-bit-atomic emulation on targets without native `AtomicU64` (e.g. `thumbv7em-none-eabihf`).
1222
- **Writer-exclusivity validation for `.link_from()` (Issue #89)**: `.source()`, `.transform()`, and `.link_from()` are now mutually exclusive on a single record — combining any two now panics at configuration time instead of silently racing on the buffer (last-writer-wins). The check fires from `LinkFromBuilder::finish()` (panic message includes the offending URL), with symmetric defense-in-depth checks added to `TypedRecord::set_producer_service`, `set_transform`, and `add_inbound_connector`. Multiple `.link_from()` calls on the same record (fan-in) remain permitted.
1323
- **`no_std` support for the full Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets. Multi-input join fan-in is no longer hardcoded to `tokio::sync::mpsc`; it uses the runtime-agnostic `JoinFanInRuntime` traits from `aimdb-executor`, implemented by Tokio, Embassy, and WASM adapters.
1424
- **`JoinEventRx`** — type-erased trigger receiver passed to the `on_triggers` handler. Call `.recv().await` in a loop to consume `JoinTrigger` events from all input forwarders.

aimdb-core/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ tracing = ["dep:tracing"] # Works in both std and no_std environments
3636
defmt = ["dep:defmt"] # Embedded logging via probe (no_std)
3737
metrics = ["dep:metrics", "std"] # Requires std for aggregation
3838

39+
# Automatic stage profiling (.source()/.tap()/.link() timing).
40+
# Independent of `metrics`; works in no_std (only needs heap + a runtime clock).
41+
# `portable-atomic/critical-section` provides the 64-bit-atomic fallback on targets
42+
# without native 64-bit atomics (e.g. thumbv7em); it's a no-op where native atomics
43+
# exist, and embedded binaries already supply a `critical-section` impl.
44+
profiling = ["alloc", "portable-atomic/fallback", "portable-atomic/critical-section"]
45+
3946
# Testing features
4047
test-utils = ["std"]
4148

aimdb-core/src/builder.rs

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,7 @@ where
589589
#[cfg(feature = "alloc")]
590590
record_key: record_key.as_str().to_string(),
591591
extensions: &self.extensions,
592+
last_stage: None,
592593
};
593594
f(&mut reg);
594595

@@ -675,7 +676,10 @@ where
675676
/// .run().await // Runs forever
676677
/// }
677678
/// ```
678-
pub async fn run(self) -> DbResult<()> {
679+
pub async fn run(self) -> DbResult<()>
680+
where
681+
R: crate::RuntimeForProfiling,
682+
{
679683
#[cfg(feature = "tracing")]
680684
tracing::info!("Building database and spawning background tasks...");
681685

@@ -728,7 +732,10 @@ where
728732
/// # Returns
729733
/// `DbResult<AimDb<R>>` - The database instance
730734
#[cfg_attr(not(feature = "std"), allow(unused_mut))]
731-
pub async fn build(self) -> DbResult<AimDb<R>> {
735+
pub async fn build(self) -> DbResult<AimDb<R>>
736+
where
737+
R: crate::RuntimeForProfiling,
738+
{
732739
use crate::DbError;
733740

734741
// Validate all records
@@ -836,9 +843,14 @@ where
836843
extensions: self.extensions,
837844
});
838845

846+
#[cfg(feature = "profiling")]
847+
let profiling_clock = crate::profiling::make_clock(runtime.clone());
848+
839849
let db = Arc::new(AimDb {
840850
inner: inner.clone(),
841851
runtime: runtime.clone(),
852+
#[cfg(feature = "profiling")]
853+
profiling_clock,
842854
});
843855

844856
#[cfg(feature = "tracing")]
@@ -1016,13 +1028,19 @@ pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
10161028

10171029
/// Runtime adapter with concrete type
10181030
runtime: Arc<R>,
1031+
1032+
/// Shared wall clock for stage profiling, built from the runtime at `build()` time.
1033+
#[cfg(feature = "profiling")]
1034+
profiling_clock: crate::profiling::Clock,
10191035
}
10201036

10211037
impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
10221038
fn clone(&self) -> Self {
10231039
Self {
10241040
inner: self.inner.clone(),
10251041
runtime: self.runtime.clone(),
1042+
#[cfg(feature = "profiling")]
1043+
profiling_clock: self.profiling_clock.clone(),
10261044
}
10271045
}
10281046
}
@@ -1050,8 +1068,17 @@ impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
10501068
&self.inner.extensions
10511069
}
10521070

1071+
/// Shared wall clock used by stage profiling (nanoseconds since an arbitrary epoch).
1072+
#[cfg(feature = "profiling")]
1073+
pub(crate) fn profiling_clock(&self) -> &crate::profiling::Clock {
1074+
&self.profiling_clock
1075+
}
1076+
10531077
/// Builds a database with a closure-based builder pattern
1054-
pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
1078+
pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()>
1079+
where
1080+
R: crate::RuntimeForProfiling,
1081+
{
10551082
let mut b = AimDbBuilder::new().runtime(rt);
10561083
f(&mut b);
10571084
b.run().await
@@ -1238,6 +1265,14 @@ impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
12381265
self.inner.list_records()
12391266
}
12401267

1268+
/// Resets stage profiling counters for every record (feature `profiling`).
1269+
#[cfg(feature = "profiling")]
1270+
pub fn reset_stage_profiling(&self) {
1271+
for record in &self.inner.storages {
1272+
record.reset_profiling();
1273+
}
1274+
}
1275+
12411276
/// Try to get record's latest value as JSON by name (std only)
12421277
///
12431278
/// Convenience wrapper around `AimDbInner::try_latest_as_json()`.

0 commit comments

Comments
 (0)