diff --git a/sdk/cosmos/.cspell.json b/sdk/cosmos/.cspell.json index 117ff7d1c0e..0e18b8e16f1 100644 --- a/sdk/cosmos/.cspell.json +++ b/sdk/cosmos/.cspell.json @@ -21,6 +21,8 @@ "centralindia", "centralus", "centraluseuap", + "cgroupv", + "cgroupv2", "changefeed", "chinaeast", "chinanorth", @@ -162,6 +164,7 @@ "usdodsouthcentral", "usdodsouthwest", "usdodwestcentral", + "usec", "USGOV", "usgovarizona", "usgoviowa", @@ -196,4 +199,4 @@ ] } ] -} \ No newline at end of file +} diff --git a/sdk/cosmos/azure_data_cosmos_perf/src/operations/create_item.rs b/sdk/cosmos/azure_data_cosmos_perf/src/operations/create_item.rs index 7af0a13ee8b..74234cdd6d2 100644 --- a/sdk/cosmos/azure_data_cosmos_perf/src/operations/create_item.rs +++ b/sdk/cosmos/azure_data_cosmos_perf/src/operations/create_item.rs @@ -4,6 +4,7 @@ //! Create operation. use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use azure_data_cosmos::clients::ContainerClient; @@ -11,7 +12,7 @@ use azure_data_cosmos::options::ItemWriteOptions; use rand::RngExt; use uuid::Uuid; -use super::{Operation, PerfItem}; +use super::{extract_backend_duration, Operation, PerfItem}; use crate::seed::{SeededItem, SharedItems}; /// Creates a new item with a unique ID and partition key. 
@@ -36,7 +37,7 @@ impl Operation for CreateItemOperation { "CreateItem" } - async fn execute(&self, container: &ContainerClient) -> azure_core::Result<()> { + async fn execute(&self, container: &ContainerClient) -> azure_core::Result<Option<Duration>> { let id = Uuid::new_v4().to_string(); let partition_key = Uuid::new_v4().to_string(); let value = rand::rng().random_range(0..u64::MAX); @@ -48,11 +49,12 @@ impl Operation for CreateItemOperation { payload: "perf-test-created".to_string(), }; - container + let response = container .create_item(&item.partition_key, &id, &item, self.options.clone()) .await?; + let backend = extract_backend_duration(response.headers()); self.items.push(SeededItem { id, partition_key }); - Ok(()) + Ok(backend) } } diff --git a/sdk/cosmos/azure_data_cosmos_perf/src/operations/mod.rs b/sdk/cosmos/azure_data_cosmos_perf/src/operations/mod.rs index c684ebc20c9..6b1ae69f9cb 100644 --- a/sdk/cosmos/azure_data_cosmos_perf/src/operations/mod.rs +++ b/sdk/cosmos/azure_data_cosmos_perf/src/operations/mod.rs @@ -14,6 +14,7 @@ mod read_item; mod upsert_item; use async_trait::async_trait; +use azure_core::http::headers::{HeaderName, Headers}; use azure_data_cosmos::clients::ContainerClient; use azure_data_cosmos::options::{ ExcludedRegions, ItemReadOptions, ItemWriteOptions, OperationOptions, @@ -21,6 +22,7 @@ use azure_data_cosmos::options::{ use azure_data_cosmos::regions::Region; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use std::time::Duration; use crate::config::{Config, ExcludeRegionsScope}; pub use crate::operations::create_item::CreateItemOperation; @@ -29,6 +31,23 @@ pub use crate::operations::read_item::ReadItemOperation; pub use crate::operations::upsert_item::UpsertItemOperation; use crate::seed::SharedItems; +/// `x-ms-request-duration-ms` — server-reported request processing time in +/// milliseconds (floating-point string). 
+const REQUEST_DURATION_MS: HeaderName = HeaderName::from_static("x-ms-request-duration-ms"); + +/// Extracts the server-reported request duration from a Cosmos response. +/// +/// Returns `None` when the header is missing (e.g., on responses served +/// from cache or when the gateway omitted the diagnostic header) or when +/// the value cannot be parsed as `f64`. +pub(crate) fn extract_backend_duration(headers: &Headers) -> Option<Duration> { + headers + .get_optional_str(&REQUEST_DURATION_MS) + .and_then(|s| s.parse::<f64>().ok()) + .filter(|ms| ms.is_finite() && *ms >= 0.0) + .map(|ms| Duration::from_secs_f64(ms / 1000.0)) +} + /// A single executable perf test operation. /// /// Implementations are expected to be stateless or use interior mutability. @@ -39,7 +58,14 @@ pub trait Operation: Send + Sync { fn name(&self) -> &'static str; /// Executes one instance of the operation. - async fn execute(&self, container: &ContainerClient) -> azure_core::Result<()>; + /// + /// Returns `Ok(Some(d))` when the server reported a processing duration + /// via the `x-ms-request-duration-ms` response header (this is the + /// backend latency surfaced separately from the client-observed + /// wall-clock latency). Returns `Ok(None)` when no backend duration + /// could be observed (multi-page query streams may aggregate, see + /// individual implementations). + async fn execute(&self, container: &ContainerClient) -> azure_core::Result<Option<Duration>>; } /// The item type used for seeding, reading, querying, and upserting. diff --git a/sdk/cosmos/azure_data_cosmos_perf/src/operations/query_items.rs b/sdk/cosmos/azure_data_cosmos_perf/src/operations/query_items.rs index ffc6a2d8933..00883f6ea5f 100644 --- a/sdk/cosmos/azure_data_cosmos_perf/src/operations/query_items.rs +++ b/sdk/cosmos/azure_data_cosmos_perf/src/operations/query_items.rs @@ -4,13 +4,14 @@ //! Single-partition query operation. 
use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use azure_data_cosmos::clients::ContainerClient; use azure_data_cosmos::Query; use futures::StreamExt; -use super::Operation; +use super::{extract_backend_duration, Operation}; use crate::seed::SharedItems; /// Runs a single-partition query against a random seeded partition key. @@ -31,18 +32,28 @@ impl Operation for QueryItemsOperation { "QueryItems" } - async fn execute(&self, container: &ContainerClient) -> azure_core::Result<()> { + async fn execute(&self, container: &ContainerClient) -> azure_core::Result<Option<Duration>> { let item = self.items.random(); let pk = &item.partition_key; let query = Query::from("SELECT * FROM c WHERE c.partition_key = @pk").with_parameter("@pk", pk)?; - let mut stream = container.query_items::<super::PerfItem>(query, pk, None)?; + let mut stream = container + .query_items::<super::PerfItem>(query, pk, None)? + .into_pages(); + + // Sum backend durations across pages so a multi-page query reports + // the total server processing time, mirroring how the client-observed + // elapsed wraps the entire stream consumption. + let mut backend_total: Option<Duration> = None; while let Some(result) = stream.next().await { - result?; + let page = result?; + if let Some(d) = extract_backend_duration(page.headers()) { + backend_total = Some(backend_total.unwrap_or_default() + d); + } } - Ok(()) + Ok(backend_total) } } diff --git a/sdk/cosmos/azure_data_cosmos_perf/src/operations/read_item.rs b/sdk/cosmos/azure_data_cosmos_perf/src/operations/read_item.rs index cd34350ea2a..ece2a6b03f9 100644 --- a/sdk/cosmos/azure_data_cosmos_perf/src/operations/read_item.rs +++ b/sdk/cosmos/azure_data_cosmos_perf/src/operations/read_item.rs @@ -4,12 +4,13 @@ //! Point read operation. 
use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use azure_data_cosmos::clients::ContainerClient; use azure_data_cosmos::options::ItemReadOptions; -use super::Operation; +use super::{extract_backend_duration, Operation}; use crate::seed::SharedItems; /// Reads a random seeded item by ID and partition key. @@ -31,12 +32,12 @@ impl Operation for ReadItemOperation { "ReadItem" } - async fn execute(&self, container: &ContainerClient) -> azure_core::Result<()> { + async fn execute(&self, container: &ContainerClient) -> azure_core::Result<Option<Duration>> { let item = self.items.random(); - container + let response = container .read_item::<super::PerfItem>(&item.partition_key, &item.id, self.options.clone()) .await?; - Ok(()) + Ok(extract_backend_duration(response.headers())) } } diff --git a/sdk/cosmos/azure_data_cosmos_perf/src/operations/upsert_item.rs b/sdk/cosmos/azure_data_cosmos_perf/src/operations/upsert_item.rs index 0b804483d3f..f04539a7053 100644 --- a/sdk/cosmos/azure_data_cosmos_perf/src/operations/upsert_item.rs +++ b/sdk/cosmos/azure_data_cosmos_perf/src/operations/upsert_item.rs @@ -4,13 +4,14 @@ //! Upsert operation. use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use azure_data_cosmos::clients::ContainerClient; use azure_data_cosmos::options::ItemWriteOptions; use rand::RngExt; -use super::{Operation, PerfItem}; +use super::{extract_backend_duration, Operation, PerfItem}; use crate::seed::SharedItems; /// Upserts an item into a random seeded partition. 
@@ -32,7 +33,7 @@ impl Operation for UpsertItemOperation { "UpsertItem" } - async fn execute(&self, container: &ContainerClient) -> azure_core::Result<()> { + async fn execute(&self, container: &ContainerClient) -> azure_core::Result<Option<Duration>> { let seeded = self.items.random(); let value = rand::rng().random_range(0..u64::MAX); @@ -43,9 +44,9 @@ impl Operation for UpsertItemOperation { payload: "perf-test-payload".to_string(), }; - container + let response = container .upsert_item(&item.partition_key, &seeded.id, &item, self.options.clone()) .await?; - Ok(()) + Ok(extract_backend_duration(response.headers())) } } diff --git a/sdk/cosmos/azure_data_cosmos_perf/src/runner.rs b/sdk/cosmos/azure_data_cosmos_perf/src/runner.rs index 1d24329782e..ca2100c60d8 100644 --- a/sdk/cosmos/azure_data_cosmos_perf/src/runner.rs +++ b/sdk/cosmos/azure_data_cosmos_perf/src/runner.rs @@ -51,11 +51,28 @@ struct PerfResult { p50_ms: f64, p90_ms: f64, p99_ms: f64, + /// Server-reported request processing latency parsed from + /// `x-ms-request-duration-ms` response header. `None` for intervals + /// without backend samples. 
+ #[serde(skip_serializing_if = "Option::is_none")] + backend_min_ms: Option<f64>, + #[serde(skip_serializing_if = "Option::is_none")] + backend_max_ms: Option<f64>, + #[serde(skip_serializing_if = "Option::is_none")] + backend_mean_ms: Option<f64>, + #[serde(skip_serializing_if = "Option::is_none")] + backend_p50_ms: Option<f64>, + #[serde(skip_serializing_if = "Option::is_none")] + backend_p90_ms: Option<f64>, + #[serde(skip_serializing_if = "Option::is_none")] + backend_p99_ms: Option<f64>, cpu_percent: f32, memory_bytes: u64, system_cpu_percent: f32, system_total_memory_bytes: u64, system_used_memory_bytes: u64, + #[serde(skip_serializing_if = "Option::is_none")] + cgroup_cpu_percent: Option<f32>, // Tokio runtime metrics (present only when tokio-metrics feature is enabled) #[serde(skip_serializing_if = "Option::is_none")] tokio_workers: Option<usize>, @@ -279,8 +296,8 @@ pub async fn run(config: RunConfig) { let op_start = Instant::now(); match op.execute(&container).await { - Ok(()) => { - stats.record_latency(op.name(), op_start.elapsed()); + Ok(backend) => { + stats.record_latency(op.name(), op_start.elapsed(), backend); } Err(e) => { stats.record_error(op.name()); @@ -345,7 +362,7 @@ async fn upsert_results( let now = time::OffsetDateTime::now_utc() .format(&time::format_description::well_known::Rfc3339) .expect("RFC 3339 formatting should never fail"); - let (cpu, mem, sys_cpu, sys_total, sys_used) = metrics + let (cpu, mem, sys_cpu, sys_total, sys_used, cgroup_cpu) = metrics .map(|m| { ( m.cpu_percent, m.memory_bytes, m.system_cpu_percent, m.system_total_memory_bytes, m.system_used_memory_bytes, + m.cgroup_cpu_percent, ) }) - .unwrap_or((0.0, 0, 0.0, 0, 0)); + .unwrap_or((0.0, 0, 0.0, 0, 0, None)); for s in summaries { let result = PerfResult { @@ -374,11 +392,18 @@ p50_ms: s.p50.as_secs_f64() * 1000.0, p90_ms: s.p90.as_secs_f64() * 1000.0, p99_ms: s.p99.as_secs_f64() * 1000.0, + backend_min_ms: s.backend_min.map(|d| d.as_secs_f64() * 
1000.0), + backend_max_ms: s.backend_max.map(|d| d.as_secs_f64() * 1000.0), + backend_mean_ms: s.backend_mean.map(|d| d.as_secs_f64() * 1000.0), + backend_p50_ms: s.backend_p50.map(|d| d.as_secs_f64() * 1000.0), + backend_p90_ms: s.backend_p90.map(|d| d.as_secs_f64() * 1000.0), + backend_p99_ms: s.backend_p99.map(|d| d.as_secs_f64() * 1000.0), cpu_percent: cpu, memory_bytes: mem, system_cpu_percent: sys_cpu, system_total_memory_bytes: sys_total, system_used_memory_bytes: sys_used, + cgroup_cpu_percent: cgroup_cpu, tokio_workers: tokio_fields.map(|t| t.workers), tokio_busy_pct: tokio_fields.map(|t| t.busy_pct), tokio_park_count: tokio_fields.map(|t| t.park_count), diff --git a/sdk/cosmos/azure_data_cosmos_perf/src/stats.rs b/sdk/cosmos/azure_data_cosmos_perf/src/stats.rs index 9b092be8b08..51e23cf2691 100644 --- a/sdk/cosmos/azure_data_cosmos_perf/src/stats.rs +++ b/sdk/cosmos/azure_data_cosmos_perf/src/stats.rs @@ -26,7 +26,9 @@ pub struct Stats { /// /// Uses an [`hdrhistogram::Histogram`] for percentile estimation with constant /// memory. Exact count, min, max, and sum are tracked separately so the mean -/// is always precise. +/// is always precise. Two parallel histograms are tracked: one for the +/// client-observed wall-clock latency, and one for the server-reported +/// processing duration parsed from `x-ms-request-duration-ms`. struct OperationStats { /// HdrHistogram for percentile estimation (values in microseconds). histogram: Histogram<u64>, @@ -40,6 +42,13 @@ sum: Duration, /// Number of failed operations. errors: u64, + /// Histogram for backend-reported request duration. + backend_histogram: Histogram<u64>, + /// Number of operations that contributed a backend duration sample. + backend_count: u64, + backend_min: Duration, + backend_max: Duration, + backend_sum: Duration, } /// Upper bound for the histogram: 1 hour in microseconds. 
@@ -56,13 +65,20 @@ impl Default for OperationStats { max: Duration::ZERO, sum: Duration::ZERO, errors: 0, + backend_histogram: Histogram::new_with_bounds(1, MAX_LATENCY_US, 3) + .expect("valid histogram bounds"), + backend_count: 0, + backend_min: Duration::MAX, + backend_max: Duration::ZERO, + backend_sum: Duration::ZERO, } } } impl OperationStats { - /// Records a latency sample into the histogram. - fn record(&mut self, latency: Duration) { + /// Records a client-observed latency sample, plus an optional + /// server-reported backend duration when available. + fn record(&mut self, latency: Duration, backend: Option<Duration>) { self.count += 1; self.sum += latency; if latency < self.min { @@ -72,10 +88,23 @@ self.max = latency; } - let micros = latency.as_micros() as u64; + let micros = latency.as_micros().min(u64::MAX as u128) as u64; // Clamp to histogram bounds; values above MAX_LATENCY_US are recorded // at the max and still counted. let _ = self.histogram.record(micros.clamp(1, MAX_LATENCY_US)); + + if let Some(b) = backend { + self.backend_count += 1; + self.backend_sum += b; + if b < self.backend_min { + self.backend_min = b; + } + if b > self.backend_max { + self.backend_max = b; + } + let bm = b.as_micros().min(u64::MAX as u128) as u64; + let _ = self.backend_histogram.record(bm.clamp(1, MAX_LATENCY_US)); + } } /// Resets all counters and returns a fresh default instance. @@ -95,6 +124,14 @@ pub struct Summary { pub p50: Duration, pub p90: Duration, pub p99: Duration, + /// Server-reported processing latency (`x-ms-request-duration-ms`). + /// `None` when the interval contained zero samples carrying the header. + pub backend_min: Option<Duration>, + pub backend_max: Option<Duration>, + pub backend_mean: Option<Duration>, + pub backend_p50: Option<Duration>, + pub backend_p90: Option<Duration>, + pub backend_p99: Option<Duration>, } impl Stats { @@ -110,10 +147,11 @@ /// Records a successful operation latency. 
/// /// Only locks the mutex for this specific operation — no cross-operation - /// contention. - pub fn record_latency(&self, operation: &str, latency: Duration) { + /// contention. Pass `backend = Some(d)` when the underlying response + /// included an `x-ms-request-duration-ms` header. + pub fn record_latency(&self, operation: &str, latency: Duration, backend: Option<Duration>) { if let Some(m) = self.shards.get(operation) { - m.lock().unwrap().record(latency); + m.lock().unwrap().record(latency, backend); } } @@ -159,6 +197,12 @@ fn compute_summary(name: String, stats: OperationStats) -> Summary { p50: Duration::ZERO, p90: Duration::ZERO, p99: Duration::ZERO, + backend_min: None, + backend_max: None, + backend_mean: None, + backend_p50: None, + backend_p90: None, + backend_p99: None, }; } @@ -167,6 +211,26 @@ let p50 = Duration::from_micros(stats.histogram.value_at_quantile(0.50)); let p90 = Duration::from_micros(stats.histogram.value_at_quantile(0.90)); let p99 = Duration::from_micros(stats.histogram.value_at_quantile(0.99)); + let (backend_min, backend_max, backend_mean, backend_p50, backend_p90, backend_p99) = + if stats.backend_count > 0 { + let backend_mean_dur = Duration::from_secs_f64( + stats.backend_sum.as_secs_f64() / stats.backend_count as f64, + ); + let bp50 = Duration::from_micros(stats.backend_histogram.value_at_quantile(0.50)); + let bp90 = Duration::from_micros(stats.backend_histogram.value_at_quantile(0.90)); + let bp99 = Duration::from_micros(stats.backend_histogram.value_at_quantile(0.99)); + ( + Some(stats.backend_min), + Some(stats.backend_max), + Some(backend_mean_dur), + Some(bp50), + Some(bp90), + Some(bp99), + ) + } else { + (None, None, None, None, None, None) + }; + Summary { name, count: stats.count, @@ -177,6 +241,12 @@ p50, p90, p99, + backend_min, + backend_max, + backend_mean, + backend_p50, + backend_p90, + backend_p99, } } @@ -188,14 +258,18 @@ pub fn 
print_report(summaries: &[Summary]) { } println!( - " {:<15} {:>8} {:>8} {:>10} {:>10} {:>10} {:>10} {:>10} {:>10}", - "Operation", "Count", "Errors", "Min", "Max", "Mean", "P50", "P90", "P99" + " {:<15} {:>8} {:>8} {:>10} {:>10} {:>10} {:>10} {:>10} {:>10} {:>10}", + "Operation", "Count", "Errors", "Min", "Max", "Mean", "P50", "P90", "P99", "BackendP99" ); - println!(" {}", "-".repeat(103)); + println!(" {}", "-".repeat(114)); for s in summaries { + let backend_p99 = s .backend_p99 .map(format_duration) .unwrap_or_else(|| "—".to_string()); println!( - " {:<15} {:>8} {:>8} {:>10} {:>10} {:>10} {:>10} {:>10} {:>10}", + " {:<15} {:>8} {:>8} {:>10} {:>10} {:>10} {:>10} {:>10} {:>10} {:>10}", s.name, s.count, s.errors, @@ -205,6 +279,7 @@ pub fn print_report(summaries: &[Summary]) { format_duration(s.p50), format_duration(s.p90), format_duration(s.p99), + backend_p99, ); } } @@ -230,6 +305,9 @@ pub struct ProcessMetrics { pub system_total_memory_bytes: u64, /// Used physical memory in bytes. pub system_used_memory_bytes: u64, + /// Cgroup CPU usage as a percentage of the cgroup's CPU quota. + /// This matches what `kubectl top` reports. Only available on cgroupv2. + pub cgroup_cpu_percent: Option<f32>, } /// Captures process-level and system-level CPU and memory metrics. @@ -249,9 +327,66 @@ pub fn refresh_process_metrics(sys: &mut System) -> Option<ProcessMetrics> { system_cpu_percent: sys.global_cpu_usage(), system_total_memory_bytes: sys.total_memory(), system_used_memory_bytes: sys.used_memory(), + cgroup_cpu_percent: read_cgroup_cpu_percent(), }) } +/// Previous cgroup usage snapshot for delta computation. +static PREV_CGROUP_USAGE: std::sync::Mutex<Option<(u64, std::time::Instant)>> = + std::sync::Mutex::new(None); + +/// Reads cgroupv2 CPU usage and computes utilization as a percentage of the +/// cgroup's CPU quota. This matches what `kubectl top pods` reports. 
+fn read_cgroup_cpu_percent() -> Option<f32> { + use std::fs; + use std::time::Instant; + + // Read current usage_usec from cgroup + let stat = fs::read_to_string("/sys/fs/cgroup/cpu.stat").ok()?; + let usage_usec: u64 = stat + .lines() + .find(|l| l.starts_with("usage_usec")) + .and_then(|l| l.split_whitespace().nth(1)) + .and_then(|v| v.parse().ok())?; + + // Read quota: "max 100000" means unlimited, "400000 100000" means 4 cores + let max_content = fs::read_to_string("/sys/fs/cgroup/cpu.max").ok()?; + let mut parts = max_content.split_whitespace(); + let quota_str = parts.next()?; + if quota_str == "max" { + return None; // unlimited, can't compute percentage + } + let quota_usec: u64 = quota_str.parse().ok()?; + let period_usec: u64 = parts.next().and_then(|v| v.parse().ok()).unwrap_or(100_000); + + if period_usec == 0 { + return None; + } + // cores_allocated = quota / period (e.g., 400000/100000 = 4.0 cores) + let cores = quota_usec as f64 / period_usec as f64; + if cores <= 0.0 { + return None; + } + + let now = Instant::now(); + let mut prev = PREV_CGROUP_USAGE.lock().ok()?; + let result = if let Some((prev_usage, prev_time)) = *prev { + let delta_usec = usage_usec.saturating_sub(prev_usage); + let delta_time = now.duration_since(prev_time); + let wall_usec = delta_time.as_micros() as f64; + if wall_usec > 0.0 { + // (cpu_usec_used / wall_usec) / cores * 100 + Some((delta_usec as f64 / wall_usec / cores * 100.0) as f32) + } else { + None + } + } else { + None // first call, no delta available yet + }; + *prev = Some((usage_usec, now)); + result +} + fn format_bytes(bytes: u64) -> String { const KB: u64 = 1024; const MB: u64 = KB * 1024; @@ -280,6 +415,9 @@ pub fn print_process_metrics(metrics: &ProcessMetrics) { format_bytes(metrics.system_used_memory_bytes), format_bytes(metrics.system_total_memory_bytes), ); + if let Some(cgroup) = metrics.cgroup_cpu_percent { + println!(" Cgroup: CPU {:.1}% (kubectl-equivalent)", cgroup); + } } #[cfg(test)] @@ -290,9 +428,9 
@@ mod tests { #[test] fn record_and_drain() { let stats = Stats::new(&["read"]); - stats.record_latency("read", Duration::from_millis(10)); - stats.record_latency("read", Duration::from_millis(20)); - stats.record_latency("read", Duration::from_millis(30)); + stats.record_latency("read", Duration::from_millis(10), None); + stats.record_latency("read", Duration::from_millis(20), None); + stats.record_latency("read", Duration::from_millis(30), None); stats.record_error("read"); let summaries = stats.drain_summaries(); @@ -301,6 +439,7 @@ mod tests { assert_eq!(summaries[0].errors, 1); assert_eq!(summaries[0].min, Duration::from_millis(10)); assert_eq!(summaries[0].max, Duration::from_millis(30)); + assert!(summaries[0].backend_p99.is_none()); // After drain, should be empty let summaries = stats.drain_summaries(); @@ -324,7 +463,7 @@ mod tests { let stats = Stats::new(&["write"]); // Record 1ms through 100ms — p50 ≈ 50ms, p99 ≈ 99ms for i in 1..=100u64 { - stats.record_latency("write", Duration::from_millis(i)); + stats.record_latency("write", Duration::from_millis(i), None); } let summaries = stats.drain_summaries(); assert_eq!(summaries.len(), 1); @@ -345,7 +484,7 @@ mod tests { fn high_volume_recording() { let stats = Stats::new(&["write"]); for i in 0..20_000u64 { - stats.record_latency("write", Duration::from_micros(i)); + stats.record_latency("write", Duration::from_micros(i), None); } let summaries = stats.drain_summaries(); assert_eq!(summaries.len(), 1); @@ -357,9 +496,68 @@ mod tests { #[test] fn unknown_operation_ignored() { let stats = Stats::new(&["read"]); - stats.record_latency("unknown", Duration::from_millis(10)); + stats.record_latency("unknown", Duration::from_millis(10), None); stats.record_error("unknown"); let summaries = stats.drain_summaries(); assert!(summaries.is_empty()); } + + #[test] + fn backend_durations_aggregate_separately_from_client() { + let stats = Stats::new(&["read"]); + // Mix of samples: some carry a backend duration, some 
don't. + // Client latencies span 10..=30ms, backend latencies span 5..=15ms + // for the subset that have one — verifies the two histograms are + // independent. + stats.record_latency( + "read", + Duration::from_millis(10), + Some(Duration::from_millis(5)), + ); + stats.record_latency( + "read", + Duration::from_millis(20), + Some(Duration::from_millis(10)), + ); + stats.record_latency( + "read", + Duration::from_millis(30), + Some(Duration::from_millis(15)), + ); + stats.record_latency("read", Duration::from_millis(25), None); + + let summaries = stats.drain_summaries(); + assert_eq!(summaries.len(), 1); + let s = &summaries[0]; + + assert_eq!(s.count, 4, "all 4 client samples count"); + assert_eq!(s.min, Duration::from_millis(10)); + assert_eq!(s.max, Duration::from_millis(30)); + + let back_min = s.backend_min.expect("3 backend samples were recorded"); + let back_max = s.backend_max.expect("3 backend samples were recorded"); + let bp99 = s.backend_p99.expect("3 backend samples were recorded"); + assert_eq!(back_min, Duration::from_millis(5)); + assert_eq!(back_max, Duration::from_millis(15)); + // p99 of {5,10,15} is the max (15ms), within hdrhistogram tolerance. + let bp99_ms = bp99.as_millis(); + assert!((14..=16).contains(&bp99_ms), "backend p99 was {bp99_ms}ms"); + } + + #[test] + fn backend_summary_is_none_when_no_samples() { + let stats = Stats::new(&["read"]); + for i in 1..=10u64 { + stats.record_latency("read", Duration::from_millis(i), None); + } + let summaries = stats.drain_summaries(); + assert_eq!(summaries.len(), 1); + let s = &summaries[0]; + assert!(s.backend_min.is_none()); + assert!(s.backend_max.is_none()); + assert!(s.backend_mean.is_none()); + assert!(s.backend_p50.is_none()); + assert!(s.backend_p90.is_none()); + assert!(s.backend_p99.is_none()); + } }