Skip to content

Commit 663846f

Browse files
authored
Merge branch 'main' into zarir/OTLP-operation-name-change
2 parents 5b43698 + dfcb768 commit 663846f

46 files changed

Lines changed: 862 additions & 723 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

bottlecap/Cargo.lock

Lines changed: 73 additions & 71 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ reqwest = { version = "0.12.11", features = ["json", "http2"], default-features
2828
serde = { version = "1.0", default-features = false, features = ["derive"] }
2929
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
3030
thiserror = { version = "1.0", default-features = false }
31+
# Transitive dependency (pulled in via cookie). Pinned to >=0.3.47 so cargo audit / CI passes (RUSTSEC-2026-0009).
32+
time = { version = "0.3.47", default-features = false }
3133
tokio = { version = "1.47", default-features = false, features = ["macros", "rt-multi-thread", "time"] }
3234
tokio-util = { version = "0.7", default-features = false }
3335
tracing = { version = "0.1", default-features = false }
@@ -71,8 +73,8 @@ libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "158b
7173
libdd-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
7274
libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
7375
libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
74-
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "18b49baba8bfef97060d7edd8b830584d0da3373", default-features = false }
75-
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "18b49baba8bfef97060d7edd8b830584d0da3373", default-features = false }
76+
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "2eb009a59ed07ffcaf174db1c31af413365e9bc6", default-features = false }
77+
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "2eb009a59ed07ffcaf174db1c31af413365e9bc6", default-features = false }
7678
libddwaf = { version = "1.28.1", git = "https://github.com/DataDog/libddwaf-rust", rev = "d1534a158d976bd4f747bf9fcc58e0712d2d17fc", default-features = false, features = ["serde"] }
7779

7880
[dev-dependencies]

bottlecap/src/appsec/processor/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ impl Context {
202202
self.process_result(result);
203203
}
204204
Err(e) => log_waf_run_error(e),
205-
};
205+
}
206206
}
207207

208208
/// Obtain the information about the endpoint that was invoked, if available.

bottlecap/src/appsec/processor/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub struct Processor {
3434
context_buffer: VecDeque<Context>,
3535
}
3636
impl Processor {
37-
const CONTEXT_BUFFER_DEFAULT_CAPACITY: NonZero<usize> = unsafe { NonZero::new_unchecked(5) };
37+
const CONTEXT_BUFFER_DEFAULT_CAPACITY: NonZero<usize> = NonZero::new(5).unwrap();
3838

3939
/// Creates a new [`Processor`] instance using the provided [`Config`].
4040
///
@@ -578,9 +578,9 @@ impl InvocationPayload for IdentifiedTrigger {
578578
.unwrap_or_default()
579579
.iter()
580580
.flat_map(parse_cookie)
581-
.chunk_by(|(k, _)| (*k).to_string())
581+
.chunk_by(|(k, _)| (*k).clone())
582582
.into_iter()
583-
.map(|(k, v)| (k, v.into_iter().map(|(_, v)| v.to_string()).collect()))
583+
.map(|(k, v)| (k, v.into_iter().map(|(_, v)| v.clone()).collect()))
584584
.collect(),
585585
Self::APIGatewayWebSocketEvent(t) => t
586586
.multi_value_headers
@@ -589,9 +589,9 @@ impl InvocationPayload for IdentifiedTrigger {
589589
.unwrap_or_default()
590590
.iter()
591591
.flat_map(parse_cookie)
592-
.chunk_by(|(k, _)| k.to_string())
592+
.chunk_by(|(k, _)| k.clone())
593593
.into_iter()
594-
.map(|(k, v)| (k, v.into_iter().map(|(_, v)| v.to_string()).collect()))
594+
.map(|(k, v)| (k, v.into_iter().map(|(_, v)| v.clone()).collect()))
595595
.collect(),
596596
Self::ALBEvent(t) => t
597597
.multi_value_headers
@@ -601,9 +601,9 @@ impl InvocationPayload for IdentifiedTrigger {
601601
.unwrap_or_default()
602602
.iter()
603603
.flat_map(parse_cookie)
604-
.chunk_by(|(k, _)| (*k).to_string())
604+
.chunk_by(|(k, _)| (*k).clone())
605605
.into_iter()
606-
.map(|(k, v)| (k, v.into_iter().map(|(_, v)| v.to_string()).collect()))
606+
.map(|(k, v)| (k, v.into_iter().map(|(_, v)| v.clone()).collect()))
607607
.collect(),
608608
Self::LambdaFunctionUrlEvent(t) => {
609609
t.cookies.as_ref().map(list_to_map).unwrap_or_default()

bottlecap/src/appsec/processor/response.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::appsec::processor::InvocationPayload;
77
use crate::lifecycle::invocation::triggers::body::Body;
88

99
/// The expected payload of a response. This is different from trigger to trigger.
10-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1111
pub enum ExpectedResponseFormat {
1212
/// API Gateway style integration responses (REST and HTTP)
1313
ApiGatewayResponse,
@@ -16,6 +16,7 @@ pub enum ExpectedResponseFormat {
1616
Raw,
1717

1818
/// Unknown or unsupported response format
19+
#[default]
1920
Unknown,
2021
}
2122
impl ExpectedResponseFormat {
@@ -39,11 +40,6 @@ impl ExpectedResponseFormat {
3940
}
4041
}
4142
}
42-
impl Default for ExpectedResponseFormat {
43-
fn default() -> Self {
44-
Self::Unknown
45-
}
46-
}
4743

4844
#[derive(Debug, Default, Deserialize)]
4945
#[serde(default)]

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 25 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,13 @@ async fn main() -> anyhow::Result<()> {
113113
debug!("Starting Datadog Extension v{version_without_next}");
114114

115115
// Debug: Wait for debugger to attach if DD_DEBUG_WAIT_FOR_ATTACH is set
116-
if let Ok(wait_secs) = env::var("DD_DEBUG_WAIT_FOR_ATTACH") {
117-
if let Ok(secs) = wait_secs.parse::<u64>() {
118-
debug!("DD_DEBUG_WAIT_FOR_ATTACH: Waiting {secs} seconds for debugger to attach...");
119-
debug!("Connect your debugger to port 2345 now!");
120-
tokio::time::sleep(tokio::time::Duration::from_secs(secs)).await;
121-
debug!("DD_DEBUG_WAIT_FOR_ATTACH: Continuing execution...");
122-
}
116+
if let Ok(wait_secs) = env::var("DD_DEBUG_WAIT_FOR_ATTACH")
117+
&& let Ok(secs) = wait_secs.parse::<u64>()
118+
{
119+
debug!("DD_DEBUG_WAIT_FOR_ATTACH: Waiting {secs} seconds for debugger to attach...");
120+
debug!("Connect your debugger to port 2345 now!");
121+
tokio::time::sleep(tokio::time::Duration::from_secs(secs)).await;
122+
debug!("DD_DEBUG_WAIT_FOR_ATTACH: Continuing execution...");
123123
}
124124

125125
prepare_client_provider()?;
@@ -273,7 +273,7 @@ async fn extension_loop_idle(
273273
error!("Error getting next event: {e:?}");
274274
return Err(e.into());
275275
}
276-
};
276+
}
277277
}
278278
}
279279

@@ -288,11 +288,7 @@ async fn extension_loop_active(
288288
) -> anyhow::Result<()> {
289289
let (mut event_bus, event_bus_tx) = EventBus::run();
290290

291-
let account_id = r
292-
.account_id
293-
.as_ref()
294-
.unwrap_or(&"none".to_string())
295-
.to_string();
291+
let account_id = r.account_id.as_ref().unwrap_or(&"none".to_string()).clone();
296292
let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id);
297293

298294
let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
@@ -450,10 +446,8 @@ async fn extension_loop_active(
450446
// Wait for any pending flushes
451447
flushing_service.await_handles().await;
452448
// Final flush to capture any data that accumulated since the last
453-
// spawn_non_blocking(). We pass force_stats=true since this is our
454-
// last opportunity to send data before shutdown.
455-
let mut locked_metrics = flushing_service.metrics_flushers().lock().await;
456-
flushing_service.flush_blocking(true, &mut locked_metrics).await;
449+
// spawn_non_blocking(). This is our last opportunity to send data.
450+
flushing_service.flush_blocking_final().await;
457451
break;
458452
}
459453
}
@@ -519,7 +513,6 @@ async fn extension_loop_active(
519513
"Transient network error waiting for shutdown event: {}. Retrying...",
520514
e
521515
);
522-
continue;
523516
}
524517
Err(e) => {
525518
error!(
@@ -628,26 +621,19 @@ async fn extension_loop_active(
628621
tokio::select! {
629622
biased;
630623
Some(event) = event_bus.rx.recv() => {
631-
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor_handle.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await {
632-
if let TelemetryRecord::PlatformRuntimeDone{ .. } = telemetry_event.record {
624+
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor_handle.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await
625+
&& let TelemetryRecord::PlatformRuntimeDone{ .. } = telemetry_event.record {
633626
break 'flush_end;
634627
}
635-
}
636628
}
637629
_ = race_flush_interval.tick() => {
638-
let mut locked_metrics = metrics_flushers.lock().await;
639-
flushing_service
640-
.flush_blocking(false, &mut locked_metrics)
641-
.await;
630+
flushing_service.flush_blocking().await;
642631
race_flush_interval.reset();
643632
}
644633
}
645634
}
646635
// flush
647-
let mut locked_metrics = metrics_flushers.lock().await;
648-
flushing_service
649-
.flush_blocking(false, &mut locked_metrics)
650-
.await;
636+
flushing_service.flush_blocking().await;
651637
race_flush_interval.reset();
652638
let next_response =
653639
extension::next_event(client, &aws_config.runtime_api, &r.extension_id).await;
@@ -664,10 +650,7 @@ async fn extension_loop_active(
664650
}
665651
}
666652
FlushDecision::Periodic => {
667-
let mut locked_metrics = metrics_flushers.lock().await;
668-
flushing_service
669-
.flush_blocking(false, &mut locked_metrics)
670-
.await;
653+
flushing_service.flush_blocking().await;
671654
race_flush_interval.reset();
672655
}
673656
_ => {
@@ -695,10 +678,7 @@ async fn extension_loop_active(
695678
}
696679
_ = race_flush_interval.tick() => {
697680
if flush_control.flush_strategy == FlushStrategy::Default {
698-
let mut locked_metrics = metrics_flushers.lock().await;
699-
flushing_service
700-
.flush_blocking(false, &mut locked_metrics)
701-
.await;
681+
flushing_service.flush_blocking().await;
702682
race_flush_interval.reset();
703683
}
704684
}
@@ -744,11 +724,8 @@ async fn extension_loop_active(
744724
&lifecycle_listener_shutdown_token,
745725
);
746726

747-
// Final flush with force_stats=true since this is our last opportunity
748-
let mut locked_metrics = metrics_flushers.lock().await;
749-
flushing_service
750-
.flush_blocking(true, &mut locked_metrics)
751-
.await;
727+
// Final flush - this is our last opportunity to send data before shutdown
728+
flushing_service.flush_blocking_final().await;
752729

753730
// Even though we're shutting down, we need to reset the flush interval to prevent any future flushes
754731
race_flush_interval.reset();
@@ -1178,7 +1155,7 @@ async fn start_dogstatsd(
11781155
api_key_factory: Arc<ApiKeyFactory>,
11791156
config: &Arc<Config>,
11801157
) -> (
1181-
Arc<TokioMutex<Vec<MetricsFlusher>>>,
1158+
Arc<Vec<MetricsFlusher>>,
11821159
MetricsAggregatorHandle,
11831160
CancellationToken,
11841161
) {
@@ -1200,17 +1177,20 @@ async fn start_dogstatsd(
12001177
});
12011178

12021179
// Get flushers with aggregator handle
1203-
let flushers = Arc::new(TokioMutex::new(start_metrics_flushers(
1180+
let flushers = Arc::new(start_metrics_flushers(
12041181
Arc::clone(&api_key_factory),
12051182
&aggregator_handle,
12061183
config,
1207-
)));
1184+
));
12081185

12091186
// Create Dogstatsd server
12101187
let dogstatsd_config = DogStatsDConfig {
12111188
host: EXTENSION_HOST.to_string(),
12121189
port: DOGSTATSD_PORT,
12131190
metric_namespace: config.statsd_metric_namespace.clone(),
1191+
so_rcvbuf: config.dogstatsd_so_rcvbuf,
1192+
buffer_size: config.dogstatsd_buffer_size,
1193+
queue_size: config.dogstatsd_queue_size,
12141194
};
12151195
let cancel_token = tokio_util::sync::CancellationToken::new();
12161196
let dogstatsd_agent = DogStatsD::new(

bottlecap/src/config/env.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,24 @@ pub struct EnvConfig {
277277
#[serde(deserialize_with = "deserialize_optional_string")]
278278
pub statsd_metric_namespace: Option<String>,
279279

280+
/// @env `DD_DOGSTATSD_SO_RCVBUF`
281+
/// Size of the receive buffer for `DogStatsD` UDP packets, in bytes (`SO_RCVBUF`).
282+
/// Increase to reduce packet loss under high-throughput metric bursts.
283+
#[serde(deserialize_with = "deserialize_option_lossless")]
284+
pub dogstatsd_so_rcvbuf: Option<usize>,
285+
286+
/// @env `DD_DOGSTATSD_BUFFER_SIZE`
287+
/// Maximum size of a single read from any transport (UDP or named pipe), in bytes.
288+
/// Defaults to 8192.
289+
#[serde(deserialize_with = "deserialize_option_lossless")]
290+
pub dogstatsd_buffer_size: Option<usize>,
291+
292+
/// @env `DD_DOGSTATSD_QUEUE_SIZE`
293+
/// Internal queue capacity between the socket reader and metric processor.
294+
/// Defaults to 1024. Increase if the processor can't keep up with burst traffic.
295+
#[serde(deserialize_with = "deserialize_option_lossless")]
296+
pub dogstatsd_queue_size: Option<usize>,
297+
280298
// OTLP
281299
//
282300
// - APM / Traces
@@ -554,6 +572,11 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) {
554572
config.statsd_metric_namespace = parse_metric_namespace(namespace);
555573
}
556574

575+
// DogStatsD
576+
merge_option!(config, env_config, dogstatsd_so_rcvbuf);
577+
merge_option!(config, env_config, dogstatsd_buffer_size);
578+
merge_option!(config, env_config, dogstatsd_queue_size);
579+
557580
// OTLP
558581
merge_option_to_value!(config, env_config, otlp_config_traces_enabled);
559582
merge_option_to_value!(
@@ -830,6 +853,11 @@ mod tests {
830853
);
831854
jail.set_env("DD_OTLP_CONFIG_LOGS_ENABLED", "true");
832855

856+
// DogStatsD
857+
jail.set_env("DD_DOGSTATSD_SO_RCVBUF", "1048576");
858+
jail.set_env("DD_DOGSTATSD_BUFFER_SIZE", "65507");
859+
jail.set_env("DD_DOGSTATSD_QUEUE_SIZE", "2048");
860+
833861
// AWS Lambda
834862
jail.set_env(
835863
"DD_API_KEY_SECRET_ARN",
@@ -984,6 +1012,9 @@ mod tests {
9841012
otlp_config_traces_probabilistic_sampler_sampling_percentage: Some(50),
9851013
otlp_config_logs_enabled: true,
9861014
statsd_metric_namespace: None,
1015+
dogstatsd_so_rcvbuf: Some(1_048_576),
1016+
dogstatsd_buffer_size: Some(65507),
1017+
dogstatsd_queue_size: Some(2048),
9871018
api_key_secret_arn: "arn:aws:secretsmanager:region:account:secret:datadog-api-key"
9881019
.to_string(),
9891020
kms_api_key: "test-kms-key".to_string(),
@@ -1170,4 +1201,43 @@ mod tests {
11701201
Ok(())
11711202
});
11721203
}
1204+
1205+
#[test]
1206+
fn test_dogstatsd_config_from_env() {
1207+
figment::Jail::expect_with(|jail| {
1208+
jail.clear_env();
1209+
jail.set_env("DD_DOGSTATSD_SO_RCVBUF", "1048576");
1210+
jail.set_env("DD_DOGSTATSD_BUFFER_SIZE", "65507");
1211+
jail.set_env("DD_DOGSTATSD_QUEUE_SIZE", "2048");
1212+
1213+
let mut config = Config::default();
1214+
let env_config_source = EnvConfigSource;
1215+
env_config_source
1216+
.load(&mut config)
1217+
.expect("Failed to load config");
1218+
1219+
assert_eq!(config.dogstatsd_so_rcvbuf, Some(1_048_576));
1220+
assert_eq!(config.dogstatsd_buffer_size, Some(65507));
1221+
assert_eq!(config.dogstatsd_queue_size, Some(2048));
1222+
Ok(())
1223+
});
1224+
}
1225+
1226+
#[test]
1227+
fn test_dogstatsd_config_defaults_to_none() {
1228+
figment::Jail::expect_with(|jail| {
1229+
jail.clear_env();
1230+
1231+
let mut config = Config::default();
1232+
let env_config_source = EnvConfigSource;
1233+
env_config_source
1234+
.load(&mut config)
1235+
.expect("Failed to load config");
1236+
1237+
assert_eq!(config.dogstatsd_so_rcvbuf, None);
1238+
assert_eq!(config.dogstatsd_buffer_size, None);
1239+
assert_eq!(config.dogstatsd_queue_size, None);
1240+
Ok(())
1241+
});
1242+
}
11731243
}

0 commit comments

Comments
 (0)