@@ -8,7 +8,7 @@ use std::{
88
99use futures:: { FutureExt , StreamExt , TryStreamExt , stream:: FuturesOrdered } ;
1010use futures_util:: stream:: FuturesUnordered ;
11- use metrics:: gauge;
11+ use metrics:: { gauge, histogram } ;
1212use stream_cancel:: { StreamExt as StreamCancelExt , Trigger , Tripwire } ;
1313use tokio:: {
1414 select,
@@ -745,14 +745,15 @@ impl<'a> Builder<'a> {
745745 let sender = self
746746 . utilization_registry
747747 . add_component ( node. key . clone ( ) , gauge ! ( "utilization" ) ) ;
748- let runner = Runner :: new (
748+ let mut runner = Runner :: new (
749749 t,
750750 input_rx,
751751 sender,
752752 node. input_details . data_type ( ) ,
753753 outputs,
754754 LatencyRecorder :: new ( self . config . global . latency_ewma_alpha ) ,
755755 ) ;
756+ runner. component_id = node. key . id ( ) . to_owned ( ) ;
756757 let transform = if node. enable_concurrency {
757758 runner. run_concurrently ( ) . boxed ( )
758759 } else {
@@ -1125,6 +1126,7 @@ struct Runner {
11251126 timer_tx : UtilizationComponentSender ,
11261127 latency_recorder : LatencyRecorder ,
11271128 events_received : Registered < EventsReceived > ,
1129+ component_id : String ,
11281130}
11291131
11301132impl Runner {
@@ -1144,6 +1146,7 @@ impl Runner {
11441146 timer_tx,
11451147 latency_recorder,
11461148 events_received : register ! ( EventsReceived ) ,
1149+ component_id : String :: new ( ) ,
11471150 }
11481151 }
11491152
@@ -1201,13 +1204,15 @@ impl Runner {
12011204
12021205 let mut in_flight = FuturesOrdered :: new ( ) ;
12031206 let mut shutting_down = false ;
1207+ let mut blocked_completions: u64 = 0 ;
12041208
12051209 self . timer_tx . try_send_start_wait ( ) ;
12061210 loop {
12071211 tokio:: select! {
12081212 biased;
12091213
12101214 result = in_flight. next( ) , if !in_flight. is_empty( ) => {
1215+ blocked_completions += 1 ;
12111216 match result {
12121217 Some ( Ok ( mut outputs_buf) ) => {
12131218 self . send_outputs( & mut outputs_buf) . await
@@ -1218,6 +1223,12 @@ impl Runner {
12181223 }
12191224
12201225 input_arrays = input_rx. next( ) , if in_flight. len( ) < * TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
1226+ histogram!(
1227+ "transform_concurrent_scheduling_pressure" ,
1228+ "component_id" => self . component_id. clone( )
1229+ )
1230+ . record( ( blocked_completions as f64 / * TRANSFORM_CONCURRENCY_LIMIT as f64 ) . min( 1.0 ) ) ;
1231+ blocked_completions = 0 ;
12211232 match input_arrays {
12221233 Some ( input_arrays) => {
12231234 let mut len = 0 ;
@@ -1244,6 +1255,7 @@ impl Runner {
12441255 }
12451256
12461257 else => {
1258+ blocked_completions = 0 ;
12471259 if shutting_down {
12481260 break
12491261 }
0 commit comments