diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index d5f292d8..be28fb41 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -66,7 +66,7 @@ This produces an EXPLAIN ANALYZE that spans the whole cluster — every stage an runtime metrics, including network-level metrics on the boundaries: ``` -┌───── DistributedExec ── Tasks: t0:[p0] plan_bytes_sent_0=8.07 K, plan_send_latency_avg_0=22.63ms, ... +┌───── DistributedExec ── Tasks: t0:[p0] plan_bytes_sent_0=8.07 KB, plan_send_latency_avg_0=22.63ms, ... │ SortPreservingMergeExec: [count(*)@0 DESC], fetch=5, metrics=[output_rows=5, elapsed_compute=391.83µs, ...] │ [Stage 2] => NetworkCoalesceExec: output_partitions=32, input_tasks=2, metrics=[elapsed_compute=5.86ms, bytes_transferred=20.1 KB, network_latency_p50=366.00µs, network_latency_p95=603.43µs, ...] └────────────────────────────────────────────────── diff --git a/src/coordinator/query_coordinator.rs b/src/coordinator/query_coordinator.rs index caa81c15..3d19c750 100644 --- a/src/coordinator/query_coordinator.rs +++ b/src/coordinator/query_coordinator.rs @@ -11,9 +11,9 @@ use crate::worker::generated::worker as pb; use crate::worker::generated::worker::coordinator_to_worker_msg::Inner; use crate::worker::generated::worker::set_plan_request::WorkUnitFeedDeclaration; use crate::{ - DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, DistributedCodec, DistributedConfig, - DistributedTaskContext, DistributedWorkUnitFeedContext, TaskEstimator, TaskKey, - TaskRoutingContext, get_distributed_channel_resolver, get_distributed_worker_resolver, + BytesCounterMetric, BytesMetricExt, DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, DistributedCodec, + DistributedConfig, DistributedTaskContext, DistributedWorkUnitFeedContext, TaskEstimator, + TaskKey, TaskRoutingContext, get_distributed_channel_resolver, get_distributed_worker_resolver, }; use datafusion::common::instant::Instant; use datafusion::common::runtime::JoinSet; @@ -21,9 +21,7 @@ use datafusion::common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion::common::{DataFusionError, exec_datafusion_err}; use datafusion::common::{Result, exec_err}; use datafusion::execution::TaskContext; -use datafusion::physical_expr_common::metrics::{ - Count, ExecutionPlanMetricsSet, Label, MetricBuilder, -}; +use datafusion::physical_expr_common::metrics::{ExecutionPlanMetricsSet, Label, MetricBuilder}; use datafusion::physical_plan::ExecutionPlan; use datafusion_proto::physical_plan::AsExecutionPlan; use datafusion_proto::protobuf::PhysicalPlanNode; @@ -198,7 +196,7 @@ impl<'a> StageCoordinator<'a> { }) })?; metrics.plan_send_latency.record(&start); - metrics.plan_bytes_sent.add(plan_size); + metrics.plan_bytes_sent.add_bytes(plan_size); let mut worker_to_coordinator_stream = response.into_inner(); while let Some(msg_or_err) = worker_to_coordinator_stream.next().await { let msg = msg_or_err.map_err(|err| { @@ -420,7 +418,7 @@ impl Drop for NotifyGuard { /// Metrics that measure network details about communications between [DistributedExec] and a worker. #[derive(Clone)] pub(super) struct CoordinatorToWorkerMetrics { - pub(super) plan_bytes_sent: Count, + pub(super) plan_bytes_sent: BytesCounterMetric, pub(super) plan_send_latency: Arc, pub(super) instantiation_time: u64, } @@ -431,7 +429,7 @@ impl CoordinatorToWorkerMetrics { // Metric that measures to total sum of bytes worth of subplans sent. plan_bytes_sent: MetricBuilder::new(metrics) .with_label(Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, "0")) - .global_counter("plan_bytes_sent"), + .bytes_counter("plan_bytes_sent"), // Latency statistics about the network calls issued to the workers for feeding subplans. plan_send_latency: Arc::new(LatencyMetric::new( "plan_send_latency",