Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nexus/types/versions/src/initial/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ pub struct OxqlQuerySummary {
pub elapsed_ms: usize,
/// Summary of the data read and written.
pub io_summary: oxql_types::IoSummary,
/// Aggregated ClickHouse profile events.
pub profile_summary: std::collections::BTreeMap<String, i64>,
}

impl From<oxql_types::QuerySummary> for OxqlQuerySummary {
Expand All @@ -78,6 +80,7 @@ impl From<oxql_types::QuerySummary> for OxqlQuerySummary {
query: query_summary.query,
elapsed_ms: query_summary.elapsed.as_millis() as usize,
io_summary: query_summary.io_summary,
profile_summary: query_summary.profile_summary,
}
}
}
Expand Down
86 changes: 74 additions & 12 deletions oximeter/db/benches/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,29 @@ use rand::seq::SliceRandom;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;

const DEFAULT_CLICKHOUSE_PORT: u16 = 9000;

/// The metric to benchmark.
///
/// Set via BENCH_METRIC env var.
enum BenchMetric {
/// Wall clock latency.
Latency,
/// Total CPU time (user + system), as reported by ClickHouse profile events.
CpuTime,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make a note that this is user + system time as reported by the DB.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

/// Select the metric to benchmark from the `BENCH_METRIC` environment variable.
///
/// Recognized values are `"latency"` and `"cpu_time"`.
///
/// # Panics
///
/// Panics if the variable cannot be read (e.g. it is unset) or holds any
/// other value.
fn bench_metric() -> BenchMetric {
    // Collapse every read failure into `None` so it shares the panic arm with
    // unrecognized values.
    let requested = std::env::var("BENCH_METRIC").ok();
    match requested.as_deref() {
        Some("latency") => BenchMetric::Latency,
        Some("cpu_time") => BenchMetric::CpuTime,
        Some(_) | None => {
            panic!("BENCH_METRIC must be 'cpu_time' or 'latency'")
        }
    }
}

/// Timeseries to benchmark, spanning a range of field table counts.
const TIMESERIES_NAMES: &[&str] = &[
"crucible_upstairs:flush",
Expand Down Expand Up @@ -125,6 +144,8 @@ fn get_timeseries_info(rt: &tokio::runtime::Runtime) -> Vec<TimeseriesInfo> {
// field lookup, and ignore measurements. Note that the user is responsible for
// populating ClickHouse with test data.
fn oxql_field_lookup(c: &mut Criterion) {
let metric = bench_metric();

let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
Expand All @@ -136,8 +157,11 @@ fn oxql_field_lookup(c: &mut Criterion) {
let mut timeseries_info = get_timeseries_info(&rt);
timeseries_info.shuffle(&mut rand::rng());

let max_cardinality =
timeseries_info.iter().map(|i| i.cardinality).max().unwrap_or(0);
let max_cardinality = timeseries_info
.iter()
.map(|ti| ti.cardinality)
.max()
.expect("No timeseries found");
let cardinality_width = max_cardinality.to_string().len();

for info in &timeseries_info {
Expand All @@ -158,16 +182,54 @@ fn oxql_field_lookup(c: &mut Criterion) {

group.bench_function(
BenchmarkId::new("field_lookup", &bench_id),
|bench| {
let client = client.clone();
let query = query.clone();
bench.to_async(&rt).iter(|| {
let client = client.clone();
let query = query.clone();
async move {
client.oxql_query(&query, QueryAuthzScope::Fleet).await
}
})
|bench| match metric {
BenchMetric::CpuTime => {
bench.to_async(&rt).iter_custom(|iters| {
let client = client.clone();
let query = query.clone();
async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
let result = client
.oxql_query(&query, QueryAuthzScope::Fleet)
.await
.unwrap();
let cpu_us: i64 = result
.query_summaries
.iter()
.map(|s| {
// Profile events are occasionally and
// inexplicably empty; default to 0
// for rare missing events.
s.profile_summary
.get("UserTimeMicroseconds")
.copied()
.unwrap_or(0)
+ s.profile_summary
.get("SystemTimeMicroseconds")
.copied()
.unwrap_or(0)
})
.sum();
total +=
Duration::from_micros(cpu_us.max(0) as u64);
}
total
}
});
}
BenchMetric::Latency => {
bench.to_async(&rt).iter(|| {
let client = client.clone();
let query = query.clone();
async move {
client
.oxql_query(&query, QueryAuthzScope::Fleet)
.await
.unwrap()
}
});
}
},
);
}
Expand Down
39 changes: 39 additions & 0 deletions oximeter/db/src/native/packets/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,47 @@ impl QueryResult {
query: self.query.clone(),
elapsed: self.progress.query_time,
io_summary: self.progress.into(),
profile_summary: self.profile_summary(),
}
}

/// Aggregate ClickHouse profile events by name.
///
/// By default, ClickHouse includes a block of ProfileEvents with each
/// query when using the native protocol:
///
/// ```text
/// host_name current_time thread_id type name value
/// ────────────────────────────── ──────────────────── ────────── ───────── ──────────────────────────────── ──────────
/// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 increment ReadCompressedBytes 124134764
/// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 increment DiskReadElapsedMicroseconds 211350
/// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 increment SelectedRows 16941579
/// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 increment UserTimeMicroseconds 339959
/// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 increment SystemTimeMicroseconds 2169535
/// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 gauge MemoryTrackerUsage 40823063
/// oxz_clickhouse_aa646c82.local 2026-03-25 21:00:00 0 gauge MemoryTrackerPeakUsage 40823063
/// ...
/// ```
///
/// If profile events are available, sum their values by event name to
/// attach to the query summary.
fn profile_summary(&self) -> BTreeMap<String, i64> {
    let mut totals: BTreeMap<String, i64> = BTreeMap::new();

    // No profile events block was captured: report an empty summary.
    let Some(events) = self.profile_events.as_ref() else {
        return totals;
    };

    // Look up both columns up front; a missing column or one that cannot be
    // viewed as the expected type becomes `None` rather than an error.
    let event_names = events
        .column_values("name")
        .ok()
        .and_then(|column| column.as_string().ok());
    let event_values = events
        .column_values("value")
        .ok()
        .and_then(|column| column.as_i64().ok());
    let (Some(event_names), Some(event_values)) = (event_names, event_values)
    else {
        return totals;
    };

    // Accumulate each row's value under its event name.
    for (event_name, amount) in event_names.iter().zip(event_values.iter()) {
        *totals.entry(event_name.clone()).or_insert(0) += *amount;
    }
    totals
}
}

/// The stage through which we run a query.
Expand Down
17 changes: 17 additions & 0 deletions oximeter/db/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,23 @@ async fn test_cluster() -> anyhow::Result<()> {
// Ensure the samples are correct on this replica
assert_input_and_output(&input, &samples, &oxql_res1);

// Assert that at least one query summary and at least one known profile
// type are present.
assert!(
!oxql_res1.query_summaries.is_empty(),
"expected at least one query summary"
);
let total_user_cpu: i64 = oxql_res1
.query_summaries
.iter()
.filter_map(|s| s.profile_summary.get("UserTimeMicroseconds"))
.sum();
assert!(
total_user_cpu > 0,
"expected non-zero UserTimeMicroseconds in profile summaries, got {}",
total_user_cpu,
);

let start = tokio::time::Instant::now();
wait_for_num_points(&log, &client2, samples.len())
.await
Expand Down
3 changes: 3 additions & 0 deletions oximeter/oxql-types/src/query_summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::time::Duration;
use uuid::Uuid;

Expand Down Expand Up @@ -46,4 +47,6 @@ pub struct QuerySummary {
pub elapsed: Duration,
/// Summary of the data read and written.
pub io_summary: IoSummary,
/// Aggregated profile events from the query.
pub profile_summary: BTreeMap<String, i64>,
}
Loading