@@ -66,6 +66,7 @@ use datadog_trace_utils::send_data::SendData;
6666use decrypt:: resolve_secrets;
6767use dogstatsd:: {
6868 aggregator:: Aggregator as MetricsAggregator ,
69+ api_key:: ApiKeyFactory ,
6970 constants:: CONTEXTS ,
7071 datadog:: {
7172 DdDdUrl , DdUrl , MetricsIntakeUrlPrefix , MetricsIntakeUrlPrefixOverride ,
@@ -88,7 +89,7 @@ use std::{
8889 sync:: { Arc , Mutex } ,
8990 time:: { Duration , Instant } ,
9091} ;
91- use tokio:: { sync:: mpsc:: Sender , sync:: Mutex as TokioMutex , task:: JoinHandle } ;
92+ use tokio:: { sync:: mpsc:: Sender , sync:: Mutex as TokioMutex , sync :: RwLock , task:: JoinHandle } ;
9293use tokio_util:: sync:: CancellationToken ;
9394use tracing:: { debug, error} ;
9495use tracing_subscriber:: EnvFilter ;
@@ -333,7 +334,7 @@ async fn register(client: &Client) -> Result<RegisterResponse> {
333334#[ tokio:: main]
334335async fn main ( ) -> Result < ( ) > {
335336 let start_time = Instant :: now ( ) ;
336- let ( aws_config, mut aws_credentials, config) = load_configs ( start_time) ;
337+ let ( aws_config, aws_credentials, config) = load_configs ( start_time) ;
337338
338339 enable_logging_subsystem ( & config) ;
339340 log_fips_status ( & aws_config. region ) ;
@@ -360,33 +361,27 @@ async fn main() -> Result<()> {
360361 . await
361362 . map_err ( |e| Error :: new ( std:: io:: ErrorKind :: InvalidData , e. to_string ( ) ) ) ?;
362363
363- if let Some ( resolved_api_key) =
364- resolve_secrets ( Arc :: clone ( & config) , & aws_config, & mut aws_credentials) . await
364+ let aws_config = Arc :: new ( aws_config) ;
365+ let api_key_factory = create_api_key_factory ( & config, & aws_config, aws_credentials) ;
366+
367+ match extension_loop_active (
368+ Arc :: clone ( & aws_config) ,
369+ & config,
370+ & client,
371+ & r,
372+ Arc :: clone ( & api_key_factory) ,
373+ start_time,
374+ )
375+ . await
365376 {
366- match extension_loop_active (
367- & aws_config,
368- & config,
369- & client,
370- & r,
371- resolved_api_key,
372- start_time,
373- )
374- . await
375- {
376- Ok ( ( ) ) => {
377- debug ! ( "Extension loop completed successfully" ) ;
378- Ok ( ( ) )
379- }
380- Err ( e) => {
381- error ! (
382- "Extension loop failed: {e:?}, Calling /next without Datadog instrumentation"
383- ) ;
384- extension_loop_idle ( & client, & r) . await
385- }
377+ Ok ( ( ) ) => {
378+ debug ! ( "Extension loop completed successfully" ) ;
379+ Ok ( ( ) )
380+ }
381+ Err ( e) => {
382+ error ! ( "Extension loop failed: {e:?}, Calling /next without Datadog instrumentation" ) ;
383+ extension_loop_idle ( & client, & r) . await
386384 }
387- } else {
388- error ! ( "Failed to resolve secrets, Datadog extension will be idle" ) ;
389- extension_loop_idle ( & client, & r) . await
390385 }
391386}
392387
@@ -430,6 +425,28 @@ fn enable_logging_subsystem(config: &Arc<Config>) {
430425 debug ! ( "Logging subsystem enabled" ) ;
431426}
432427
428+ fn create_api_key_factory (
429+ config : & Arc < Config > ,
430+ aws_config : & Arc < AwsConfig > ,
431+ aws_credentials : AwsCredentials ,
432+ ) -> Arc < ApiKeyFactory > {
433+ let config = Arc :: clone ( config) ;
434+ let aws_config = Arc :: clone ( aws_config) ;
435+ let aws_credentials = Arc :: new ( RwLock :: new ( aws_credentials) ) ;
436+
437+ Arc :: new ( ApiKeyFactory :: new_from_resolver ( Arc :: new ( move || {
438+ let config = Arc :: clone ( & config) ;
439+ let aws_config = Arc :: clone ( & aws_config) ;
440+ let aws_credentials = Arc :: clone ( & aws_credentials) ;
441+
442+ Box :: pin ( async move {
443+ resolve_secrets ( config, aws_config, aws_credentials)
444+ . await
445+ . expect ( "Failed to resolve API key" )
446+ } )
447+ } ) ) )
448+ }
449+
433450async fn extension_loop_idle ( client : & Client , r : & RegisterResponse ) -> Result < ( ) > {
434451 loop {
435452 match next_event ( client, & r. extension_id ) . await {
@@ -446,11 +463,11 @@ async fn extension_loop_idle(client: &Client, r: &RegisterResponse) -> Result<()
446463
447464#[ allow( clippy:: too_many_lines) ]
448465async fn extension_loop_active (
449- aws_config : & AwsConfig ,
466+ aws_config : Arc < AwsConfig > ,
450467 config : & Arc < Config > ,
451468 client : & Client ,
452469 r : & RegisterResponse ,
453- resolved_api_key : String ,
470+ api_key_factory : Arc < ApiKeyFactory > ,
454471 start_time : Instant ,
455472) -> Result < ( ) > {
456473 let mut event_bus = EventBus :: run ( ) ;
@@ -460,11 +477,11 @@ async fn extension_loop_active(
460477 . as_ref ( )
461478 . unwrap_or ( & "none" . to_string ( ) )
462479 . to_string ( ) ;
463- let tags_provider = setup_tag_provider ( aws_config, config, & account_id) ;
480+ let tags_provider = setup_tag_provider ( & Arc :: clone ( & aws_config) , config, & account_id) ;
464481
465482 let ( logs_agent_channel, logs_flusher) = start_logs_agent (
466483 config,
467- resolved_api_key . clone ( ) ,
484+ Arc :: clone ( & api_key_factory ) ,
468485 & tags_provider,
469486 event_bus. get_sender_copy ( ) ,
470487 ) ;
@@ -478,15 +495,15 @@ async fn extension_loop_active(
478495 ) ) ;
479496
480497 let metrics_flushers = Arc :: new ( TokioMutex :: new ( start_metrics_flushers (
481- resolved_api_key . clone ( ) ,
498+ Arc :: clone ( & api_key_factory ) ,
482499 & metrics_aggr,
483500 config,
484501 ) ) ) ;
485502 // Lifecycle Invocation Processor
486503 let invocation_processor = Arc :: new ( TokioMutex :: new ( InvocationProcessor :: new (
487504 Arc :: clone ( & tags_provider) ,
488505 Arc :: clone ( config) ,
489- aws_config,
506+ Arc :: clone ( & aws_config) ,
490507 Arc :: clone ( & metrics_aggr) ,
491508 ) ) ) ;
492509
@@ -500,7 +517,7 @@ async fn extension_loop_active(
500517 trace_agent_shutdown_token,
501518 ) = start_trace_agent (
502519 config,
503- resolved_api_key . clone ( ) ,
520+ Arc :: clone ( & api_key_factory ) ,
504521 & tags_provider,
505522 Arc :: clone ( & invocation_processor) ,
506523 Arc :: clone ( & trace_aggregator) ,
@@ -919,7 +936,7 @@ async fn handle_next_invocation(
919936}
920937
921938fn setup_tag_provider (
922- aws_config : & AwsConfig ,
939+ aws_config : & Arc < AwsConfig > ,
923940 config : & Arc < Config > ,
924941 account_id : & str ,
925942) -> Arc < TagProvider > {
@@ -938,14 +955,14 @@ fn setup_tag_provider(
938955
939956fn start_logs_agent (
940957 config : & Arc < Config > ,
941- resolved_api_key : String ,
958+ api_key_factory : Arc < ApiKeyFactory > ,
942959 tags_provider : & Arc < TagProvider > ,
943960 event_bus : Sender < Event > ,
944961) -> ( Sender < TelemetryEvent > , LogsFlusher ) {
945962 let mut logs_agent = LogsAgent :: new ( Arc :: clone ( tags_provider) , Arc :: clone ( config) , event_bus) ;
946963 let logs_agent_channel = logs_agent. get_sender_copy ( ) ;
947964 let logs_flusher = LogsFlusher :: new (
948- resolved_api_key ,
965+ api_key_factory ,
949966 Arc :: clone ( & logs_agent. aggregator ) ,
950967 config. clone ( ) ,
951968 ) ;
@@ -956,7 +973,7 @@ fn start_logs_agent(
956973}
957974
958975fn start_metrics_flushers (
959- resolved_api_key : String ,
976+ api_key_factory : Arc < ApiKeyFactory > ,
960977 metrics_aggr : & Arc < Mutex < MetricsAggregator > > ,
961978 config : & Arc < Config > ,
962979) -> Vec < MetricsFlusher > {
@@ -979,7 +996,7 @@ fn start_metrics_flushers(
979996 } ;
980997
981998 let flusher_config = MetricsFlusherConfig {
982- api_key : resolved_api_key ,
999+ api_key_factory ,
9831000 aggregator : Arc :: clone ( metrics_aggr) ,
9841001 metrics_intake_url_prefix : metrics_intake_url. expect ( "can't parse site or override" ) ,
9851002 https_proxy : config. proxy_https . clone ( ) ,
@@ -1003,8 +1020,9 @@ fn start_metrics_flushers(
10031020
10041021 // Create a flusher for each endpoint URL and API key pair
10051022 for api_key in api_keys {
1023+ let additional_api_key_factory = Arc :: new ( ApiKeyFactory :: new ( api_key) ) ;
10061024 let additional_flusher_config = MetricsFlusherConfig {
1007- api_key : api_key . clone ( ) ,
1025+ api_key_factory : additional_api_key_factory ,
10081026 aggregator : metrics_aggr. clone ( ) ,
10091027 metrics_intake_url_prefix : metrics_intake_url. clone ( ) ,
10101028 https_proxy : config. proxy_https . clone ( ) ,
@@ -1020,7 +1038,7 @@ fn start_metrics_flushers(
10201038#[ allow( clippy:: type_complexity) ]
10211039fn start_trace_agent (
10221040 config : & Arc < Config > ,
1023- resolved_api_key : String ,
1041+ api_key_factory : Arc < ApiKeyFactory > ,
10241042 tags_provider : & Arc < TagProvider > ,
10251043 invocation_processor : Arc < TokioMutex < InvocationProcessor > > ,
10261044 trace_aggregator : Arc < TokioMutex < trace_aggregator:: TraceAggregator > > ,
@@ -1035,7 +1053,7 @@ fn start_trace_agent(
10351053 // Stats
10361054 let stats_aggregator = Arc :: new ( TokioMutex :: new ( StatsAggregator :: default ( ) ) ) ;
10371055 let stats_flusher = Arc :: new ( stats_flusher:: ServerlessStatsFlusher :: new (
1038- resolved_api_key . clone ( ) ,
1056+ api_key_factory . clone ( ) ,
10391057 stats_aggregator. clone ( ) ,
10401058 Arc :: clone ( config) ,
10411059 ) ) ;
@@ -1059,13 +1077,13 @@ fn start_trace_agent(
10591077
10601078 let trace_processor = Arc :: new ( trace_processor:: ServerlessTraceProcessor {
10611079 obfuscation_config : Arc :: new ( obfuscation_config) ,
1062- resolved_api_key : resolved_api_key . clone ( ) ,
1080+ api_key_factory : api_key_factory . clone ( ) ,
10631081 } ) ;
10641082
10651083 // Proxy
10661084 let proxy_aggregator = Arc :: new ( TokioMutex :: new ( proxy_aggregator:: Aggregator :: default ( ) ) ) ;
10671085 let proxy_flusher = Arc :: new ( ProxyFlusher :: new (
1068- resolved_api_key ,
1086+ api_key_factory . clone ( ) ,
10691087 Arc :: clone ( & proxy_aggregator) ,
10701088 Arc :: clone ( tags_provider) ,
10711089 Arc :: clone ( config) ,
@@ -1080,6 +1098,7 @@ fn start_trace_agent(
10801098 proxy_aggregator,
10811099 invocation_processor,
10821100 Arc :: clone ( tags_provider) ,
1101+ api_key_factory,
10831102 ) ;
10841103 let trace_agent_channel = trace_agent. get_sender_copy ( ) ;
10851104 let shutdown_token = trace_agent. shutdown_token ( ) ;
@@ -1166,15 +1185,14 @@ fn start_otlp_agent(
11661185
11671186fn start_api_runtime_proxy (
11681187 config : & Arc < Config > ,
1169- aws_config : & AwsConfig ,
1188+ aws_config : Arc < AwsConfig > ,
11701189 invocation_processor : & Arc < TokioMutex < InvocationProcessor > > ,
11711190) -> Option < CancellationToken > {
1172- if !should_start_proxy ( config, aws_config) {
1191+ if !should_start_proxy ( config, Arc :: clone ( & aws_config) ) {
11731192 debug ! ( "Skipping API runtime proxy, no LWA proxy or datadog wrapper found" ) ;
11741193 return None ;
11751194 }
11761195
1177- let aws_config = aws_config. clone ( ) ;
11781196 let invocation_processor = invocation_processor. clone ( ) ;
11791197 interceptor:: start ( aws_config, invocation_processor) . ok ( )
11801198}
0 commit comments