Skip to content

Commit 6410c7f

Browse files
authored
feat: Shutdown blocks for report (#593)
* feat: Shutdown blocks for report
* feat: holy smokes you can match on enum + options
* feat: Consolidate no-flush case with flush-periodic-top-of-invocation case to reduce code duplication. Move periodic case into conditional at top of no-flush
* Remove duplicated interval reset
* refactor: Move all handling into handle_event_bus_event, return telemetry event and break where it makes sense
1 parent 83baa63 commit 6410c7f

File tree

1 file changed

+55
-130
lines changed

1 file changed

+55
-130
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 55 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use bottlecap::{
2828
telemetry::{
2929
self,
3030
client::TelemetryApiClient,
31-
events::{RuntimeDoneMetrics, Status, TelemetryEvent, TelemetryRecord},
31+
events::{TelemetryEvent, TelemetryRecord},
3232
listener::TelemetryListener,
3333
},
3434
traces::{
@@ -360,24 +360,14 @@ async fn extension_loop_active(
360360
// flush everything
361361
// call next
362362
// optionally flush after tick for long running invos
363-
'inner: loop {
363+
'flush_end: loop {
364364
tokio::select! {
365365
biased;
366366
Some(event) = event_bus.rx.recv() => {
367-
if let Some(runtime_done_meta) = handle_event_bus_event(event, invocation_processor.clone()).await {
368-
let mut p = invocation_processor.lock().await;
369-
p.on_platform_runtime_done(
370-
&runtime_done_meta.request_id,
371-
runtime_done_meta.metrics.duration_ms,
372-
runtime_done_meta.status,
373-
config.clone(),
374-
tags_provider.clone(),
375-
trace_processor.clone(),
376-
trace_agent_channel.clone(),
377-
runtime_done_meta.timestamp,
378-
).await;
379-
drop(p);
380-
break 'inner;
367+
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), config.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await {
368+
if let TelemetryRecord::PlatformRuntimeDone{ .. } = telemetry_event.record {
369+
break 'flush_end;
370+
}
381371
}
382372
}
383373
_ = race_flush_interval.tick() => {
@@ -402,79 +392,27 @@ async fn extension_loop_active(
402392
.await;
403393
let next_response = next_event(client, &r.extension_id).await;
404394
shutdown = handle_next_invocation(next_response, invocation_processor.clone()).await;
405-
} else if flush_control.should_periodic_flush() {
406-
// Should flush at the top of the invocation, which is now
407-
// flush
408-
//
409-
flush_all(
410-
&logs_flusher,
411-
&mut metrics_flusher,
412-
&*trace_flusher,
413-
&*stats_flusher,
414-
&mut race_flush_interval,
415-
)
416-
.await;
417-
race_flush_interval.reset();
418-
// Don't await! Pin it and check in the tokio loop
419-
// Tokio select! drops whatever task does not complete first
420-
// but the state machine API in Lambda will give us an invalid error
421-
// if we call /next twice instead of waiting
422-
let next_lambda_response = next_event(client, &r.extension_id);
423-
tokio::pin!(next_lambda_response);
424-
// call next
425-
// optionally flush after tick for long running invos
426-
'inner: loop {
427-
tokio::select! {
428-
biased;
429-
next_response = &mut next_lambda_response => {
430-
// Dear reader this is important, you may be tempted to remove this
431-
// after all, why reset the flush interval if we're not flushing?
432-
// It's because the race_flush_interval is only for the RACE FLUSH
433-
// For long-running txns. The call to `flush_control.should_flush_end()`
434-
// has its own interval which is not reset here.
435-
race_flush_interval.reset();
436-
// Thank you for not removing race_flush_interval.reset();
437-
438-
shutdown = handle_next_invocation(next_response, invocation_processor.clone()).await;
439-
// Need to break here to re-call next
440-
break 'inner;
441-
}
442-
Some(event) = event_bus.rx.recv() => {
443-
if let Some(runtime_done_meta) = handle_event_bus_event(event, invocation_processor.clone()).await {
444-
let mut p = invocation_processor.lock().await;
445-
p.on_platform_runtime_done(
446-
&runtime_done_meta.request_id,
447-
runtime_done_meta.metrics.duration_ms,
448-
runtime_done_meta.status,
449-
config.clone(),
450-
tags_provider.clone(),
451-
trace_processor.clone(),
452-
trace_agent_channel.clone(),
453-
runtime_done_meta.timestamp,
454-
).await;
455-
drop(p);
456-
}
457-
}
458-
_ = race_flush_interval.tick() => {
459-
flush_all(
460-
&logs_flusher,
461-
&mut metrics_flusher,
462-
&*trace_flusher,
463-
&*stats_flusher,
464-
&mut race_flush_interval,
465-
).await;
466-
}
467-
}
468-
}
469395
} else {
396+
//Periodic flush scenario, flush at top of invocation
397+
if flush_control.should_periodic_flush() {
398+
// Should flush at the top of the invocation, which is now
399+
flush_all(
400+
&logs_flusher,
401+
&mut metrics_flusher,
402+
&*trace_flusher,
403+
&*stats_flusher,
404+
&mut race_flush_interval,
405+
)
406+
.await;
407+
}
470408
// NO FLUSH SCENARIO
471409
// JUST LOOP OVER PIPELINE AND WAIT FOR NEXT EVENT
472410
// If we get platform.runtimeDone or platform.runtimeReport
473411
// That's fine, we still wait to break until we get the response from next
474412
// and then we break to determine if we'll flush or not
475413
let next_lambda_response = next_event(client, &r.extension_id);
476414
tokio::pin!(next_lambda_response);
477-
'inner: loop {
415+
'next_invocation: loop {
478416
tokio::select! {
479417
biased;
480418
next_response = &mut next_lambda_response => {
@@ -488,23 +426,10 @@ async fn extension_loop_active(
488426

489427
shutdown = handle_next_invocation(next_response, invocation_processor.clone()).await;
490428
// Need to break here to re-call next
491-
break 'inner;
429+
break 'next_invocation;
492430
}
493431
Some(event) = event_bus.rx.recv() => {
494-
if let Some(runtime_done_meta) = handle_event_bus_event(event, invocation_processor.clone()).await {
495-
let mut p = invocation_processor.lock().await;
496-
p.on_platform_runtime_done(
497-
&runtime_done_meta.request_id,
498-
runtime_done_meta.metrics.duration_ms,
499-
runtime_done_meta.status,
500-
config.clone(),
501-
tags_provider.clone(),
502-
trace_processor.clone(),
503-
trace_agent_channel.clone(),
504-
runtime_done_meta.timestamp,
505-
).await;
506-
drop(p);
507-
}
432+
handle_event_bus_event(event, invocation_processor.clone(), config.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
508433
}
509434
_ = race_flush_interval.tick() => {
510435
flush_all(
@@ -520,6 +445,18 @@ async fn extension_loop_active(
520445
}
521446

522447
if shutdown {
448+
'shutdown: loop {
449+
tokio::select! {
450+
Some(event) = event_bus.rx.recv() => {
451+
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), config.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await {
452+
if let TelemetryRecord::PlatformReport{ .. } = telemetry_event.record {
453+
// Wait for the report event before shutting down
454+
break 'shutdown;
455+
}
456+
}
457+
}
458+
}
459+
}
523460
dogstatsd_cancel_token.cancel();
524461
telemetry_listener_cancel_token.cancel();
525462
flush_all(
@@ -551,17 +488,14 @@ async fn flush_all(
551488
race_flush_interval.reset();
552489
}
553490

554-
struct RuntimeDoneMeta {
555-
request_id: String,
556-
status: Status,
557-
metrics: RuntimeDoneMetrics,
558-
timestamp: i64,
559-
}
560-
561491
async fn handle_event_bus_event(
562492
event: Event,
563493
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
564-
) -> Option<RuntimeDoneMeta> {
494+
config: Arc<Config>,
495+
tags_provider: Arc<TagProvider>,
496+
trace_processor: Arc<trace_processor::ServerlessTraceProcessor>,
497+
trace_agent_channel: Sender<datadog_trace_utils::send_data::SendData>,
498+
) -> Option<TelemetryEvent> {
565499
match event {
566500
Event::Metric(event) => {
567501
debug!("Metric event: {:?}", event);
@@ -593,44 +527,35 @@ async fn handle_event_bus_event(
593527
drop(p);
594528
}
595529
TelemetryRecord::PlatformRuntimeDone {
596-
request_id,
530+
ref request_id,
531+
metrics: Some(metrics),
597532
status,
598-
metrics,
599-
error_type,
600533
..
601534
} => {
602-
debug!(
603-
"Runtime done for request_id: {:?} with status: {:?} and error: {:?}",
535+
let mut p = invocation_processor.lock().await;
536+
p.on_platform_runtime_done(
604537
request_id,
538+
metrics.duration_ms,
605539
status,
606-
error_type.unwrap_or("None".to_string())
607-
);
608-
609-
if let Some(metrics) = metrics {
610-
return Some(RuntimeDoneMeta {
611-
request_id,
612-
status,
613-
metrics,
614-
timestamp: event.time.timestamp(),
615-
});
616-
}
540+
config.clone(),
541+
tags_provider.clone(),
542+
trace_processor.clone(),
543+
trace_agent_channel.clone(),
544+
event.time.timestamp(),
545+
)
546+
.await;
547+
drop(p);
548+
return Some(event);
617549
}
618550
TelemetryRecord::PlatformReport {
619-
request_id,
620-
status,
551+
ref request_id,
621552
metrics,
622-
error_type,
623553
..
624554
} => {
625-
debug!(
626-
"Platform report for request_id: {:?} with status: {:?} and error: {:?}",
627-
request_id,
628-
status,
629-
error_type.unwrap_or("None".to_string())
630-
);
631555
let mut p = invocation_processor.lock().await;
632-
p.on_platform_report(&request_id, metrics, event.time.timestamp());
556+
p.on_platform_report(request_id, metrics, event.time.timestamp());
633557
drop(p);
558+
return Some(event);
634559
}
635560
_ => {
636561
debug!("Unforwarded Telemetry event: {:?}", event);

0 commit comments

Comments (0)