Skip to content

Commit 30d0c2e

Browse files
Dual Shipping Logs Support (#718)
Adds support for dual shipping metrics to endpoints configured using the `logs_config` YAML or `DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS` env var config. Implemented a `LogsFlusher` as a wrapper around all the `Flusher` instances to manage flushing to all configured endpoints. Moved retry logic to `LogsFlusher`, as the retry request contains the endpoint details and does not have to be tied to a particular flusher. --------- Co-authored-by: jordan gonzález <30836115+duncanista@users.noreply.github.com>
1 parent dbcef1d commit 30d0c2e

7 files changed

Lines changed: 212 additions & 30 deletions

File tree

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use bottlecap::{
2525
listener::Listener as LifecycleListener,
2626
},
2727
logger,
28-
logs::{agent::LogsAgent, flusher::Flusher as LogsFlusher},
28+
logs::{agent::LogsAgent, flusher::LogsFlusher},
2929
otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent},
3030
proxy::{interceptor, should_start_proxy},
3131
secrets::decrypt,

bottlecap/src/config/env.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ use crate::{
1212
deserialize_optional_bool_from_anything, deserialize_string_or_int,
1313
flush_strategy::FlushStrategy,
1414
log_level::LogLevel,
15+
logs_additional_endpoints::{
16+
deserialize_logs_additional_endpoints, LogsAdditionalEndpoint,
17+
},
1518
processing_rule::{deserialize_processing_rules, ProcessingRule},
1619
service_mapping::deserialize_service_mapping,
1720
trace_propagation_style::{deserialize_trace_propagation_style, TracePropagationStyle},
@@ -125,6 +128,12 @@ pub struct EnvConfig {
125128
/// to 9 (maximum compression but higher resource usage). Only takes effect if
126129
/// `use_compression` is set to `true`.
127130
pub logs_config_compression_level: Option<i32>,
131+
/// @env `DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS`
132+
///
133+
/// Additional endpoints to send logs to.
134+
/// <https://docs.datadoghq.com/agent/configuration/dual-shipping/?tab=helm#environment-variable-configuration-6>
135+
#[serde(deserialize_with = "deserialize_logs_additional_endpoints")]
136+
pub logs_config_additional_endpoints: Vec<LogsAdditionalEndpoint>,
128137

129138
// APM
130139
//
@@ -322,6 +331,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) {
322331
merge_option!(config, env_config, logs_config_processing_rules);
323332
merge_option_to_value!(config, env_config, logs_config_use_compression);
324333
merge_option_to_value!(config, env_config, logs_config_compression_level);
334+
merge_vec!(config, env_config, logs_config_additional_endpoints);
325335

326336
// APM
327337
merge_hashmap!(config, env_config, service_mapping);
@@ -505,6 +515,10 @@ mod tests {
505515
);
506516
jail.set_env("DD_LOGS_CONFIG_USE_COMPRESSION", "false");
507517
jail.set_env("DD_LOGS_CONFIG_COMPRESSION_LEVEL", "3");
518+
jail.set_env(
519+
"DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS",
520+
"[{\"api_key\": \"apikey2\", \"Host\": \"agent-http-intake.logs.datadoghq.com\", \"Port\": 443, \"is_reliable\": true}]",
521+
);
508522

509523
// APM
510524
jail.set_env("DD_SERVICE_MAPPING", "old-service:new-service");
@@ -640,6 +654,12 @@ mod tests {
640654
}]),
641655
logs_config_use_compression: false,
642656
logs_config_compression_level: 3,
657+
logs_config_additional_endpoints: vec![LogsAdditionalEndpoint {
658+
api_key: "apikey2".to_string(),
659+
host: "agent-http-intake.logs.datadoghq.com".to_string(),
660+
port: 443,
661+
is_reliable: true,
662+
}],
643663
service_mapping: HashMap::from([(
644664
"old-service".to_string(),
645665
"new-service".to_string(),
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use serde::{Deserialize, Deserializer};
2+
use serde_json::Value;
3+
use tracing::error;
4+
5+
#[derive(Debug, PartialEq, Clone, Deserialize)]
6+
pub struct LogsAdditionalEndpoint {
7+
pub api_key: String,
8+
#[serde(rename = "Host")]
9+
pub host: String,
10+
#[serde(rename = "Port")]
11+
pub port: u32,
12+
pub is_reliable: bool,
13+
}
14+
15+
#[allow(clippy::module_name_repetitions)]
16+
pub fn deserialize_logs_additional_endpoints<'de, D>(
17+
deserializer: D,
18+
) -> Result<Vec<LogsAdditionalEndpoint>, D::Error>
19+
where
20+
D: Deserializer<'de>,
21+
{
22+
let value = Value::deserialize(deserializer)?;
23+
24+
match value {
25+
Value::String(s) if !s.is_empty() => {
26+
// For JSON format (string) in DD_ADDITIONAL_ENDPOINTS
27+
Ok(serde_json::from_str(&s).unwrap_or_else(|err| {
28+
error!("Failed to deserialize DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS: {err}");
29+
vec![]
30+
}))
31+
}
32+
_ => Ok(Vec::new()),
33+
}
34+
}
35+
36+
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_deserialize_logs_additional_endpoints_valid() {
        // A well-formed JSON string yields the parsed endpoint list.
        let input = json!("[{\"api_key\": \"apiKey2\", \"Host\": \"agent-http-intake.logs.datadoghq.com\", \"Port\": 443, \"is_reliable\": true}]");

        let expected = vec![LogsAdditionalEndpoint {
            api_key: "apiKey2".to_string(),
            host: "agent-http-intake.logs.datadoghq.com".to_string(),
            port: 443,
            is_reliable: true,
        }];

        assert_eq!(
            deserialize_logs_additional_endpoints(input).unwrap(),
            expected
        );
    }

    #[test]
    fn test_deserialize_logs_additional_endpoints_invalid() {
        // "Port" is missing, so deserialization of the payload fails and the
        // helper falls back to an empty endpoint list.
        let input = json!("[{\"api_key\": \"apiKey2\", \"Host\": \"agent-http-intake.logs.datadoghq.com\", \"is_reliable\": true}]");

        let result = deserialize_logs_additional_endpoints(input).unwrap();

        assert_eq!(result, Vec::new());
    }
}

bottlecap/src/config/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ pub mod aws;
44
pub mod env;
55
pub mod flush_strategy;
66
pub mod log_level;
7+
pub mod logs_additional_endpoints;
78
pub mod processing_rule;
89
pub mod service_mapping;
910
pub mod trace_propagation_style;
1011
pub mod yaml;
1112

1213
use datadog_trace_obfuscation::replacer::ReplaceRule;
1314
use datadog_trace_utils::config_utils::{trace_intake_url, trace_intake_url_prefixed};
15+
1416
use serde::{Deserialize, Deserializer};
1517
use serde_aux::prelude::deserialize_bool_from_anything;
1618
use serde_json::Value;
@@ -24,6 +26,7 @@ use crate::config::{
2426
env::EnvConfigSource,
2527
flush_strategy::FlushStrategy,
2628
log_level::LogLevel,
29+
logs_additional_endpoints::LogsAdditionalEndpoint,
2730
processing_rule::{deserialize_processing_rules, ProcessingRule},
2831
trace_propagation_style::TracePropagationStyle,
2932
yaml::YamlConfigSource,
@@ -260,6 +263,7 @@ pub struct Config {
260263
pub logs_config_processing_rules: Option<Vec<ProcessingRule>>,
261264
pub logs_config_use_compression: bool,
262265
pub logs_config_compression_level: i32,
266+
pub logs_config_additional_endpoints: Vec<LogsAdditionalEndpoint>,
263267

264268
// APM
265269
//
@@ -353,6 +357,7 @@ impl Default for Config {
353357
logs_config_processing_rules: None,
354358
logs_config_use_compression: true,
355359
logs_config_compression_level: 6,
360+
logs_config_additional_endpoints: Vec::new(),
356361

357362
// APM
358363
service_mapping: HashMap::new(),

bottlecap/src/config/yaml.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
deserialize_string_or_int,
99
flush_strategy::FlushStrategy,
1010
log_level::LogLevel,
11+
logs_additional_endpoints::LogsAdditionalEndpoint,
1112
service_mapping::deserialize_service_mapping,
1213
trace_propagation_style::{deserialize_trace_propagation_style, TracePropagationStyle},
1314
Config, ConfigError, ConfigSource, ProcessingRule,
@@ -117,6 +118,7 @@ pub struct LogsConfig {
117118
#[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
118119
pub use_compression: Option<bool>,
119120
pub compression_level: Option<i32>,
121+
pub additional_endpoints: Vec<LogsAdditionalEndpoint>,
120122
}
121123

122124
/// APM Config
@@ -392,6 +394,12 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) {
392394
yaml_config.logs_config,
393395
compression_level
394396
);
397+
merge_vec!(
398+
config,
399+
logs_config_additional_endpoints,
400+
yaml_config.logs_config,
401+
additional_endpoints
402+
);
395403

396404
// APM
397405
merge_hashmap!(config, yaml_config, service_mapping);
@@ -633,8 +641,8 @@ http_protocol: "http1"
633641
# Endpoints
634642
additional_endpoints:
635643
"https://app.datadoghq.com":
636-
- apikey2
637-
- apikey3
644+
- "apikey2"
645+
- "apikey3"
638646
"https://app.datadoghq.eu":
639647
- apikey4
640648
@@ -655,6 +663,11 @@ logs_config:
655663
pattern: "test-pattern"
656664
use_compression: false
657665
compression_level: 3
666+
additional_endpoints:
667+
- api_key: "apikey2"
668+
Host: "agent-http-intake.logs.datadoghq.com"
669+
Port: 443
670+
is_reliable: true
658671
659672
# APM
660673
apm_config:
@@ -770,6 +783,12 @@ extension_version: "compatibility"
770783
}]),
771784
logs_config_use_compression: false,
772785
logs_config_compression_level: 3,
786+
logs_config_additional_endpoints: vec![LogsAdditionalEndpoint {
787+
api_key: "apikey2".to_string(),
788+
host: "agent-http-intake.logs.datadoghq.com".to_string(),
789+
port: 443,
790+
is_reliable: true,
791+
}],
773792
service_mapping: HashMap::from([(
774793
"old-service".to_string(),
775794
"new-service".to_string(),

0 commit comments

Comments
 (0)