diff --git a/nexus/types/versions/src/initial/oxql.rs b/nexus/types/versions/src/initial/oxql.rs index 44bc2ecfba3..efd74278b0c 100644 --- a/nexus/types/versions/src/initial/oxql.rs +++ b/nexus/types/versions/src/initial/oxql.rs @@ -69,6 +69,8 @@ pub struct OxqlQuerySummary { pub elapsed_ms: usize, /// Summary of the data read and written. pub io_summary: oxql_types::IoSummary, + /// Aggregated ClickHouse profile events. + pub profile_summary: std::collections::BTreeMap<String, i64>, } impl From<oxql_types::QuerySummary> for OxqlQuerySummary { @@ -78,6 +80,7 @@ impl From<oxql_types::QuerySummary> for OxqlQuerySummary { query: query_summary.query, elapsed_ms: query_summary.elapsed.as_millis() as usize, io_summary: query_summary.io_summary, + profile_summary: query_summary.profile_summary, } } } diff --git a/oximeter/db/benches/oxql.rs b/oximeter/db/benches/oxql.rs index 69f069d632a..faba4864247 100644 --- a/oximeter/db/benches/oxql.rs +++ b/oximeter/db/benches/oxql.rs @@ -18,10 +18,29 @@ use rand::seq::SliceRandom; use std::net::IpAddr; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use uuid::Uuid; const DEFAULT_CLICKHOUSE_PORT: u16 = 9000; +/// The metric to benchmark. +/// +/// Set via BENCH_METRIC env var. +enum BenchMetric { +    /// Wall clock latency. +    Latency, +    /// Total cpu time (user and system). +    CpuTime, +} + +fn bench_metric() -> BenchMetric { +    match std::env::var("BENCH_METRIC").as_deref() { +        Ok("cpu_time") => BenchMetric::CpuTime, +        Ok("latency") => BenchMetric::Latency, +        _ => panic!("BENCH_METRIC must be 'cpu_time' or 'latency'"), +    } +} + /// Timeseries to benchmark, spanning a range of field table counts. const TIMESERIES_NAMES: &[&str] = &[ "crucible_upstairs:flush", @@ -125,6 +144,8 @@ fn get_timeseries_info(rt: &tokio::runtime::Runtime) -> Vec<TimeseriesInfo> { // field lookup, and ignore measurements. Note that the user is responsible for // populating ClickHouse with test data.
fn oxql_field_lookup(c: &mut Criterion) { + let metric = bench_metric(); + let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); @@ -136,8 +157,11 @@ fn oxql_field_lookup(c: &mut Criterion) { let mut timeseries_info = get_timeseries_info(&rt); timeseries_info.shuffle(&mut rand::rng()); - let max_cardinality = - timeseries_info.iter().map(|i| i.cardinality).max().unwrap_or(0); + let max_cardinality = timeseries_info + .iter() + .map(|ti| ti.cardinality) + .max() + .expect("No timeseries found"); let cardinality_width = max_cardinality.to_string().len(); for info in &timeseries_info { @@ -158,16 +182,54 @@ fn oxql_field_lookup(c: &mut Criterion) { group.bench_function( BenchmarkId::new("field_lookup", &bench_id), - |bench| { - let client = client.clone(); - let query = query.clone(); - bench.to_async(&rt).iter(|| { - let client = client.clone(); - let query = query.clone(); - async move { - client.oxql_query(&query, QueryAuthzScope::Fleet).await - } - }) + |bench| match metric { + BenchMetric::CpuTime => { + bench.to_async(&rt).iter_custom(|iters| { + let client = client.clone(); + let query = query.clone(); + async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let result = client + .oxql_query(&query, QueryAuthzScope::Fleet) + .await + .unwrap(); + let cpu_us: i64 = result + .query_summaries + .iter() + .map(|s| { + // Profile events are occasionally and + // inexplicably empty; default to 0 + // for rare missing events.
+ s.profile_summary + .get("UserTimeMicroseconds") + .copied() + .unwrap_or(0) + + s.profile_summary + .get("SystemTimeMicroseconds") + .copied() + .unwrap_or(0) + }) + .sum(); + total += + Duration::from_micros(cpu_us.max(0) as u64); + } + total + } + }); + } + BenchMetric::Latency => { + bench.to_async(&rt).iter(|| { + let client = client.clone(); + let query = query.clone(); + async move { + client + .oxql_query(&query, QueryAuthzScope::Fleet) + .await + .unwrap() + } + }); + } }, ); } diff --git a/oximeter/db/src/native/packets/client.rs b/oximeter/db/src/native/packets/client.rs index cadd3ed2653..5992f35b5e5 100644 --- a/oximeter/db/src/native/packets/client.rs +++ b/oximeter/db/src/native/packets/client.rs @@ -154,8 +154,47 @@ impl QueryResult { query: self.query.clone(), elapsed: self.progress.query_time, io_summary: self.progress.into(), + profile_summary: self.profile_summary(), } } + + /// Aggregate ClickHouse profile events by name. + /// + /// By default, ClickHouse includes a block of ProfileEvents with each + /// query when using the native protocol: + /// + /// ```text + /// host_name current_time thread_id type name value + /// ────────────────────────────── ──────────────────── ────────── ───────── ──────────────────────────────── ────────── + /// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 increment ReadCompressedBytes 124134764 + /// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 increment DiskReadElapsedMicroseconds 211350 + /// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 increment SelectedRows 16941579 + /// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 increment UserTimeMicroseconds 339959 + /// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 increment SystemTimeMicroseconds 2169535 + /// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 gauge MemoryTrackerUsage 40823063 + /// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 gauge MemoryTrackerPeakUsage 40823063 + /// ... 
+ /// ``` + /// + /// If profile events are available, sum their values by event name to + /// attach to the query summary. + fn profile_summary(&self) -> BTreeMap<String, i64> { + let mut summary = BTreeMap::new(); + if let Some(block) = self.profile_events.as_ref() { + let names = block + .column_values("name") + .ok() + .and_then(|v| v.as_string().ok()); + let values = + block.column_values("value").ok().and_then(|v| v.as_i64().ok()); + if let (Some(names), Some(values)) = (names, values) { + for (name, &value) in names.iter().zip(values.iter()) { + *summary.entry(name.clone()).or_insert(0) += value; + } + } + } + summary + } } /// The stage through which we run a query. diff --git a/oximeter/db/tests/integration_test.rs b/oximeter/db/tests/integration_test.rs index f2eaeb830a9..8dae812af6f 100644 --- a/oximeter/db/tests/integration_test.rs +++ b/oximeter/db/tests/integration_test.rs @@ -263,6 +263,23 @@ async fn test_cluster() -> anyhow::Result<()> { // Ensure the samples are correct on this replica assert_input_and_output(&input, &samples, &oxql_res1); + // Assert that at least one query summary and at least one known profile + // type are present.
+ assert!( + !oxql_res1.query_summaries.is_empty(), + "expected at least one query summary" + ); + let total_user_cpu: i64 = oxql_res1 + .query_summaries + .iter() + .filter_map(|s| s.profile_summary.get("UserTimeMicroseconds")) + .sum(); + assert!( + total_user_cpu > 0, + "expected non-zero UserTimeMicroseconds in profile summaries, got {}", + total_user_cpu, + ); + let start = tokio::time::Instant::now(); wait_for_num_points(&log, &client2, samples.len()) .await diff --git a/oximeter/oxql-types/src/query_summary.rs b/oximeter/oxql-types/src/query_summary.rs index a8d3a33694d..3debc7e7b29 100644 --- a/oximeter/oxql-types/src/query_summary.rs +++ b/oximeter/oxql-types/src/query_summary.rs @@ -8,6 +8,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; use std::time::Duration; use uuid::Uuid; @@ -46,4 +47,6 @@ pub struct QuerySummary { pub elapsed: Duration, /// Summary of the data read and written. pub io_summary: IoSummary, + /// Aggregated profile events from the query. + pub profile_summary: BTreeMap<String, i64>, }