@@ -8,6 +8,9 @@ use std::{
88 } ,
99} ;
1010
11+ #[ cfg( test) ]
12+ use std:: sync:: Mutex ;
13+
1114use async_stream:: stream;
1215use crossbeam_queue:: { ArrayQueue , SegQueue } ;
1316use futures:: Stream ;
8992 }
9093}
9194
95+ #[ derive( Clone , Debug ) ]
96+ pub struct ChannelMetricMetadata {
97+ prefix : & ' static str ,
98+ output : Option < String > ,
99+ }
100+
101+ impl ChannelMetricMetadata {
102+ pub fn new ( prefix : & ' static str , output : Option < String > ) -> Self {
103+ Self { prefix, output }
104+ }
105+ }
106+
92107#[ derive( Clone , Debug ) ]
93108struct Metrics {
94109 histogram : Histogram ,
@@ -98,28 +113,54 @@ struct Metrics {
98113 // field, so we need to suppress the warning here.
99114 #[ expect( dead_code) ]
100115 max_gauge : Gauge ,
116+ #[ cfg( test) ]
117+ recorded_values : Arc < Mutex < Vec < usize > > > ,
101118}
102119
103120impl Metrics {
104121 #[ expect( clippy:: cast_precision_loss) ] // We have to convert buffer sizes for a gauge, it's okay to lose precision here.
105- fn new ( limit : MemoryBufferSize , prefix : & ' static str , output : & str ) -> Self {
122+ fn new ( limit : MemoryBufferSize , metadata : ChannelMetricMetadata ) -> Self {
123+ let ChannelMetricMetadata { prefix, output } = metadata;
106124 let ( gauge_suffix, max_value) = match limit {
107125 MemoryBufferSize :: MaxEvents ( max_events) => ( "_max_event_size" , max_events. get ( ) as f64 ) ,
108126 MemoryBufferSize :: MaxSize ( max_bytes) => ( "_max_byte_size" , max_bytes. get ( ) as f64 ) ,
109127 } ;
110- let max_gauge = gauge ! ( format!( "{prefix}{gauge_suffix}" ) , "output" => output. to_string( ) ) ;
111- max_gauge. set ( max_value) ;
112- Self {
113- histogram : histogram ! ( format!( "{prefix}_utilization" ) , "output" => output. to_string( ) ) ,
114- gauge : gauge ! ( format!( "{prefix}_utilization_level" ) , "output" => output. to_string( ) ) ,
115- max_gauge,
128+ let max_gauge_name = format ! ( "{prefix}{gauge_suffix}" ) ;
129+ let histogram_name = format ! ( "{prefix}_utilization" ) ;
130+ let gauge_name = format ! ( "{prefix}_utilization_level" ) ;
131+ #[ cfg( test) ]
132+ let recorded_values = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
133+ if let Some ( label_value) = output {
134+ let max_gauge = gauge ! ( max_gauge_name, "output" => label_value. clone( ) ) ;
135+ max_gauge. set ( max_value) ;
136+ Self {
137+ histogram : histogram ! ( histogram_name, "output" => label_value. clone( ) ) ,
138+ gauge : gauge ! ( gauge_name, "output" => label_value. clone( ) ) ,
139+ max_gauge,
140+ #[ cfg( test) ]
141+ recorded_values,
142+ }
143+ } else {
144+ let max_gauge = gauge ! ( max_gauge_name) ;
145+ max_gauge. set ( max_value) ;
146+ Self {
147+ histogram : histogram ! ( histogram_name) ,
148+ gauge : gauge ! ( gauge_name) ,
149+ max_gauge,
150+ #[ cfg( test) ]
151+ recorded_values,
152+ }
116153 }
117154 }
118155
119156 #[ expect( clippy:: cast_precision_loss) ]
120157 fn record ( & self , value : usize ) {
121158 self . histogram . record ( value as f64 ) ;
122159 self . gauge . set ( value as f64 ) ;
160+ #[ cfg( test) ]
161+ if let Ok ( mut recorded) = self . recorded_values . lock ( ) {
162+ recorded. push ( value) ;
163+ }
123164 }
124165}
125166
@@ -145,10 +186,9 @@ impl<T> Clone for Inner<T> {
145186}
146187
147188impl < T : InMemoryBufferable > Inner < T > {
148- fn new ( limit : MemoryBufferSize , metric_name_output : Option < ( & ' static str , & str ) > ) -> Self {
189+ fn new ( limit : MemoryBufferSize , metric_metadata : Option < ChannelMetricMetadata > ) -> Self {
149190 let read_waker = Arc :: new ( Notify :: new ( ) ) ;
150- let metrics =
151- metric_name_output. map ( |( prefix, output) | Metrics :: new ( limit, prefix, output) ) ;
191+ let metrics = metric_metadata. map ( |metadata| Metrics :: new ( limit, metadata) ) ;
152192 match limit {
153193 MemoryBufferSize :: MaxEvents ( max_events) => Inner {
154194 data : Arc :: new ( ArrayQueue :: new ( max_events. get ( ) ) ) ,
@@ -167,6 +207,11 @@ impl<T: InMemoryBufferable> Inner<T> {
167207 }
168208 }
169209
210+ /// Records a send after acquiring all required permits.
211+ ///
212+ /// The `total` value represents the channel utilization after this send completes. It may be
213+ /// greater than the configured limit because the channel intentionally allows a single
214+ /// oversized payload to flow through rather than forcing the sender to split it.
170215 fn send_with_permit ( & mut self , total : usize , permits : OwnedSemaphorePermit , item : T ) {
171216 self . data . push ( ( permits, item) ) ;
172217 self . read_waker . notify_one ( ) ;
@@ -335,9 +380,9 @@ impl<T> Drop for LimitedReceiver<T> {
335380
336381pub fn limited < T : InMemoryBufferable + fmt:: Debug > (
337382 limit : MemoryBufferSize ,
338- metric_name_output : Option < ( & ' static str , & str ) > ,
383+ metric_metadata : Option < ChannelMetricMetadata > ,
339384) -> ( LimitedSender < T > , LimitedReceiver < T > ) {
340- let inner = Inner :: new ( limit, metric_name_output ) ;
385+ let inner = Inner :: new ( limit, metric_metadata ) ;
341386
342387 let sender = LimitedSender {
343388 inner : inner. clone ( ) ,
@@ -355,7 +400,7 @@ mod tests {
355400 use tokio_test:: { assert_pending, assert_ready, task:: spawn} ;
356401 use vector_common:: byte_size_of:: ByteSizeOf ;
357402
358- use super :: limited;
403+ use super :: { ChannelMetricMetadata , limited} ;
359404 use crate :: {
360405 MemoryBufferSize ,
361406 test:: MultiEventRecord ,
@@ -391,6 +436,22 @@ mod tests {
391436 assert_eq ! ( Some ( Sample :: new( 42 ) ) , assert_ready!( recv. poll( ) ) ) ;
392437 }
393438
439+ #[ tokio:: test]
440+ async fn records_utilization_on_send ( ) {
441+ let limit = MemoryBufferSize :: MaxEvents ( NonZeroUsize :: new ( 2 ) . unwrap ( ) ) ;
442+ let ( mut tx, mut rx) = limited (
443+ limit,
444+ Some ( ChannelMetricMetadata :: new ( "test_channel" , None ) ) ,
445+ ) ;
446+
447+ let metrics = tx. inner . metrics . as_ref ( ) . unwrap ( ) . recorded_values . clone ( ) ;
448+
449+ tx. send ( Sample :: new ( 1 ) ) . await . expect ( "send should succeed" ) ;
450+ assert_eq ! ( metrics. lock( ) . unwrap( ) . last( ) . copied( ) , Some ( 1 ) ) ;
451+
452+ let _ = rx. next ( ) . await ;
453+ }
454+
394455 #[ test]
395456 fn test_limiting_by_byte_size ( ) {
396457 let max_elements = 10 ;
0 commit comments