Skip to content

Commit 4600dde

Browse files
feat(logs): adopt datadog-log-agent shared crate
Replace bespoke log aggregator, flusher, and intake types with the shared datadog-log-agent crate from serverless-components. Key changes:

- Delete bottlecap/src/logs/{aggregator,aggregator_service,constants,flusher}.rs — superseded by the crate
- LogEntry is now the flat datadog_log_agent::LogEntry; lambda context (arn, request_id) moves into attributes["lambda"] instead of the old nested IntakeLog.message.lambda struct
- LogFlusher (with internal retry) replaces LogsFlusher; flush handle type simplifies from Vec<RequestBuilder> to bool
- start_logs_agent made async to resolve the API key upfront and build LogFlusherConfig explicitly
- Integration test updated to match the new flat JSON wire format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent be113c9 commit 4600dde

14 files changed

Lines changed: 442 additions & 1033 deletions

File tree

bottlecap/Cargo.lock

Lines changed: 28 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,7 @@ use bottlecap::{
4545
listener::Listener as LifecycleListener,
4646
},
4747
logger,
48-
logs::{
49-
agent::LogsAgent,
50-
aggregator_service::{
51-
AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService,
52-
},
53-
flusher::LogsFlusher,
54-
},
48+
logs::agent::LogsAgent,
5549
otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent},
5650
proxy::{interceptor, should_start_proxy},
5751
secrets::decrypt,
@@ -79,6 +73,10 @@ use bottlecap::{
7973
},
8074
};
8175
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
76+
use datadog_log_agent::{
77+
AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService,
78+
FlusherMode, LogFlusher, LogFlusherConfig, LogsAdditionalEndpoint,
79+
};
8280
use decrypt::resolve_secrets;
8381
use dogstatsd::{
8482
aggregator::{
@@ -306,7 +304,8 @@ async fn extension_loop_active(
306304
event_bus_tx.clone(),
307305
aws_config.is_managed_instance_mode(),
308306
&shared_client,
309-
);
307+
)
308+
.await;
310309

311310
let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) = start_dogstatsd(
312311
tags_provider.clone(),
@@ -1027,7 +1026,7 @@ fn setup_tag_provider(
10271026
))
10281027
}
10291028

1030-
fn start_logs_agent(
1029+
async fn start_logs_agent(
10311030
config: &Arc<Config>,
10321031
api_key_factory: Arc<ApiKeyFactory>,
10331032
tags_provider: &Arc<TagProvider>,
@@ -1036,11 +1035,11 @@ fn start_logs_agent(
10361035
client: &Client,
10371036
) -> (
10381037
Sender<TelemetryEvent>,
1039-
LogsFlusher,
1038+
LogFlusher,
10401039
CancellationToken,
10411040
LogsAggregatorHandle,
10421041
) {
1043-
let (aggregator_service, aggregator_handle) = LogsAggregatorService::default();
1042+
let (aggregator_service, aggregator_handle) = LogsAggregatorService::new();
10441043
// Start service in background
10451044
tokio::spawn(async move {
10461045
aggregator_service.run().await;
@@ -1062,12 +1061,37 @@ fn start_logs_agent(
10621061
drop(agent);
10631062
});
10641063

1065-
let flusher = LogsFlusher::new(
1066-
api_key_factory,
1067-
aggregator_handle.clone(),
1068-
config.clone(),
1069-
client.clone(),
1070-
);
1064+
let api_key = api_key_factory.get_api_key().await.unwrap_or_default();
1065+
1066+
let mode = if config.observability_pipelines_worker_logs_enabled {
1067+
FlusherMode::ObservabilityPipelinesWorker {
1068+
url: config.observability_pipelines_worker_logs_url.clone(),
1069+
}
1070+
} else {
1071+
FlusherMode::Datadog
1072+
};
1073+
1074+
let additional_endpoints: Vec<LogsAdditionalEndpoint> = config
1075+
.logs_config_additional_endpoints
1076+
.iter()
1077+
.map(|ep| LogsAdditionalEndpoint {
1078+
api_key: ep.api_key.clone(),
1079+
url: format!("https://{}:{}/api/v2/logs", ep.host, ep.port),
1080+
is_reliable: ep.is_reliable,
1081+
})
1082+
.collect();
1083+
1084+
let flusher_config = LogFlusherConfig {
1085+
api_key,
1086+
site: config.site.clone(),
1087+
mode,
1088+
additional_endpoints,
1089+
use_compression: config.logs_config_use_compression,
1090+
compression_level: config.logs_config_compression_level,
1091+
flush_timeout: std::time::Duration::from_secs(config.flush_timeout),
1092+
};
1093+
1094+
let flusher = LogFlusher::new(flusher_config, client.clone(), aggregator_handle.clone());
10711095
(tx, flusher, cancel_token, aggregator_handle)
10721096
}
10731097

bottlecap/src/flushing/handles.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ pub struct MetricsRetryBatch {
2727
pub struct FlushHandles {
2828
/// Handles for trace flush operations. Returns failed traces for retry.
2929
pub trace_flush_handles: Vec<JoinHandle<Vec<SendData>>>,
30-
/// Handles for log flush operations. Returns failed request builders for retry.
31-
pub log_flush_handles: Vec<JoinHandle<Vec<reqwest::RequestBuilder>>>,
30+
/// Handles for log flush operations. Returns true on success, false on any failure.
31+
pub log_flush_handles: Vec<JoinHandle<bool>>,
3232
/// Handles for metrics flush operations. Returns batch info for retry.
3333
pub metric_flush_handles: Vec<JoinHandle<MetricsRetryBatch>>,
3434
/// Handles for proxy flush operations. Returns failed request builders for retry.

bottlecap/src/flushing/service.rs

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ use dogstatsd::{
88
aggregator::AggregatorHandle as MetricsAggregatorHandle, flusher::Flusher as MetricsFlusher,
99
};
1010

11+
use datadog_log_agent::LogFlusher;
12+
1113
use crate::flushing::handles::{FlushHandles, MetricsRetryBatch};
12-
use crate::logs::flusher::LogsFlusher;
1314
use crate::traces::{
1415
proxy_flusher::Flusher as ProxyFlusher, stats_flusher::StatsFlusher,
1516
trace_flusher::TraceFlusher,
@@ -23,7 +24,7 @@ use crate::traces::{
2324
/// - Performing blocking flushes (spawn + await)
2425
pub struct FlushingService {
2526
// Flushers
26-
logs_flusher: LogsFlusher,
27+
logs_flusher: LogFlusher,
2728
trace_flusher: Arc<TraceFlusher>,
2829
stats_flusher: Arc<StatsFlusher>,
2930
proxy_flusher: Arc<ProxyFlusher>,
@@ -40,7 +41,7 @@ impl FlushingService {
4041
/// Creates a new `FlushingService` with the given flushers.
4142
#[must_use]
4243
pub fn new(
43-
logs_flusher: LogsFlusher,
44+
logs_flusher: LogFlusher,
4445
trace_flusher: Arc<TraceFlusher>,
4546
stats_flusher: Arc<StatsFlusher>,
4647
proxy_flusher: Arc<ProxyFlusher>,
@@ -76,7 +77,7 @@ impl FlushingService {
7677
let lf = self.logs_flusher.clone();
7778
self.handles
7879
.log_flush_handles
79-
.push(tokio::spawn(async move { lf.flush(None).await }));
80+
.push(tokio::spawn(async move { lf.flush().await }));
8081

8182
// Spawn traces flush
8283
let tf = self.trace_flusher.clone();
@@ -191,32 +192,18 @@ impl FlushingService {
191192
}
192193
}
193194

194-
// Await log handles with retry
195+
// Await log handles — retries are handled internally by LogFlusher
195196
for handle in self.handles.log_flush_handles.drain(..) {
196197
match handle.await {
197-
Ok(retry) => {
198-
if !retry.is_empty() {
199-
debug!(
200-
"FLUSHING_SERVICE | redriving {:?} log payloads",
201-
retry.len()
202-
);
203-
}
204-
for item in retry {
205-
let lf = self.logs_flusher.clone();
206-
match item.try_clone() {
207-
Some(item_clone) => {
208-
joinset.spawn(async move {
209-
lf.flush(Some(item_clone)).await;
210-
});
211-
}
212-
None => {
213-
error!("FLUSHING_SERVICE | Can't clone redrive log payloads");
214-
}
215-
}
198+
Ok(success) => {
199+
if !success {
200+
debug!("FLUSHING_SERVICE | log flush reported a failure");
201+
flush_error = true;
216202
}
217203
}
218204
Err(e) => {
219-
error!("FLUSHING_SERVICE | redrive log error {e:?}");
205+
error!("FLUSHING_SERVICE | log flush task error {e:?}");
206+
flush_error = true;
220207
}
221208
}
222209
}
@@ -325,7 +312,7 @@ impl FlushingService {
325312
.collect();
326313

327314
tokio::join!(
328-
self.logs_flusher.flush(None),
315+
self.logs_flusher.flush(),
329316
futures::future::join_all(metrics_futures),
330317
self.trace_flusher.flush(None),
331318
self.stats_flusher.flush(force_stats, None),

bottlecap/src/logs/agent.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ use tracing::debug;
66

77
use crate::event_bus::Event;
88
use crate::extension::telemetry::events::TelemetryEvent;
9-
use crate::logs::{aggregator_service::AggregatorHandle, processor::LogsProcessor};
9+
use crate::logs::processor::LogsProcessor;
1010
use crate::tags;
1111
use crate::{LAMBDA_RUNTIME_SLUG, config};
12+
use datadog_log_agent::AggregatorHandle;
1213

1314
const DRAIN_LOG_INTERVAL: Duration = Duration::from_millis(100);
1415

0 commit comments

Comments (0)