Skip to content

Commit bf7d7d9

Browse files
committed
feat(metrics): implement buffer introspection metrics
- Introduced `BufferCounters` for tracking produced, consumed, and dropped counts, along with occupancy metrics. - Integrated metrics into `EmbassyBuffer` and `TokioBuffer`, allowing for real-time monitoring of buffer states. - Added `reset_buffer_metrics` method to clear metrics counters, gated by write permissions. - Implemented `get_buffer_metrics` and `reset_buffer_metrics` tools in the MCP for querying and resetting metrics. - Updated documentation and examples to reflect new metrics functionality. - Ensured compatibility with `no_std` environments using `portable-atomic`.
1 parent 94d3ef6 commit bf7d7d9

28 files changed

Lines changed: 631 additions & 189 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,6 @@ serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
7272
# Basic observability
7373
tracing = { version = "0.1", default-features = false }
7474

75-
# Metrics
76-
metrics = "0.23"
77-
7875
# Async utilities
7976
futures = "0.3"
8077

Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ build:
6161
cargo build --package aimdb-core --no-default-features --features "alloc,profiling"
6262
@printf "$(YELLOW) → Building aimdb-core (std + profiling)$(NC)\n"
6363
cargo build --package aimdb-core --features "std,tracing,profiling"
64+
@printf "$(YELLOW) → Building aimdb-core (no_std + alloc + metrics)$(NC)\n"
65+
cargo build --package aimdb-core --no-default-features --features "alloc,metrics"
6466
@printf "$(YELLOW) → Building tokio adapter$(NC)\n"
6567
cargo build --package aimdb-tokio-adapter --features "tokio-runtime,tracing,metrics"
6668
@printf "$(YELLOW) → Building tokio adapter (with profiling)$(NC)\n"
@@ -100,6 +102,8 @@ test:
100102
cargo test --package aimdb-core --features "std,tracing,profiling"
101103
@printf "$(YELLOW) → Testing aimdb-core (no_std + alloc + profiling)$(NC)\n"
102104
cargo test --package aimdb-core --no-default-features --features "alloc,profiling"
105+
@printf "$(YELLOW) → Testing aimdb-core (no_std + alloc + metrics)$(NC)\n"
106+
cargo test --package aimdb-core --no-default-features --features "alloc,metrics"
103107
@printf "$(YELLOW) → Testing aimdb-core remote module$(NC)\n"
104108
cargo test --package aimdb-core --lib --features "std" remote::
105109
@printf "$(YELLOW) → Testing tokio adapter$(NC)\n"
@@ -257,6 +261,8 @@ test-embedded:
257261
cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime,embassy-net-support"
258262
@printf "$(YELLOW) → Checking aimdb-embassy-adapter with profiling on thumbv7em-none-eabihf target$(NC)\n"
259263
cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime,profiling"
264+
@printf "$(YELLOW) → Checking aimdb-embassy-adapter with metrics on thumbv7em-none-eabihf target$(NC)\n"
265+
cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime,metrics"
260266
@printf "$(YELLOW) → Checking aimdb-mqtt-connector (Embassy) on thumbv7em-none-eabihf target$(NC)\n"
261267
cargo check --package aimdb-mqtt-connector --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime"
262268
@printf "$(YELLOW) → Checking aimdb-mqtt-connector (Embassy + defmt) on thumbv7em-none-eabihf target$(NC)\n"

aimdb-client/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ description = "AimX protocol client for remote AimDB introspection and managemen
99
keywords = ["client", "protocol", "database", "remote", "rpc"]
1010
categories = ["database", "network-programming"]
1111

12+
[features]
13+
metrics = ["aimdb-core/metrics"]
14+
profiling = ["aimdb-core/profiling"]
15+
1216
[dependencies]
1317
# Core dependencies - protocol types from aimdb-core
1418
aimdb-core = { version = "1.0.0", path = "../aimdb-core", features = ["std"] }

aimdb-client/src/connection.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,14 @@ impl AimxClient {
132132
self.send_request("profiling.reset", None).await
133133
}
134134

135+
/// Reset buffer introspection counters for every record on the server.
136+
///
137+
/// Requires the server to be built with the `metrics` feature and the
138+
/// connection to have write permission.
139+
pub async fn reset_buffer_metrics(&mut self) -> ClientResult<serde_json::Value> {
140+
self.send_request("buffer_metrics.reset", None).await
141+
}
142+
135143
/// Get current value of a record
136144
pub async fn get_record(&mut self, name: &str) -> ClientResult<serde_json::Value> {
137145
let params = json!({ "record": name });

aimdb-core/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ alloc = ["serde"] # Enable heap in no_std
3434
# Observability features (available on both std/no_std)
3535
tracing = ["dep:tracing"] # Works in both std and no_std environments
3636
defmt = ["dep:defmt"] # Embedded logging via probe (no_std)
37-
metrics = ["dep:metrics", "std"] # Requires std for aggregation
37+
# Buffer introspection counters (produced/consumed/dropped/occupancy).
38+
# Independent of `profiling`; works in no_std (only needs heap + atomics).
39+
# `portable-atomic/critical-section` provides the 64-bit-atomic fallback on
40+
# targets without native 64-bit atomics (e.g. thumbv7em); no-op elsewhere.
41+
metrics = ["alloc", "portable-atomic/fallback", "portable-atomic/critical-section"]
3842

3943
# Automatic stage profiling (.source()/.tap()/.link() timing).
4044
# Independent of `metrics`; works in no_std (only needs heap + a runtime clock).
@@ -81,7 +85,6 @@ portable-atomic = { version = "1.9", default-features = false }
8185
# Observability (optional)
8286
tracing = { workspace = true, optional = true }
8387
defmt = { workspace = true, optional = true }
84-
metrics = { workspace = true, optional = true }
8588

8689
# Synchronization primitives for no_std
8790
spin = { version = "0.9", default-features = false, features = [

aimdb-core/build.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ fn main() {
1414
let std_enabled = env::var("CARGO_FEATURE_STD").is_ok();
1515
let tokio_runtime_enabled = env::var("CARGO_FEATURE_TOKIO_RUNTIME").is_ok();
1616
let embassy_runtime_enabled = env::var("CARGO_FEATURE_EMBASSY_RUNTIME").is_ok();
17-
let metrics_enabled = env::var("CARGO_FEATURE_METRICS").is_ok();
1817

1918
// Note: no_std is the absence of std feature, no validation needed for mutual exclusion
2019

@@ -25,19 +24,6 @@ fn main() {
2524
eprintln!(" This is only valid for testing. Production builds should use one runtime.");
2625
}
2726

28-
// Validate metrics require std
29-
if metrics_enabled && !std_enabled {
30-
panic!(
31-
r#"
32-
❌ Invalid feature combination: 'metrics' requires 'std' platform
33-
34-
Metrics collection requires standard library support.
35-
36-
Use: features = ["std", "metrics"]
37-
"#
38-
);
39-
}
40-
4127
// Validate runtime dependencies
4228
if tokio_runtime_enabled && !std_enabled {
4329
panic!(

aimdb-core/src/buffer/counters.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
//! Shared atomic counter state for buffer introspection (`metrics` feature).
2+
//!
3+
//! Embedded in adapter buffers (`TokioBuffer`, `EmbassyBuffer`) so the same
4+
//! `produced`/`consumed`/`dropped` accounting is reused across runtimes.
5+
//! `portable-atomic` keeps this `no_std` and works on targets without native
6+
//! 64-bit atomics (e.g. `thumbv7em`) when the
7+
//! `portable-atomic/{fallback,critical-section}` features are enabled.
8+
9+
use core::sync::atomic::Ordering;
10+
use portable_atomic::AtomicU64;
11+
12+
use super::BufferMetricsSnapshot;
13+
14+
/// Atomic counters shared between a buffer and its readers.
15+
///
16+
/// Updated with `Ordering::Relaxed` — these are diagnostics, not synchronization
17+
/// primitives.
18+
#[derive(Debug)]
19+
pub struct BufferCounters {
20+
produced: AtomicU64,
21+
consumed: AtomicU64,
22+
dropped: AtomicU64,
23+
capacity: usize,
24+
}
25+
26+
impl BufferCounters {
27+
/// Creates new counters, remembering the buffer's declared capacity for
28+
/// inclusion in [`Self::snapshot`] occupancy tuples.
29+
pub fn new(capacity: usize) -> Self {
30+
Self {
31+
produced: AtomicU64::new(0),
32+
consumed: AtomicU64::new(0),
33+
dropped: AtomicU64::new(0),
34+
capacity,
35+
}
36+
}
37+
38+
/// Increments the `produced` counter (call on each successful push).
39+
pub fn increment_produced(&self) {
40+
self.produced.fetch_add(1, Ordering::Relaxed);
41+
}
42+
43+
/// Increments the `consumed` counter (call on each successful recv).
44+
pub fn increment_consumed(&self) {
45+
self.consumed.fetch_add(1, Ordering::Relaxed);
46+
}
47+
48+
/// Adds `count` to the `dropped` counter (call on lag/overflow detection).
49+
pub fn add_dropped(&self, count: u64) {
50+
self.dropped.fetch_add(count, Ordering::Relaxed);
51+
}
52+
53+
/// Returns the declared buffer capacity passed to [`Self::new`].
54+
pub fn capacity(&self) -> usize {
55+
self.capacity
56+
}
57+
58+
/// Snapshots the current counter values, attaching the supplied
59+
/// `(current_items, capacity)` occupancy tuple.
60+
pub fn snapshot(&self, occupancy: (usize, usize)) -> BufferMetricsSnapshot {
61+
BufferMetricsSnapshot {
62+
produced_count: self.produced.load(Ordering::Relaxed),
63+
consumed_count: self.consumed.load(Ordering::Relaxed),
64+
dropped_count: self.dropped.load(Ordering::Relaxed),
65+
occupancy,
66+
}
67+
}
68+
69+
/// Zeroes all counters. Capacity is preserved.
70+
pub fn reset(&self) {
71+
self.produced.store(0, Ordering::Relaxed);
72+
self.consumed.store(0, Ordering::Relaxed);
73+
self.dropped.store(0, Ordering::Relaxed);
74+
}
75+
}
76+
77+
#[cfg(test)]
78+
mod tests {
79+
use super::*;
80+
81+
#[test]
82+
fn zero_state() {
83+
let c = BufferCounters::new(16);
84+
let snap = c.snapshot((0, 16));
85+
assert_eq!(snap.produced_count, 0);
86+
assert_eq!(snap.consumed_count, 0);
87+
assert_eq!(snap.dropped_count, 0);
88+
assert_eq!(snap.occupancy, (0, 16));
89+
assert_eq!(c.capacity(), 16);
90+
}
91+
92+
#[test]
93+
fn record_one_of_each() {
94+
let c = BufferCounters::new(4);
95+
c.increment_produced();
96+
c.increment_consumed();
97+
c.add_dropped(3);
98+
let snap = c.snapshot((1, 4));
99+
assert_eq!(snap.produced_count, 1);
100+
assert_eq!(snap.consumed_count, 1);
101+
assert_eq!(snap.dropped_count, 3);
102+
assert_eq!(snap.occupancy, (1, 4));
103+
}
104+
105+
#[test]
106+
fn record_many() {
107+
let c = BufferCounters::new(8);
108+
for _ in 0..5 {
109+
c.increment_produced();
110+
}
111+
for _ in 0..3 {
112+
c.increment_consumed();
113+
}
114+
c.add_dropped(2);
115+
c.add_dropped(5);
116+
let snap = c.snapshot((2, 8));
117+
assert_eq!(snap.produced_count, 5);
118+
assert_eq!(snap.consumed_count, 3);
119+
assert_eq!(snap.dropped_count, 7);
120+
}
121+
122+
#[test]
123+
fn reset_clears_counts_but_not_capacity() {
124+
let c = BufferCounters::new(32);
125+
c.increment_produced();
126+
c.increment_consumed();
127+
c.add_dropped(9);
128+
c.reset();
129+
let snap = c.snapshot((0, 32));
130+
assert_eq!(snap.produced_count, 0);
131+
assert_eq!(snap.consumed_count, 0);
132+
assert_eq!(snap.dropped_count, 0);
133+
assert_eq!(c.capacity(), 32);
134+
// Still usable afterwards.
135+
c.increment_produced();
136+
assert_eq!(c.snapshot((1, 32)).produced_count, 1);
137+
}
138+
139+
#[cfg(feature = "std")]
140+
#[test]
141+
fn concurrent_increments() {
142+
use std::sync::Arc;
143+
use std::thread;
144+
145+
let c = Arc::new(BufferCounters::new(64));
146+
let threads = 8;
147+
let per_thread = 1000u64;
148+
let handles: Vec<_> = (0..threads)
149+
.map(|_| {
150+
let c = Arc::clone(&c);
151+
thread::spawn(move || {
152+
for _ in 0..per_thread {
153+
c.increment_produced();
154+
c.increment_consumed();
155+
c.add_dropped(1);
156+
}
157+
})
158+
})
159+
.collect();
160+
for h in handles {
161+
h.join().unwrap();
162+
}
163+
let snap = c.snapshot((0, 64));
164+
assert_eq!(snap.produced_count, threads as u64 * per_thread);
165+
assert_eq!(snap.consumed_count, threads as u64 * per_thread);
166+
assert_eq!(snap.dropped_count, threads as u64 * per_thread);
167+
}
168+
}

aimdb-core/src/buffer/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ extern crate alloc;
5757

5858
// Module structure
5959
mod cfg;
60+
#[cfg(feature = "metrics")]
61+
mod counters;
6062
mod traits;
6163

6264
// Public API exports
@@ -67,7 +69,9 @@ pub use traits::{Buffer, BufferReader, DynBuffer};
6769
#[cfg(feature = "std")]
6870
pub use traits::JsonBufferReader;
6971

70-
// Buffer metrics (std only, feature-gated)
72+
// Buffer metrics (feature-gated; works in no_std with portable-atomic)
73+
#[cfg(feature = "metrics")]
74+
pub use counters::BufferCounters;
7175
#[cfg(feature = "metrics")]
7276
pub use traits::{BufferMetrics, BufferMetricsSnapshot};
7377

aimdb-core/src/buffer/traits.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,14 @@ pub trait DynBuffer<T: Clone + Send>: Send + Sync {
8383
fn metrics_snapshot(&self) -> Option<BufferMetricsSnapshot> {
8484
None
8585
}
86+
87+
/// Reset buffer metrics counters (metrics feature only)
88+
///
89+
/// Default implementation is a no-op so buffers without metrics support are
90+
/// safe to call. Implementations that track counters should override this
91+
/// to zero them.
92+
#[cfg(feature = "metrics")]
93+
fn reset_metrics(&self) {}
8694
}
8795

8896
/// Reader trait for consuming values from a buffer

0 commit comments

Comments
 (0)