From a567b4b28170d6ee77103e2aa4a67ea29c730194 Mon Sep 17 00:00:00 2001 From: "jayant.shrivastava" Date: Thu, 18 Jun 2026 15:24:59 +0000 Subject: [PATCH 1/3] metrics: update plan_bytes_sent to be a bytes metric Instead of printing byte values like `10 GB`, this metric would print `10 B` where `B` is billion. This change fixes that by updating the metric from a counter to a bytes counter. --- docs/source/user-guide/metrics.md | 2 +- src/coordinator/query_coordinator.rs | 42 ++++++++++++++++++++++------ 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index d5f292d8..be28fb41 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -66,7 +66,7 @@ This produces an EXPLAIN ANALYZE that spans the whole cluster — every stage an runtime metrics, including network-level metrics on the boundaries: ``` -┌───── DistributedExec ── Tasks: t0:[p0] plan_bytes_sent_0=8.07 K, plan_send_latency_avg_0=22.63ms, ... +┌───── DistributedExec ── Tasks: t0:[p0] plan_bytes_sent_0=8.07 KB, plan_send_latency_avg_0=22.63ms, ... │ SortPreservingMergeExec: [count(*)@0 DESC], fetch=5, metrics=[output_rows=5, elapsed_compute=391.83µs, ...] │ [Stage 2] => NetworkCoalesceExec: output_partitions=32, input_tasks=2, metrics=[elapsed_compute=5.86ms, bytes_transferred=20.1 KB, network_latency_p50=366.00µs, network_latency_p95=603.43µs, ...] └────────────────────────────────────────────────── diff --git a/src/coordinator/query_coordinator.rs b/src/coordinator/query_coordinator.rs index caa81c15..7bd7b652 100644 --- a/src/coordinator/query_coordinator.rs +++ b/src/coordinator/query_coordinator.rs @@ -11,9 +11,9 @@ use crate::worker::generated::worker as pb; use crate::worker::generated::worker::coordinator_to_worker_msg::Inner; use crate::worker::generated::worker::set_plan_request::WorkUnitFeedDeclaration; use crate::{ - DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, DistributedCodec, DistributedConfig, - DistributedTaskContext, DistributedWorkUnitFeedContext, TaskEstimator, TaskKey, - TaskRoutingContext, get_distributed_channel_resolver, get_distributed_worker_resolver, + BytesCounterMetric, BytesMetricExt, DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, DistributedCodec, + DistributedConfig, DistributedTaskContext, DistributedWorkUnitFeedContext, TaskEstimator, + TaskKey, TaskRoutingContext, get_distributed_channel_resolver, get_distributed_worker_resolver, }; use datafusion::common::instant::Instant; use datafusion::common::runtime::JoinSet; @@ -21,9 +21,7 @@ use datafusion::common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion::common::{DataFusionError, exec_datafusion_err}; use datafusion::common::{Result, exec_err}; use datafusion::execution::TaskContext; -use datafusion::physical_expr_common::metrics::{ - Count, ExecutionPlanMetricsSet, Label, MetricBuilder, -}; +use datafusion::physical_expr_common::metrics::{ExecutionPlanMetricsSet, Label, MetricBuilder}; use datafusion::physical_plan::ExecutionPlan; use datafusion_proto::physical_plan::AsExecutionPlan; use datafusion_proto::protobuf::PhysicalPlanNode; @@ -198,7 +196,7 @@ impl<'a> StageCoordinator<'a> { }) })?; metrics.plan_send_latency.record(&start); - metrics.plan_bytes_sent.add(plan_size); + metrics.plan_bytes_sent.add_bytes(plan_size); let mut worker_to_coordinator_stream = response.into_inner(); while let Some(msg_or_err) = worker_to_coordinator_stream.next().await { let msg = msg_or_err.map_err(|err| { @@ -420,7 +418,7 @@ impl Drop for NotifyGuard { /// Metrics that measure network details about communications between [DistributedExec] and a worker. #[derive(Clone)] pub(super) struct CoordinatorToWorkerMetrics { - pub(super) plan_bytes_sent: Count, + pub(super) plan_bytes_sent: BytesCounterMetric, pub(super) plan_send_latency: Arc, pub(super) instantiation_time: u64, } @@ -431,7 +429,7 @@ impl CoordinatorToWorkerMetrics { // Metric that measures to total sum of bytes worth of subplans sent. plan_bytes_sent: MetricBuilder::new(metrics) .with_label(Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, "0")) - .global_counter("plan_bytes_sent"), + .bytes_counter("plan_bytes_sent"), // Latency statistics about the network calls issued to the workers for feeding subplans. plan_send_latency: Arc::new(LatencyMetric::new( "plan_send_latency", @@ -442,3 +440,29 @@ impl CoordinatorToWorkerMetrics { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn coordinator_to_worker_plan_bytes_sent_prints_as_bytes() { + let metrics = ExecutionPlanMetricsSet::new(); + let coordinator_metrics = CoordinatorToWorkerMetrics::new(&metrics); + + coordinator_metrics + .plan_bytes_sent + .add_bytes(4 * 1024 * 1024 * 1024); + + let metrics_set = metrics.clone_inner(); + let metric = metrics_set + .iter() + .find(|m| m.value().name() == "plan_bytes_sent") + .expect("plan_bytes_sent metric should be registered"); + + assert_eq!( + format!("{}={}", metric.value().name(), metric.value()), + "plan_bytes_sent=4.0 GB" + ); + } +} From 40eba83e315feb93a64511695fde51f26837f41c Mon Sep 17 00:00:00 2001 From: "jayant.shrivastava" Date: Thu, 18 Jun 2026 15:33:45 +0000 Subject: [PATCH 2/3] nuke test --- src/coordinator/query_coordinator.rs | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/src/coordinator/query_coordinator.rs b/src/coordinator/query_coordinator.rs index 7bd7b652..e5097222 100644 --- a/src/coordinator/query_coordinator.rs +++ b/src/coordinator/query_coordinator.rs @@ -441,28 +441,3 @@ impl CoordinatorToWorkerMetrics { } } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn coordinator_to_worker_plan_bytes_sent_prints_as_bytes() { - let metrics = ExecutionPlanMetricsSet::new(); - let coordinator_metrics = CoordinatorToWorkerMetrics::new(&metrics); - - coordinator_metrics - .plan_bytes_sent - .add_bytes(4 * 1024 * 1024 * 1024); - - let metrics_set = metrics.clone_inner(); - let metric = metrics_set - .iter() - .find(|m| m.value().name() == "plan_bytes_sent") - .expect("plan_bytes_sent metric should be registered"); - - assert_eq!( - format!("{}={}", metric.value().name(), metric.value()), - "plan_bytes_sent=4.0 GB" - ); - } -} From 8822f6df7b658cc96e249f4707c2551a2c3ec474 Mon Sep 17 00:00:00 2001 From: "jayant.shrivastava" Date: Thu, 18 Jun 2026 15:38:28 +0000 Subject: [PATCH 3/3] newline --- src/coordinator/query_coordinator.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/coordinator/query_coordinator.rs b/src/coordinator/query_coordinator.rs index e5097222..3d19c750 100644 --- a/src/coordinator/query_coordinator.rs +++ b/src/coordinator/query_coordinator.rs @@ -440,4 +440,3 @@ impl CoordinatorToWorkerMetrics { } } } -