Skip to content

Commit 0608fd8

Browse files
chore(logs): use IntakeEntry and Destination from datadog-log-agent
Rename LogEntry → IntakeEntry and FlusherMode → Destination to match the canonical names exported by the datadog-log-agent crate.
1 parent 78eeb08 commit 0608fd8

5 files changed

Lines changed: 26 additions & 21 deletions

File tree

bottlecap/Cargo.lock

Lines changed: 6 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev =
7878
libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" }
7979
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "28f796bf767fff56caf08153ade5cd80c8e8f705", default-features = false }
8080
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "28f796bf767fff56caf08153ade5cd80c8e8f705", default-features = false }
81-
datadog-log-agent = { path = "../../serverless-components/crates/datadog-log-agent" }
81+
datadog-log-agent = { git = "https://github.com/DataDog/serverless-components", rev = "51700e5a2eaf9dd73aff5b83b3139124847d3037", default-features = false }
8282
libddwaf = { version = "1.28.1", git = "https://github.com/DataDog/libddwaf-rust", rev = "d1534a158d976bd4f747bf9fcc58e0712d2d17fc", default-features = false, features = ["serde"] }
8383

8484
[dev-dependencies]

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ use bottlecap::{
7575
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
7676
use datadog_log_agent::{
7777
AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService,
78-
FlusherMode, LogFlusher, LogFlusherConfig, LogsAdditionalEndpoint,
78+
Destination, LogFlusher, LogFlusherConfig, LogsAdditionalEndpoint,
7979
};
8080
use decrypt::resolve_secrets;
8181
use dogstatsd::{
@@ -1064,11 +1064,11 @@ async fn start_logs_agent(
10641064
let api_key = api_key_factory.get_api_key().await.unwrap_or_default();
10651065

10661066
let mode = if config.observability_pipelines_worker_logs_enabled {
1067-
FlusherMode::ObservabilityPipelinesWorker {
1067+
Destination::ObservabilityPipelinesWorker {
10681068
url: config.observability_pipelines_worker_logs_url.clone(),
10691069
}
10701070
} else {
1071-
FlusherMode::Datadog
1071+
Destination::Datadog
10721072
};
10731073

10741074
let additional_endpoints: Vec<LogsAdditionalEndpoint> = config

bottlecap/src/logs/lambda/processor.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::logs::processor::{Processor, Rule};
1515
use crate::tags::provider;
1616

1717
use crate::logs::lambda::Message;
18-
use datadog_log_agent::{AggregatorHandle, LogEntry};
18+
use datadog_log_agent::{AggregatorHandle, IntakeEntry};
1919

2020
const OOM_ERRORS: [&str; 7] = [
2121
"fatal error: runtime: out of memory", // Go
@@ -44,9 +44,9 @@ pub struct LambdaProcessor {
4444
// Current Invocation Context
4545
invocation_context: InvocationContext,
4646
// Logs which don't have a `request_id`
47-
orphan_logs: Vec<LogEntry>,
47+
orphan_logs: Vec<IntakeEntry>,
4848
// Logs which are ready to be aggregated
49-
ready_logs: Vec<LogEntry>,
49+
ready_logs: Vec<IntakeEntry>,
5050
// Main event bus
5151
event_bus: Sender<Event>,
5252
// Logs enabled
@@ -71,7 +71,7 @@ fn map_log_level_to_status(level: &str) -> Option<&'static str> {
7171
}
7272
}
7373

74-
impl Processor<LogEntry> for LambdaProcessor {}
74+
impl Processor<IntakeEntry> for LambdaProcessor {}
7575

7676
impl LambdaProcessor {
7777
#[must_use]
@@ -354,7 +354,10 @@ impl LambdaProcessor {
354354
}
355355
}
356356

357-
fn get_log_entry(&mut self, mut lambda_message: Message) -> Result<LogEntry, Box<dyn Error>> {
357+
fn get_log_entry(
358+
&mut self,
359+
mut lambda_message: Message,
360+
) -> Result<IntakeEntry, Box<dyn Error>> {
358361
// Assign request_id from message or context if available
359362
lambda_message.lambda.request_id = match lambda_message.lambda.request_id {
360363
Some(request_id) => Some(request_id),
@@ -413,7 +416,7 @@ impl LambdaProcessor {
413416
}),
414417
);
415418

416-
let entry = LogEntry {
419+
let entry = IntakeEntry {
417420
message: final_message,
418421
timestamp: lambda_message.timestamp,
419422
hostname: Some(self.function_arn.clone()),
@@ -472,7 +475,7 @@ impl LambdaProcessor {
472475
original_message
473476
}
474477

475-
async fn make_log(&mut self, event: TelemetryEvent) -> Result<LogEntry, Box<dyn Error>> {
478+
async fn make_log(&mut self, event: TelemetryEvent) -> Result<IntakeEntry, Box<dyn Error>> {
476479
match self.get_message(event).await {
477480
Ok(lambda_message) => self.get_log_entry(lambda_message),
478481
// TODO: Check what to do when we can't process the event
@@ -481,7 +484,7 @@ impl LambdaProcessor {
481484
}
482485

483486
/// Processes a log, applies filtering rules, and queues it for aggregation
484-
fn process_and_queue_log(&mut self, mut log: LogEntry) {
487+
fn process_and_queue_log(&mut self, mut log: IntakeEntry) {
485488
let should_send_log =
486489
self.logs_enabled && LambdaProcessor::apply_rules(&self.rules, &mut log.message);
487490
if should_send_log {
@@ -531,7 +534,7 @@ mod tests {
531534
RuntimeDoneMetrics, Status,
532535
};
533536
use crate::logs::lambda::Lambda;
534-
use datadog_log_agent::{AggregatorService, LogEntry};
537+
use datadog_log_agent::{AggregatorService, IntakeEntry};
535538

536539
macro_rules! get_message_tests {
537540
($($name:ident: $value:expr,)*) => {
@@ -1096,7 +1099,7 @@ mod tests {
10961099
let batches = aggregator_handle.get_batches().await.unwrap();
10971100
assert_eq!(batches.len(), 1);
10981101

1099-
let entries: Vec<LogEntry> = serde_json::from_slice(&batches[0]).unwrap();
1102+
let entries: Vec<IntakeEntry> = serde_json::from_slice(&batches[0]).unwrap();
11001103
assert_eq!(entries.len(), 1);
11011104
let entry = &entries[0];
11021105
assert_eq!(
@@ -1283,7 +1286,7 @@ mod tests {
12831286
let batches = aggregator_handle.get_batches().await.unwrap();
12841287
assert_eq!(batches.len(), 1);
12851288

1286-
let entries: Vec<LogEntry> = serde_json::from_slice(&batches[0]).unwrap();
1289+
let entries: Vec<IntakeEntry> = serde_json::from_slice(&batches[0]).unwrap();
12871290
assert_eq!(entries.len(), 2);
12881291
let start_entry = &entries[0];
12891292
assert_eq!(

bottlecap/tests/logs_integration_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bottlecap::event_bus::EventBus;
44
use bottlecap::extension::telemetry::events::TelemetryEvent;
55
use bottlecap::logs::agent::LogsAgent;
66
use bottlecap::tags::provider::Provider;
7-
use datadog_log_agent::{AggregatorService, FlusherMode, LogFlusher, LogFlusherConfig};
7+
use datadog_log_agent::{AggregatorService, Destination, LogFlusher, LogFlusherConfig};
88
use httpmock::prelude::*;
99
use std::collections::HashMap;
1010
use std::sync::Arc;
@@ -78,7 +78,7 @@ async fn test_logs() {
7878
let flusher_config = LogFlusherConfig {
7979
api_key: dd_api_key.to_string(),
8080
site: "datadoghq.com".to_string(),
81-
mode: FlusherMode::ObservabilityPipelinesWorker {
81+
mode: Destination::ObservabilityPipelinesWorker {
8282
url: format!("{}/api/v2/logs", server.url("")),
8383
},
8484
additional_endpoints: Vec::new(),

0 commit comments

Comments
 (0)