@@ -11,19 +11,17 @@ use crate::worker::generated::worker as pb;
1111use crate :: worker:: generated:: worker:: coordinator_to_worker_msg:: Inner ;
1212use crate :: worker:: generated:: worker:: set_plan_request:: WorkUnitFeedDeclaration ;
1313use crate :: {
14- DISTRIBUTED_DATAFUSION_TASK_ID_LABEL , DistributedCodec , DistributedConfig ,
15- DistributedTaskContext , DistributedWorkUnitFeedContext , TaskEstimator , TaskKey ,
16- TaskRoutingContext , get_distributed_channel_resolver, get_distributed_worker_resolver,
14+ BytesCounterMetric , BytesMetricExt , DISTRIBUTED_DATAFUSION_TASK_ID_LABEL , DistributedCodec ,
15+ DistributedConfig , DistributedTaskContext , DistributedWorkUnitFeedContext , TaskEstimator ,
16+ TaskKey , TaskRoutingContext , get_distributed_channel_resolver, get_distributed_worker_resolver,
1717} ;
1818use datafusion:: common:: instant:: Instant ;
1919use datafusion:: common:: runtime:: JoinSet ;
2020use datafusion:: common:: tree_node:: { Transformed , TreeNodeRecursion } ;
2121use datafusion:: common:: { DataFusionError , exec_datafusion_err} ;
2222use datafusion:: common:: { Result , exec_err} ;
2323use datafusion:: execution:: TaskContext ;
24- use datafusion:: physical_expr_common:: metrics:: {
25- Count , ExecutionPlanMetricsSet , Label , MetricBuilder ,
26- } ;
24+ use datafusion:: physical_expr_common:: metrics:: { ExecutionPlanMetricsSet , Label , MetricBuilder } ;
2725use datafusion:: physical_plan:: ExecutionPlan ;
2826use datafusion_proto:: physical_plan:: AsExecutionPlan ;
2927use datafusion_proto:: protobuf:: PhysicalPlanNode ;
@@ -198,7 +196,7 @@ impl<'a> StageCoordinator<'a> {
198196 } )
199197 } ) ?;
200198 metrics. plan_send_latency . record ( & start) ;
201- metrics. plan_bytes_sent . add ( plan_size) ;
199+ metrics. plan_bytes_sent . add_bytes ( plan_size) ;
202200 let mut worker_to_coordinator_stream = response. into_inner ( ) ;
203201 while let Some ( msg_or_err) = worker_to_coordinator_stream. next ( ) . await {
204202 let msg = msg_or_err. map_err ( |err| {
@@ -420,7 +418,7 @@ impl Drop for NotifyGuard {
420418/// Metrics that measure network details about communications between [DistributedExec] and a worker.
421419#[ derive( Clone ) ]
422420pub ( super ) struct CoordinatorToWorkerMetrics {
423- pub ( super ) plan_bytes_sent : Count ,
421+ pub ( super ) plan_bytes_sent : BytesCounterMetric ,
424422 pub ( super ) plan_send_latency : Arc < LatencyMetric > ,
425423 pub ( super ) instantiation_time : u64 ,
426424}
@@ -431,7 +429,7 @@ impl CoordinatorToWorkerMetrics {
431429 // Metric that measures to total sum of bytes worth of subplans sent.
432430 plan_bytes_sent : MetricBuilder :: new ( metrics)
433431 . with_label ( Label :: new ( DISTRIBUTED_DATAFUSION_TASK_ID_LABEL , "0" ) )
434- . global_counter ( "plan_bytes_sent" ) ,
432+ . bytes_counter ( "plan_bytes_sent" ) ,
435433 // Latency statistics about the network calls issued to the workers for feeding subplans.
436434 plan_send_latency : Arc :: new ( LatencyMetric :: new (
437435 "plan_send_latency" ,
0 commit comments