@@ -498,108 +498,39 @@ async fn extension_loop_active(
498498 let maybe_shutdown_event;
499499
500500 let current_flush_decision = flush_control. evaluate_flush_decision ( ) ;
501- if current_flush_decision == FlushDecision :: End {
502- // break loop after runtime done
503- // flush everything
504- // call next
505- // optionally flush after tick for long running invos
506- ' flush_end: loop {
507- tokio:: select! {
508- biased;
509- Some ( event) = event_bus. rx. recv( ) => {
510- if let Some ( telemetry_event) = handle_event_bus_event( event, invocation_processor. clone( ) , appsec_processor. clone( ) , tags_provider. clone( ) , trace_processor. clone( ) , trace_agent_channel. clone( ) ) . await {
511- if let TelemetryRecord :: PlatformRuntimeDone { .. } = telemetry_event. record {
512- break ' flush_end;
501+ match current_flush_decision {
502+ FlushDecision :: End => {
503+ // break loop after runtime done
504+ // flush everything
505+ // call next
506+ // optionally flush after tick for long running invos
507+ ' flush_end: loop {
508+ tokio:: select! {
509+ biased;
510+ Some ( event) = event_bus. rx. recv( ) => {
511+ if let Some ( telemetry_event) = handle_event_bus_event( event, invocation_processor. clone( ) , appsec_processor. clone( ) , tags_provider. clone( ) , trace_processor. clone( ) , trace_agent_channel. clone( ) ) . await {
512+ if let TelemetryRecord :: PlatformRuntimeDone { .. } = telemetry_event. record {
513+ break ' flush_end;
514+ }
513515 }
514516 }
515- }
516- _ = race_flush_interval. tick( ) => {
517- let mut locked_metrics = metrics_flushers. lock( ) . await ;
518- blocking_flush_all(
519- & logs_flusher,
520- & mut locked_metrics,
521- & * trace_flusher,
522- & * stats_flusher,
523- & proxy_flusher,
524- & mut race_flush_interval,
525- & metrics_aggr_handle. clone( ) ,
526- false ,
527- )
528- . await ;
529- }
530- }
531- }
532- // flush
533- let mut locked_metrics = metrics_flushers. lock ( ) . await ;
534- blocking_flush_all (
535- & logs_flusher,
536- & mut locked_metrics,
537- & * trace_flusher,
538- & * stats_flusher,
539- & proxy_flusher,
540- & mut race_flush_interval,
541- & metrics_aggr_handle. clone ( ) ,
542- false ,
543- )
544- . await ;
545- let next_response =
546- extension:: next_event ( client, & aws_config. runtime_api , & r. extension_id ) . await ;
547- maybe_shutdown_event =
548- handle_next_invocation ( next_response, invocation_processor. clone ( ) ) . await ;
549- } else {
550- //Periodic flush scenario, flush at top of invocation
551- if current_flush_decision == FlushDecision :: Continuous
552- && !pending_flush_handles. has_pending_handles ( )
553- {
554- let lf = logs_flusher. clone ( ) ;
555- pending_flush_handles
556- . log_flush_handles
557- . push ( tokio:: spawn ( async move { lf. flush ( None ) . await } ) ) ;
558- let tf = trace_flusher. clone ( ) ;
559- pending_flush_handles
560- . trace_flush_handles
561- . push ( tokio:: spawn ( async move {
562- tf. flush ( None ) . await . unwrap_or_default ( )
563- } ) ) ;
564- let ( metrics_flushers_copy, series, sketches) = {
565- let locked_metrics = metrics_flushers. lock ( ) . await ;
566- let flush_response = metrics_aggr_handle
567- . clone ( )
568- . flush ( )
569- . await
570- . expect ( "can't flush metrics handle" ) ;
571- (
572- locked_metrics. clone ( ) ,
573- flush_response. series ,
574- flush_response. distributions ,
575- )
576- } ;
577- for ( idx, mut flusher) in metrics_flushers_copy. into_iter ( ) . enumerate ( ) {
578- let series_clone = series. clone ( ) ;
579- let sketches_clone = sketches. clone ( ) ;
580- let handle = tokio:: spawn ( async move {
581- let ( retry_series, retry_sketches) = flusher
582- . flush_metrics ( series_clone. clone ( ) , sketches_clone. clone ( ) )
583- . await
584- . unwrap_or_default ( ) ;
585- MetricsRetryBatch {
586- flusher_id : idx,
587- series : retry_series,
588- sketches : retry_sketches,
517+ _ = race_flush_interval. tick( ) => {
518+ let mut locked_metrics = metrics_flushers. lock( ) . await ;
519+ blocking_flush_all(
520+ & logs_flusher,
521+ & mut locked_metrics,
522+ & * trace_flusher,
523+ & * stats_flusher,
524+ & proxy_flusher,
525+ & mut race_flush_interval,
526+ & metrics_aggr_handle. clone( ) ,
527+ false ,
528+ )
529+ . await ;
589530 }
590- } ) ;
591- pending_flush_handles. metric_flush_handles . push ( handle) ;
531+ }
592532 }
593-
594- let pf = proxy_flusher. clone ( ) ;
595- pending_flush_handles
596- . proxy_flush_handles
597- . push ( tokio:: spawn ( async move {
598- pf. flush ( None ) . await . unwrap_or_default ( )
599- } ) ) ;
600-
601- race_flush_interval. reset ( ) ;
602- } else if current_flush_decision == FlushDecision :: Periodic {
533+ // flush
603534 let mut locked_metrics = metrics_flushers. lock ( ) . await ;
604535 blocking_flush_all (
605536 & logs_flusher,
@@ -608,44 +539,119 @@ async fn extension_loop_active(
608539 & * stats_flusher,
609540 & proxy_flusher,
610541 & mut race_flush_interval,
611- & metrics_aggr_handle,
612- false , // force_flush_trace_stats
542+ & metrics_aggr_handle. clone ( ) ,
543+ false ,
613544 )
614545 . await ;
546+ let next_response =
547+ extension:: next_event ( client, & aws_config. runtime_api , & r. extension_id ) . await ;
548+ maybe_shutdown_event =
549+ handle_next_invocation ( next_response, invocation_processor. clone ( ) ) . await ;
615550 }
616- // NO FLUSH SCENARIO
617- // JUST LOOP OVER PIPELINE AND WAIT FOR NEXT EVENT
618- // If we get platform.runtimeDone or platform.runtimeReport
619- // That's fine, we still wait to break until we get the response from next
620- // and then we break to determine if we'll flush or not
621- let next_lambda_response =
622- extension:: next_event ( client, & aws_config. runtime_api , & r. extension_id ) ;
623- tokio:: pin!( next_lambda_response) ;
624- ' next_invocation: loop {
625- tokio:: select! {
626- biased;
627- next_response = & mut next_lambda_response => {
628- maybe_shutdown_event = handle_next_invocation( next_response, invocation_processor. clone( ) ) . await ;
629- // Need to break here to re-call next
630- break ' next_invocation;
551+ FlushDecision :: Continuous | FlushDecision :: Periodic | FlushDecision :: Dont => {
552+ match current_flush_decision {
553+ //Periodic flush scenario, flush at top of invocation
554+ FlushDecision :: Continuous => {
555+ if !pending_flush_handles. has_pending_handles ( ) {
556+ let lf = logs_flusher. clone ( ) ;
557+ pending_flush_handles
558+ . log_flush_handles
559+ . push ( tokio:: spawn ( async move { lf. flush ( None ) . await } ) ) ;
560+ let tf = trace_flusher. clone ( ) ;
561+ pending_flush_handles. trace_flush_handles . push ( tokio:: spawn (
562+ async move { tf. flush ( None ) . await . unwrap_or_default ( ) } ,
563+ ) ) ;
564+ let ( metrics_flushers_copy, series, sketches) = {
565+ let locked_metrics = metrics_flushers. lock ( ) . await ;
566+ let flush_response = metrics_aggr_handle
567+ . clone ( )
568+ . flush ( )
569+ . await
570+ . expect ( "can't flush metrics handle" ) ;
571+ (
572+ locked_metrics. clone ( ) ,
573+ flush_response. series ,
574+ flush_response. distributions ,
575+ )
576+ } ;
577+ for ( idx, mut flusher) in metrics_flushers_copy. into_iter ( ) . enumerate ( )
578+ {
579+ let series_clone = series. clone ( ) ;
580+ let sketches_clone = sketches. clone ( ) ;
581+ let handle = tokio:: spawn ( async move {
582+ let ( retry_series, retry_sketches) = flusher
583+ . flush_metrics ( series_clone. clone ( ) , sketches_clone. clone ( ) )
584+ . await
585+ . unwrap_or_default ( ) ;
586+ MetricsRetryBatch {
587+ flusher_id : idx,
588+ series : retry_series,
589+ sketches : retry_sketches,
590+ }
591+ } ) ;
592+ pending_flush_handles. metric_flush_handles . push ( handle) ;
593+ }
594+
595+ let pf = proxy_flusher. clone ( ) ;
596+ pending_flush_handles. proxy_flush_handles . push ( tokio:: spawn (
597+ async move { pf. flush ( None ) . await . unwrap_or_default ( ) } ,
598+ ) ) ;
599+
600+ race_flush_interval. reset ( ) ;
601+ }
631602 }
632- Some ( event) = event_bus. rx. recv( ) => {
633- handle_event_bus_event( event, invocation_processor. clone( ) , appsec_processor. clone( ) , tags_provider. clone( ) , trace_processor. clone( ) , trace_agent_channel. clone( ) ) . await ;
603+ FlushDecision :: Periodic => {
604+ let mut locked_metrics = metrics_flushers. lock ( ) . await ;
605+ blocking_flush_all (
606+ & logs_flusher,
607+ & mut locked_metrics,
608+ & * trace_flusher,
609+ & * stats_flusher,
610+ & proxy_flusher,
611+ & mut race_flush_interval,
612+ & metrics_aggr_handle,
613+ false , // force_flush_trace_stats
614+ )
615+ . await ;
634616 }
635- _ = race_flush_interval. tick( ) => {
636- if flush_control. flush_strategy == FlushStrategy :: Default {
637- let mut locked_metrics = metrics_flushers. lock( ) . await ;
638- blocking_flush_all(
639- & logs_flusher,
640- & mut locked_metrics,
641- & * trace_flusher,
642- & * stats_flusher,
643- & proxy_flusher,
644- & mut race_flush_interval,
645- & metrics_aggr_handle,
646- false , // force_flush_trace_stats
647- )
648- . await ;
617+ _ => {
618+ // No specific flush logic for Dont or End (End already handled above)
619+ }
620+ }
621+ // NO FLUSH SCENARIO
622+ // JUST LOOP OVER PIPELINE AND WAIT FOR NEXT EVENT
623+ // If we get platform.runtimeDone or platform.runtimeReport
624+ // That's fine, we still wait to break until we get the response from next
625+ // and then we break to determine if we'll flush or not
626+ let next_lambda_response =
627+ extension:: next_event ( client, & aws_config. runtime_api , & r. extension_id ) ;
628+ tokio:: pin!( next_lambda_response) ;
629+ ' next_invocation: loop {
630+ tokio:: select! {
631+ biased;
632+ next_response = & mut next_lambda_response => {
633+ maybe_shutdown_event = handle_next_invocation( next_response, invocation_processor. clone( ) ) . await ;
634+ // Need to break here to re-call next
635+ break ' next_invocation;
636+ }
637+ Some ( event) = event_bus. rx. recv( ) => {
638+ handle_event_bus_event( event, invocation_processor. clone( ) , appsec_processor. clone( ) , tags_provider. clone( ) , trace_processor. clone( ) , trace_agent_channel. clone( ) ) . await ;
639+ }
640+ _ = race_flush_interval. tick( ) => {
641+ if flush_control. flush_strategy == FlushStrategy :: Default {
642+ let mut locked_metrics = metrics_flushers. lock( ) . await ;
643+ blocking_flush_all(
644+ & logs_flusher,
645+ & mut locked_metrics,
646+ & * trace_flusher,
647+ & * stats_flusher,
648+ & proxy_flusher,
649+ & mut race_flush_interval,
650+ & metrics_aggr_handle,
651+ false , // force_flush_trace_stats
652+ )
653+ . await ;
654+ }
649655 }
650656 }
651657 }
0 commit comments