@@ -8,7 +8,7 @@ use std::{
88
99use futures:: { FutureExt , StreamExt , TryStreamExt , stream:: FuturesOrdered } ;
1010use futures_util:: stream:: FuturesUnordered ;
11- use metrics:: gauge;
11+ use metrics:: { gauge, histogram } ;
1212use stream_cancel:: { StreamExt as StreamCancelExt , Trigger , Tripwire } ;
1313use tokio:: {
1414 select,
@@ -748,14 +748,15 @@ impl<'a> Builder<'a> {
748748 let sender = self
749749 . utilization_registry
750750 . add_component ( node. key . clone ( ) , gauge ! ( "utilization" ) ) ;
751- let runner = Runner :: new (
751+ let mut runner = Runner :: new (
752752 t,
753753 input_rx,
754754 sender,
755755 node. input_details . data_type ( ) ,
756756 outputs,
757757 LatencyRecorder :: new ( self . config . global . latency_ewma_alpha ) ,
758758 ) ;
759+ runner. component_id = node. key . id ( ) . to_owned ( ) ;
759760 let transform = if node. enable_concurrency {
760761 runner. run_concurrently ( ) . boxed ( )
761762 } else {
@@ -1130,6 +1131,7 @@ struct Runner {
11301131 timer_tx : UtilizationComponentSender ,
11311132 latency_recorder : LatencyRecorder ,
11321133 events_received : Registered < EventsReceived > ,
1134+ component_id : String ,
11331135}
11341136
11351137impl Runner {
@@ -1149,6 +1151,7 @@ impl Runner {
11491151 timer_tx,
11501152 latency_recorder,
11511153 events_received : register ! ( EventsReceived ) ,
1154+ component_id : String :: new ( ) ,
11521155 }
11531156 }
11541157
@@ -1206,13 +1209,15 @@ impl Runner {
12061209
12071210 let mut in_flight = FuturesOrdered :: new ( ) ;
12081211 let mut shutting_down = false ;
1212+ let mut blocked_completions: u64 = 0 ;
12091213
12101214 self . timer_tx . try_send_start_wait ( ) ;
12111215 loop {
12121216 tokio:: select! {
12131217 biased;
12141218
12151219 result = in_flight. next( ) , if !in_flight. is_empty( ) => {
1220+ blocked_completions += 1 ;
12161221 match result {
12171222 Some ( Ok ( mut outputs_buf) ) => {
12181223 self . send_outputs( & mut outputs_buf) . await
@@ -1223,6 +1228,12 @@ impl Runner {
12231228 }
12241229
12251230 input_arrays = input_rx. next( ) , if in_flight. len( ) < * TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
1231+ histogram!(
1232+ "transform_concurrent_scheduling_pressure" ,
1233+ "component_id" => self . component_id. clone( )
1234+ )
1235+ . record( ( blocked_completions as f64 / * TRANSFORM_CONCURRENCY_LIMIT as f64 ) . min( 1.0 ) ) ;
1236+ blocked_completions = 0 ;
12261237 match input_arrays {
12271238 Some ( input_arrays) => {
12281239 let mut len = 0 ;
@@ -1249,6 +1260,7 @@ impl Runner {
12491260 }
12501261
12511262 else => {
1263+ blocked_completions = 0 ;
12521264 if shutting_down {
12531265 break
12541266 }
0 commit comments