@@ -32,7 +32,8 @@ use otap_df_telemetry::{otel_debug, otel_info, otel_warn};
3232use serde_json:: Value ;
3333use std:: collections:: HashMap ;
3434use std:: sync:: Arc ;
35- use tokio:: time:: { Duration , Interval , interval} ;
35+ use std:: time:: Instant as StdInstant ;
36+ use tokio:: time:: { Duration , Interval , MissedTickBehavior , interval} ;
3637
3738use self :: producer:: { GenerateError , TrafficProducer } ;
3839
@@ -53,6 +54,8 @@ pub mod static_signal;
5354/// The URN for the fake data generator receiver
5455pub const OTAP_FAKE_DATA_GENERATOR_URN : & str = "urn:otel:receiver:traffic_generator" ;
5556
57+ const NANOS_PER_SECOND : u128 = 1_000_000_000 ;
58+
5659/// A Receiver that generates fake OTAP data for testing purposes.
5760pub struct FakeGeneratorReceiver {
5861 /// Configuration for the fake data generator
@@ -62,6 +65,24 @@ pub struct FakeGeneratorReceiver {
6265 metrics : MetricSet < FakeSignalReceiverMetrics > ,
6366}
6467
68+ fn smooth_batch_interval ( run_len : usize ) -> Option < Duration > {
69+ if run_len == 0 {
70+ return None ;
71+ }
72+
73+ let run_len = run_len as u128 ;
74+ let nanos = NANOS_PER_SECOND . div_ceil ( run_len) ;
75+ u64:: try_from ( nanos) . ok ( ) . map ( Duration :: from_nanos)
76+ }
77+
78+ fn duration_nanos ( duration : Duration ) -> f64 {
79+ duration. as_secs_f64 ( ) * 1e9
80+ }
81+
82+ fn elapsed_nanos ( start : StdInstant ) -> f64 {
83+ duration_nanos ( start. elapsed ( ) )
84+ }
85+
6586/// Declares the fake data generator as a local receiver factory
6687///
6788/// Unsafe code is temporarily used here to allow the use of `distributed_slice` macro
@@ -117,6 +138,7 @@ impl FakeGeneratorReceiver {
117138 transport_headers : Option < TransportHeaders > ,
118139 ) -> Result < TerminalState , Error > {
119140 let mut run_produced: u64 = 0 ;
141+ let mut next_pdata: Option < OtapPdata > = None ;
120142
121143 loop {
122144 producer. record_production ( run_produced) ;
@@ -126,6 +148,9 @@ impl FakeGeneratorReceiver {
126148 return wait_for_terminal ( ctrl_msg_recv, handler, & mut self . metrics ) . await ;
127149 } ;
128150
151+ self . metrics . smooth_runs_started . inc ( ) ;
152+ let mut run_completed = false ;
153+
129154 loop {
130155 tokio:: select! {
131156 biased;
@@ -137,29 +162,83 @@ impl FakeGeneratorReceiver {
137162 }
138163
139164 _ = run_ticker. tick( ) => {
140- if current_run. len( ) > 0 {
165+ let remaining_batches = current_run. len( ) + usize :: from( next_pdata. is_some( ) ) ;
166+ let remaining_items = current_run. remaining_signal_count( )
167+ + next_pdata. as_ref( ) . map_or( 0 , |pdata| pdata. num_items( ) as u64 ) ;
168+ if remaining_batches > 0 {
169+ self . metrics. smooth_runs_behind. inc( ) ;
170+ self . metrics
171+ . smooth_behind_remaining_batches
172+ . record( remaining_batches as f64 ) ;
173+ self . metrics
174+ . smooth_behind_remaining_items
175+ . record( remaining_items as f64 ) ;
141176 otel_warn!(
142177 "Data generator is falling behind and didn't finish the current run. For highest
143178 possible throughput, use production_mode: open" ,
144- remaining=current_run. len( ) ,
179+ remaining=remaining_batches,
180+ remaining_items,
145181 ) ;
182+ } else if !run_completed {
183+ self . metrics. smooth_runs_completed. inc( ) ;
184+ run_completed = true ;
185+ }
186+
187+ if next_pdata. is_some( ) {
188+ continue ;
146189 }
190+
147191 break ;
148192 }
149193
150- _ = batch_ticker. tick( ) => {
194+ scheduled = batch_ticker. tick( ) => {
195+ let tick_lateness = tokio:: time:: Instant :: now( )
196+ . saturating_duration_since( scheduled) ;
197+ self . metrics
198+ . smooth_batch_tick_lateness_duration_ns
199+ . record( duration_nanos( tick_lateness) ) ;
151200
152- let Some ( payload) = current_run. next( ) else {
153- continue ;
201+ let channel_result = match next_pdata. take( ) {
202+ Some ( pdata) => {
203+ self . metrics. smooth_payload_send_retry. inc( ) ;
204+ let send_start = StdInstant :: now( ) ;
205+ let result = self . export_pdata( handler, pdata) ?;
206+ self . metrics
207+ . smooth_payload_send_duration_ns
208+ . record( elapsed_nanos( send_start) ) ;
209+ result
210+ }
211+ None => {
212+ let generate_start = StdInstant :: now( ) ;
213+ let payload = current_run. next( ) ;
214+
215+ let Some ( payload) = payload else {
216+ if !run_completed {
217+ self . metrics. smooth_runs_completed. inc( ) ;
218+ run_completed = true ;
219+ }
220+ continue ;
221+ } ;
222+ self . metrics
223+ . smooth_payload_generate_duration_ns
224+ . record( elapsed_nanos( generate_start) ) ;
225+
226+ let send_start = StdInstant :: now( ) ;
227+ let result = self . handle_payload( handler, payload, & transport_headers) ?;
228+ self . metrics
229+ . smooth_payload_send_duration_ns
230+ . record( elapsed_nanos( send_start) ) ;
231+ result
232+ }
154233 } ;
155234
156- let channel_result = self . handle_payload( handler, payload, & transport_headers) ?;
157235 match channel_result {
158236 Ok ( count) => {
159237 run_produced += count;
160238 }
161- Err ( e) => {
162- otel_warn!( "Failed to push in smooth mode, skipping tick" , err=?e) ;
239+ Err ( pdata) => {
240+ self . metrics. smooth_payload_send_full. inc( ) ;
241+ next_pdata = Some ( pdata) ;
163242 }
164243 }
165244 }
@@ -448,18 +527,22 @@ impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
448527 . await ?;
449528
450529 let run_len = producer. run_len ( ) ;
451- let batch_duration_millis = 1000u64 . div_euclid ( run_len as u64 ) ;
452530
453531 // We consume one tick here because it's always immediately ready and would
454532 // make us think we're lagging;
455533 let mut run_ticker = interval ( Duration :: from_secs ( 1 ) ) ;
534+ run_ticker. set_missed_tick_behavior ( MissedTickBehavior :: Skip ) ;
456535 _ = run_ticker. tick ( ) . await ;
457536
458537 match self . config . get_traffic_config ( ) . production_mode {
459538 config:: ProductionMode :: Smooth => {
460- if batch_duration_millis > 0 {
461- let batch_ticker =
462- interval ( Duration :: from_millis ( 1000u64 . div_euclid ( run_len as u64 ) ) ) ;
539+ if let Some ( batch_duration) = smooth_batch_interval ( run_len) {
540+ self . metrics . smooth_run_batches . set ( run_len as u64 ) ;
541+ self . metrics
542+ . smooth_batch_interval_ns
543+ . set ( batch_duration. as_nanos ( ) as u64 ) ;
544+ let mut batch_ticker = interval ( batch_duration) ;
545+ batch_ticker. set_missed_tick_behavior ( MissedTickBehavior :: Skip ) ;
463546 self . run_smooth (
464547 ctrl_msg_recv,
465548 & effect_handler,
@@ -470,7 +553,9 @@ impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
470553 )
471554 . await
472555 } else {
473- otel_warn ! ( "Falling back to Open production mode as batch interval is sub 1ms" ) ;
556+ otel_warn ! (
557+ "Falling back to Open production mode because smooth batch interval is zero"
558+ ) ;
474559 self . run_open (
475560 ctrl_msg_recv,
476561 & effect_handler,
@@ -533,6 +618,21 @@ mod tests {
533618 const MAX_SIGNALS : u64 = 3 ;
534619 const MAX_BATCH : usize = 30 ;
535620
621+ #[ test]
622+ fn test_smooth_batch_interval_uses_sub_millisecond_precision ( ) {
623+ let interval = smooth_batch_interval ( 2000 ) . expect ( "interval should exist" ) ;
624+
625+ assert_eq ! ( interval, Duration :: from_micros( 500 ) ) ;
626+ }
627+
628+ #[ test]
629+ fn test_smooth_batch_interval_does_not_overdrive_run ( ) {
630+ let interval = smooth_batch_interval ( 88 ) . expect ( "interval should exist" ) ;
631+
632+ assert ! ( interval * 88 >= Duration :: from_secs( 1 ) ) ;
633+ assert ! ( interval * 87 < Duration :: from_secs( 1 ) ) ;
634+ }
635+
536636 /// Convert OtapPdata signal to OtlpProtoMessage for testing purposes.
537637 fn pdata_to_otlp_message ( value : OtapPdata ) -> OtlpProtoMessage {
538638 let otlp_bytes: OtlpProtoBytes = value
0 commit comments