Skip to content

Commit 72de1a9

Browse files
Merge branch 'main' into tianning.li/SVLS-8755-arn-secret-key
2 parents 806b961 + ce3c8e3 commit 72de1a9

File tree

28 files changed

+1498
-78
lines changed

28 files changed

+1498
-78
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,4 @@ integration-tests/cdk.context.json
3030

3131
.gitlab/pipeline*
3232
/CLAUDE.md
33+
/AGENTS.md

.gitlab/datasources/test-suites.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ test_suites:
33
- name: otlp
44
- name: snapstart
55
- name: lmi
6+
- name: auth

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ For `v67` through `v87`, you can opt out of the next-generation Lambda Extension
2121

2222
Today, all workloads using Logs and Metrics are supported.
2323

24-
APM Tracing is supported for Python, NodeJS, Go, Java, and .NET runtimes.
24+
APM Tracing is supported for Python, NodeJS, Go, Java, .NET, and Ruby runtimes.
2525

2626
### Feedback
2727

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use bottlecap::{
5151
AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService,
5252
},
5353
flusher::LogsFlusher,
54+
lambda::DurableContextUpdate,
5455
},
5556
otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent},
5657
proxy::{interceptor, should_start_proxy},
@@ -149,7 +150,11 @@ async fn main() -> anyhow::Result<()> {
149150
let config = Arc::new(config::get_config(Path::new(&lambda_directory)));
150151

151152
let aws_config = Arc::new(aws_config);
152-
let api_key_factory = create_api_key_factory(&config, &aws_config);
153+
// Build one shared reqwest::Client for metrics, logs, trace proxy flushing, and calls to
154+
// Datadog APIs (e.g. delegated auth). reqwest::Client is Arc-based internally, so cloning
155+
// just increments a refcount and shares the connection pool.
156+
let shared_client = bottlecap::http::get_client(&config);
157+
let api_key_factory = create_api_key_factory(&config, &aws_config, &shared_client);
153158

154159
let r = response
155160
.await
@@ -160,6 +165,7 @@ async fn main() -> anyhow::Result<()> {
160165
Arc::clone(&aws_config),
161166
&config,
162167
&client,
168+
shared_client,
163169
&r,
164170
Arc::clone(&api_key_factory),
165171
start_time,
@@ -245,17 +251,23 @@ fn get_flush_strategy_for_mode(
245251
}
246252
}
247253

248-
fn create_api_key_factory(config: &Arc<Config>, aws_config: &Arc<AwsConfig>) -> Arc<ApiKeyFactory> {
254+
fn create_api_key_factory(
255+
config: &Arc<Config>,
256+
aws_config: &Arc<AwsConfig>,
257+
client: &reqwest::Client,
258+
) -> Arc<ApiKeyFactory> {
249259
let config = Arc::clone(config);
250260
let aws_config = Arc::clone(aws_config);
261+
let client = client.clone();
251262
let api_key_secret_reload_interval = config.api_key_secret_reload_interval;
252263

253264
Arc::new(ApiKeyFactory::new_from_resolver(
254265
Arc::new(move || {
255266
let config = Arc::clone(&config);
256267
let aws_config = Arc::clone(&aws_config);
268+
let client = client.clone();
257269

258-
Box::pin(async move { resolve_secrets(config, aws_config).await })
270+
Box::pin(async move { resolve_secrets(config, aws_config, client).await })
259271
}),
260272
api_key_secret_reload_interval,
261273
))
@@ -284,6 +296,7 @@ async fn extension_loop_active(
284296
aws_config: Arc<AwsConfig>,
285297
config: &Arc<Config>,
286298
client: &Client,
299+
shared_client: reqwest::Client,
287300
r: &RegisterResponse,
288301
api_key_factory: Arc<ApiKeyFactory>,
289302
start_time: Instant,
@@ -293,20 +306,20 @@ async fn extension_loop_active(
293306
let account_id = r.account_id.as_ref().unwrap_or(&"none".to_string()).clone();
294307
let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id);
295308

296-
// Build one shared reqwest::Client for metrics, logs, and trace proxy flushing.
297-
// reqwest::Client is Arc-based internally, so cloning just increments a refcount
298-
// and shares the connection pool.
299-
let shared_client = bottlecap::http::get_client(config);
300-
301-
let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
302-
start_logs_agent(
303-
config,
304-
Arc::clone(&api_key_factory),
305-
&tags_provider,
306-
event_bus_tx.clone(),
307-
aws_config.is_managed_instance_mode(),
308-
&shared_client,
309-
);
309+
let (
310+
logs_agent_channel,
311+
logs_flusher,
312+
logs_agent_cancel_token,
313+
logs_aggregator_handle,
314+
durable_context_tx,
315+
) = start_logs_agent(
316+
config,
317+
Arc::clone(&api_key_factory),
318+
&tags_provider,
319+
event_bus_tx.clone(),
320+
aws_config.is_managed_instance_mode(),
321+
&shared_client,
322+
);
310323

311324
let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) = start_dogstatsd(
312325
tags_provider.clone(),
@@ -325,6 +338,7 @@ async fn extension_loop_active(
325338
Arc::clone(&aws_config),
326339
metrics_aggregator_handle.clone(),
327340
Arc::clone(&propagator),
341+
durable_context_tx,
328342
);
329343
tokio::spawn(async move {
330344
invocation_processor_service.run().await;
@@ -1039,14 +1053,15 @@ fn start_logs_agent(
10391053
LogsFlusher,
10401054
CancellationToken,
10411055
LogsAggregatorHandle,
1056+
Sender<DurableContextUpdate>,
10421057
) {
10431058
let (aggregator_service, aggregator_handle) = LogsAggregatorService::default();
10441059
// Start service in background
10451060
tokio::spawn(async move {
10461061
aggregator_service.run().await;
10471062
});
10481063

1049-
let (mut agent, tx) = LogsAgent::new(
1064+
let (mut agent, tx, durable_context_tx) = LogsAgent::new(
10501065
Arc::clone(tags_provider),
10511066
Arc::clone(config),
10521067
event_bus,
@@ -1068,7 +1083,13 @@ fn start_logs_agent(
10681083
config.clone(),
10691084
client.clone(),
10701085
);
1071-
(tx, flusher, cancel_token, aggregator_handle)
1086+
(
1087+
tx,
1088+
flusher,
1089+
cancel_token,
1090+
aggregator_handle,
1091+
durable_context_tx,
1092+
)
10721093
}
10731094

10741095
#[allow(clippy::type_complexity)]

bottlecap/src/config/env.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,12 @@ pub struct EnvConfig {
482482
/// The delay between two samples of the API Security schema collection, in seconds.
483483
#[serde(deserialize_with = "deserialize_optional_duration_from_seconds")]
484484
pub api_security_sample_delay: Option<Duration>,
485+
486+
/// @env `DD_ORG_UUID`
487+
///
488+
/// The Datadog organization UUID. When set, delegated auth is auto-enabled.
489+
#[serde(deserialize_with = "deserialize_string_or_int")]
490+
pub org_uuid: Option<String>,
485491
}
486492

487493
#[allow(clippy::too_many_lines)]
@@ -684,6 +690,8 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) {
684690
merge_option_to_value!(config, env_config, appsec_waf_timeout);
685691
merge_option_to_value!(config, env_config, api_security_enabled);
686692
merge_option_to_value!(config, env_config, api_security_sample_delay);
693+
694+
merge_string!(config, dd_org_uuid, env_config, org_uuid);
687695
}
688696

689697
#[derive(Debug, PartialEq, Clone, Copy)]
@@ -1044,6 +1052,8 @@ mod tests {
10441052
appsec_waf_timeout: Duration::from_secs(1),
10451053
api_security_enabled: false,
10461054
api_security_sample_delay: Duration::from_secs(60),
1055+
1056+
dd_org_uuid: String::default(),
10471057
};
10481058

10491059
assert_eq!(config, expected_config);

bottlecap/src/config/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,8 @@ pub struct Config {
364364
pub span_dedup_timeout: Option<Duration>,
365365
pub api_key_secret_reload_interval: Option<Duration>,
366366

367+
pub dd_org_uuid: String,
368+
367369
pub serverless_appsec_enabled: bool,
368370
pub appsec_rules: Option<String>,
369371
pub appsec_waf_timeout: Duration,
@@ -479,6 +481,8 @@ impl Default for Config {
479481
span_dedup_timeout: None,
480482
api_key_secret_reload_interval: None,
481483

484+
dd_org_uuid: String::default(),
485+
482486
serverless_appsec_enabled: false,
483487
appsec_rules: None,
484488
appsec_waf_timeout: Duration::from_millis(5),

bottlecap/src/config/yaml.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,8 @@ api_security_sample_delay: 60 # Seconds
10361036
dogstatsd_so_rcvbuf: Some(1_048_576),
10371037
dogstatsd_buffer_size: Some(65507),
10381038
dogstatsd_queue_size: Some(2048),
1039+
1040+
dd_org_uuid: String::default(),
10391041
};
10401042

10411043
// Assert that

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use libdd_trace_protobuf::pb::Span;
1616
use libdd_trace_utils::tracer_header_tags;
1717
use serde_json::Value;
1818
use tokio::time::Instant;
19-
use tracing::{debug, trace, warn};
19+
use tracing::{debug, error, trace, warn};
20+
21+
use tokio::sync::mpsc;
2022

2123
use crate::{
2224
config::{self, aws::AwsConfig},
@@ -31,6 +33,7 @@ use crate::{
3133
span_inferrer::{self, SpanInferrer},
3234
triggers::get_default_service_name,
3335
},
36+
logs::lambda::DurableContextUpdate,
3437
metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics},
3538
proc::{
3639
self, CPUData, NetworkData,
@@ -89,6 +92,10 @@ pub struct Processor {
8992
/// Tracks whether if first invocation after init has been received in Managed Instance mode.
9093
/// Used to determine if we should search for the empty context on an invocation.
9194
awaiting_first_invocation: bool,
95+
/// Sender used to forward durable execution context extracted from `aws.lambda` spans to the
96+
/// logs agent. Decouples the trace agent from the logs agent: the trace agent sends spans
97+
/// to the lifecycle processor, which extracts durable context and relays it here.
98+
durable_context_tx: mpsc::Sender<DurableContextUpdate>,
9299
}
93100

94101
impl Processor {
@@ -99,6 +106,7 @@ impl Processor {
99106
aws_config: Arc<AwsConfig>,
100107
metrics_aggregator: dogstatsd::aggregator::AggregatorHandle,
101108
propagator: Arc<DatadogCompositePropagator>,
109+
durable_context_tx: mpsc::Sender<DurableContextUpdate>,
102110
) -> Self {
103111
let resource = tags_provider
104112
.get_canonical_resource_name()
@@ -128,6 +136,7 @@ impl Processor {
128136
dynamic_tags: HashMap::new(),
129137
active_invocations: 0,
130138
awaiting_first_invocation: false,
139+
durable_context_tx,
131140
}
132141
}
133142

@@ -1358,6 +1367,29 @@ impl Processor {
13581367
.add_tracer_span(request_id, span, client_computed_stats);
13591368
}
13601369
}
1370+
1371+
/// Forwards durable execution context extracted from an `aws.lambda` span to the logs
1372+
/// pipeline so it can release held logs and tag them with durable execution metadata.
1373+
pub async fn forward_durable_context(
1374+
&mut self,
1375+
request_id: &str,
1376+
execution_id: &str,
1377+
execution_name: &str,
1378+
first_invocation: Option<bool>,
1379+
) {
1380+
if let Err(e) = self
1381+
.durable_context_tx
1382+
.send(DurableContextUpdate {
1383+
request_id: request_id.to_owned(),
1384+
execution_id: execution_id.to_owned(),
1385+
execution_name: execution_name.to_owned(),
1386+
first_invocation,
1387+
})
1388+
.await
1389+
{
1390+
error!("Invocation Processor | Failed to forward durable context to logs agent: {e}");
1391+
}
1392+
}
13611393
}
13621394

13631395
#[cfg(test)]
@@ -1403,7 +1435,15 @@ mod tests {
14031435
tokio::spawn(service.run());
14041436

14051437
let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
1406-
Processor::new(tags_provider, config, aws_config, handle, propagator)
1438+
let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
1439+
Processor::new(
1440+
tags_provider,
1441+
config,
1442+
aws_config,
1443+
handle,
1444+
propagator,
1445+
durable_context_tx,
1446+
)
14071447
}
14081448

14091449
#[test]
@@ -1940,7 +1980,15 @@ mod tests {
19401980

19411981
let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
19421982

1943-
let processor = Processor::new(tags_provider, config, aws_config, handle, propagator);
1983+
let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
1984+
let processor = Processor::new(
1985+
tags_provider,
1986+
config,
1987+
aws_config,
1988+
handle,
1989+
propagator,
1990+
durable_context_tx,
1991+
);
19441992

19451993
assert!(
19461994
processor.is_managed_instance_mode(),
@@ -2160,12 +2208,14 @@ mod tests {
21602208
AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service");
21612209
tokio::spawn(aggregator_service.run());
21622210
let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
2211+
let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
21632212
let mut p = Processor::new(
21642213
Arc::clone(&tags_provider),
21652214
Arc::clone(&config),
21662215
aws_config,
21672216
aggregator_handle,
21682217
propagator,
2218+
durable_context_tx,
21692219
);
21702220

21712221
let (trace_tx, mut trace_rx) = mpsc::channel(10);

0 commit comments

Comments
 (0)