@@ -8,7 +8,7 @@ use std::{
88
99use futures:: { FutureExt , StreamExt , TryStreamExt } ;
1010use futures_util:: stream:: FuturesUnordered ;
11- use metrics:: gauge ;
11+ use metrics:: histogram ;
1212use stream_cancel:: { StreamExt as StreamCancelExt , Trigger , Tripwire } ;
1313use tokio:: {
1414 select,
@@ -750,7 +750,7 @@ impl<'a> Builder<'a> {
750750 . global
751751 . preserve_ordering_stateless_transforms
752752 . unwrap_or ( true ) ;
753- let runner = Runner :: new (
753+ let mut runner = Runner :: new (
754754 t,
755755 input_rx,
756756 sender,
@@ -759,6 +759,7 @@ impl<'a> Builder<'a> {
759759 LatencyRecorder :: new ( self . config . global . latency_ewma_alpha ) ,
760760 preserve_ordering,
761761 ) ;
762+ runner. component_id = node. key . id ( ) . to_owned ( ) ;
762763 let transform = if node. enable_concurrency {
763764 runner. run_concurrently ( ) . boxed ( )
764765 } else {
@@ -1132,6 +1133,7 @@ struct Runner {
11321133 latency_recorder : LatencyRecorder ,
11331134 events_received : Registered < EventsReceived > ,
11341135 preserve_ordering : bool ,
1136+ component_id : String ,
11351137}
11361138
11371139impl Runner {
@@ -1153,6 +1155,7 @@ impl Runner {
11531155 latency_recorder,
11541156 events_received : register ! ( EventsReceived ) ,
11551157 preserve_ordering,
1158+ component_id : String :: new ( ) ,
11561159 }
11571160 }
11581161
@@ -1211,13 +1214,15 @@ impl Runner {
12111214 let mut in_flight =
12121215 super :: in_flight_queue:: InFlightQueue :: new ( self . preserve_ordering ) ;
12131216 let mut shutting_down = false ;
1217+ let mut blocked_completions: u64 = 0 ;
12141218
12151219 self . timer_tx . try_send_start_wait ( ) ;
12161220 loop {
12171221 tokio:: select! {
12181222 biased;
12191223
12201224 result = in_flight. next( ) , if !in_flight. is_empty( ) => {
1225+ blocked_completions += 1 ;
12211226 match result {
12221227 Some ( Ok ( mut outputs_buf) ) => {
12231228 self . send_outputs( & mut outputs_buf) . await
@@ -1228,6 +1233,12 @@ impl Runner {
12281233 }
12291234
12301235 input_arrays = input_rx. next( ) , if in_flight. len( ) < * TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
1236+ histogram!(
1237+ "transform_concurrent_scheduling_pressure" ,
1238+ "component_id" => self . component_id. clone( )
1239+ )
1240+ . record( ( blocked_completions as f64 / * TRANSFORM_CONCURRENCY_LIMIT as f64 ) . min( 1.0 ) ) ;
1241+ blocked_completions = 0 ;
12311242 match input_arrays {
12321243 Some ( input_arrays) => {
12331244 let mut len = 0 ;
@@ -1254,6 +1265,7 @@ impl Runner {
12541265 }
12551266
12561267 else => {
1268+ blocked_completions = 0 ;
12571269 if shutting_down {
12581270 break
12591271 }
0 commit comments