Skip to content

Commit a67cdf4

Browse files
authored
Aj/support pci and custom endpoints (#585)
* feat: logs_config_logs_dd_url * feat: apm pci endpoints * feat: metrics * feat: support metrics using dogstatsd methods * fix: use the right var * tests: use server url override * feat: refactor into flusher method * feat: clippy
1 parent 1dc502e commit a67cdf4

File tree

6 files changed

+137
-74
lines changed

6 files changed

+137
-74
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@ use bottlecap::{
1919
listener::Listener as LifecycleListener,
2020
},
2121
logger,
22-
logs::{
23-
agent::LogsAgent,
24-
flusher::{build_fqdn_logs, Flusher as LogsFlusher},
25-
},
22+
logs::{agent::LogsAgent, flusher::Flusher as LogsFlusher},
2623
secrets::decrypt,
2724
tags::{
2825
lambda::{self, tags::EXTENSION_VERSION},
@@ -50,7 +47,9 @@ use decrypt::resolve_secrets;
5047
use dogstatsd::{
5148
aggregator::Aggregator as MetricsAggregator,
5249
constants::CONTEXTS,
53-
datadog::{MetricsIntakeUrlPrefix, Site as MetricsSite},
50+
datadog::{
51+
DdDdUrl, DdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride, Site as MetricsSite,
52+
},
5453
dogstatsd::{DogStatsD, DogStatsDConfig},
5554
flusher::{Flusher as MetricsFlusher, FlusherConfig as MetricsFlusherConfig},
5655
metric::{SortedTags, EMPTY_TAGS},
@@ -315,17 +314,8 @@ async fn extension_loop_active(
315314
.expect("failed to create aggregator"),
316315
));
317316

318-
let metrics_site = MetricsSite::new(config.site.clone()).expect("can't parse site");
319-
let flusher_config = MetricsFlusherConfig {
320-
api_key: resolved_api_key.clone(),
321-
aggregator: Arc::clone(&metrics_aggr),
322-
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(Some(metrics_site), None)
323-
.expect("can't parse metrics intake URL from site"),
324-
https_proxy: config.https_proxy.clone(),
325-
timeout: Duration::from_secs(config.flush_timeout),
326-
};
327-
let mut metrics_flusher = MetricsFlusher::new(flusher_config);
328-
317+
let mut metrics_flusher =
318+
start_metrics_flusher(resolved_api_key.clone(), &metrics_aggr, config);
329319
// Lifecycle Invocation Processor
330320
let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
331321
Arc::clone(&tags_provider),
@@ -713,7 +703,6 @@ fn start_logs_agent(
713703
let logs_flusher = LogsFlusher::new(
714704
resolved_api_key,
715705
Arc::clone(&logs_agent.aggregator),
716-
build_fqdn_logs(config.site.clone()),
717706
config.clone(),
718707
);
719708
tokio::spawn(async move {
@@ -722,6 +711,37 @@ fn start_logs_agent(
722711
(logs_agent_channel, logs_flusher)
723712
}
724713

714+
fn start_metrics_flusher(
715+
resolved_api_key: String,
716+
metrics_aggr: &Arc<Mutex<MetricsAggregator>>,
717+
config: &Arc<Config>,
718+
) -> MetricsFlusher {
719+
let metrics_intake_url = if !config.dd_url.is_empty() {
720+
let dd_dd_url = DdDdUrl::new(config.dd_url.clone()).expect("can't parse DD_DD_URL");
721+
722+
let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(None, Some(dd_dd_url));
723+
MetricsIntakeUrlPrefix::new(None, prefix_override)
724+
} else if !config.url.is_empty() {
725+
let dd_url = DdUrl::new(config.url.clone()).expect("can't parse DD_URL");
726+
727+
let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(Some(dd_url), None);
728+
MetricsIntakeUrlPrefix::new(None, prefix_override)
729+
} else {
730+
// use site
731+
let metrics_site = MetricsSite::new(config.site.clone()).expect("can't parse site");
732+
MetricsIntakeUrlPrefix::new(Some(metrics_site), None)
733+
};
734+
735+
let flusher_config = MetricsFlusherConfig {
736+
api_key: resolved_api_key,
737+
aggregator: Arc::clone(metrics_aggr),
738+
metrics_intake_url_prefix: metrics_intake_url.expect("can't parse site or override"),
739+
https_proxy: config.https_proxy.clone(),
740+
timeout: Duration::from_secs(config.flush_timeout),
741+
};
742+
MetricsFlusher::new(flusher_config)
743+
}
744+
725745
fn start_trace_agent(
726746
config: &Arc<Config>,
727747
resolved_api_key: String,

bottlecap/src/config/mod.rs

Lines changed: 95 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod processing_rule;
44
pub mod service_mapping;
55
pub mod trace_propagation_style;
66

7+
use datadog_trace_utils::config_utils::{trace_intake_url, trace_intake_url_prefixed};
78
use std::collections::HashMap;
89
use std::path::Path;
910
use std::time::Instant;
@@ -38,12 +39,6 @@ pub struct FallbackConfig {
3839
trace_otel_enabled: bool,
3940
otlp_config_receiver_protocols_http_endpoint: Option<String>,
4041
otlp_config_receiver_protocols_grpc_endpoint: Option<String>,
41-
// intake urls
42-
url: Option<String>,
43-
dd_url: Option<String>,
44-
logs_config_logs_dd_url: Option<String>,
45-
// APM, as opposed to logs, does not use the `apm_config` prefix for env vars
46-
apm_dd_url: Option<String>,
4742
}
4843

4944
/// `FallbackYamlConfig` is a struct that represents fields in `datadog.yaml` not yet supported in the extension yet.
@@ -52,8 +47,6 @@ pub struct FallbackConfig {
5247
#[serde(default)]
5348
#[allow(clippy::module_name_repetitions)]
5449
pub struct FallbackYamlConfig {
55-
logs_config: Option<Value>,
56-
apm_config: Option<Value>,
5750
otlp_config: Option<Value>,
5851
}
5952
#[derive(Debug, PartialEq, Deserialize, Clone, Default)]
@@ -102,6 +95,7 @@ pub struct Config {
10295
pub logs_config_processing_rules: Option<Vec<ProcessingRule>>,
10396
pub logs_config_use_compression: bool,
10497
pub logs_config_compression_level: i32,
98+
pub logs_config_logs_dd_url: String,
10599
pub serverless_flush_strategy: FlushStrategy,
106100
pub enhanced_metrics: bool,
107101
//flush timeout in seconds
@@ -119,6 +113,10 @@ pub struct Config {
119113
pub trace_propagation_style_extract: Vec<TracePropagationStyle>,
120114
pub trace_propagation_extract_first: bool,
121115
pub trace_propagation_http_baggage_enabled: bool,
116+
pub apm_config_apm_dd_url: String,
117+
// Metrics overrides
118+
pub dd_url: String,
119+
pub url: String,
122120
}
123121

124122
impl Default for Config {
@@ -141,6 +139,7 @@ impl Default for Config {
141139
logs_config_processing_rules: None,
142140
logs_config_use_compression: true,
143141
logs_config_compression_level: 6,
142+
logs_config_logs_dd_url: String::default(),
144143
// Metrics
145144
enhanced_metrics: true,
146145
https_proxy: None,
@@ -156,6 +155,9 @@ impl Default for Config {
156155
trace_propagation_style_extract: vec![],
157156
trace_propagation_extract_first: false,
158157
trace_propagation_http_baggage_enabled: false,
158+
apm_config_apm_dd_url: String::default(),
159+
dd_url: String::default(),
160+
url: String::default(),
159161
}
160162
}
161163
}
@@ -221,22 +223,6 @@ fn fallback(figment: &Figment, yaml_figment: &Figment, region: &str) -> Result<(
221223
return Err(ConfigError::UnsupportedField("otel".to_string()));
222224
}
223225

224-
// Intake URLs
225-
if config.url.is_some()
226-
|| config.dd_url.is_some()
227-
|| config.logs_config_logs_dd_url.is_some()
228-
|| config.apm_dd_url.is_some()
229-
|| yaml_config
230-
.logs_config
231-
.is_some_and(|c| c.get("logs_dd_url").is_some())
232-
|| yaml_config
233-
.apm_config
234-
.is_some_and(|c| c.get("apm_dd_url").is_some())
235-
{
236-
log_fallback_reason("intake_urls");
237-
return Err(ConfigError::UnsupportedField("intake_urls".to_string()));
238-
}
239-
240226
// Govcloud Regions
241227
if region.starts_with("us-gov-") {
242228
log_fallback_reason("gov_region");
@@ -314,10 +300,27 @@ pub fn get_config(config_directory: &Path, region: &str) -> Result<Config, Confi
314300
.trace_propagation_style_extract
315301
.clone_from(&config.trace_propagation_style);
316302
}
303+
if config.logs_config_logs_dd_url.is_empty() {
304+
config.logs_config_logs_dd_url = build_fqdn_logs(config.site.clone());
305+
}
317306

307+
if config.apm_config_apm_dd_url.is_empty() {
308+
config.apm_config_apm_dd_url = trace_intake_url(config.site.clone().as_str());
309+
} else {
310+
config.apm_config_apm_dd_url =
311+
trace_intake_url_prefixed(config.apm_config_apm_dd_url.as_str());
312+
}
313+
314+
// Metrics are handled by dogstatsd in Main
318315
Ok(config)
319316
}
320317

318+
#[inline]
319+
#[must_use]
320+
fn build_fqdn_logs(site: String) -> String {
321+
format!("https://http-intake.logs.{site}")
322+
}
323+
321324
fn deserialize_string_or_int<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
322325
where
323326
D: Deserializer<'de>,
@@ -452,43 +455,90 @@ pub mod tests {
452455
}
453456

454457
#[test]
455-
fn test_fallback_on_intake_urls() {
458+
fn test_default_logs_intake_url() {
456459
figment::Jail::expect_with(|jail| {
457460
jail.clear_env();
458-
jail.set_env("DD_APM_DD_URL", "some_url");
459461

460-
let config =
461-
get_config(Path::new(""), MOCK_REGION).expect_err("should reject unknown fields");
462+
let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
462463
assert_eq!(
463-
config,
464-
ConfigError::UnsupportedField("intake_urls".to_string())
464+
config.logs_config_logs_dd_url,
465+
"https://http-intake.logs.datadoghq.com".to_string()
465466
);
466467
Ok(())
467468
});
468469
}
469470

470471
#[test]
471-
fn test_fallback_on_intake_urls_yaml() {
472+
fn test_support_pci_logs_intake_url() {
472473
figment::Jail::expect_with(|jail| {
473474
jail.clear_env();
474-
jail.create_file(
475-
"datadog.yaml",
476-
r"
477-
apm_config:
478-
apm_dd_url: some_url
479-
",
480-
)?;
475+
jail.set_env(
476+
"DD_LOGS_CONFIG_LOGS_DD_URL",
477+
"agent-http-intake-pci.logs.datadoghq.com:443",
478+
);
481479

482-
let config =
483-
get_config(Path::new(""), MOCK_REGION).expect_err("should reject unknown fields");
480+
let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
484481
assert_eq!(
485-
config,
486-
ConfigError::UnsupportedField("intake_urls".to_string())
482+
config.logs_config_logs_dd_url,
483+
"agent-http-intake-pci.logs.datadoghq.com:443".to_string()
484+
);
485+
Ok(())
486+
});
487+
}
488+
489+
#[test]
490+
fn test_support_pci_traces_intake_url() {
491+
figment::Jail::expect_with(|jail| {
492+
jail.clear_env();
493+
jail.set_env(
494+
"DD_APM_CONFIG_APM_DD_URL",
495+
"https://trace-pci.agent.datadoghq.com",
496+
);
497+
498+
let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
499+
assert_eq!(
500+
config.apm_config_apm_dd_url,
501+
"https://trace-pci.agent.datadoghq.com/api/v0.2/traces".to_string()
487502
);
488503
Ok(())
489504
});
490505
}
491506

507+
#[test]
508+
fn test_support_dd_dd_url() {
509+
figment::Jail::expect_with(|jail| {
510+
jail.clear_env();
511+
jail.set_env("DD_DD_URL", "custom_proxy:3128");
512+
513+
let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
514+
assert_eq!(config.dd_url, "custom_proxy:3128".to_string());
515+
Ok(())
516+
});
517+
}
518+
519+
#[test]
520+
fn test_support_dd_url() {
521+
figment::Jail::expect_with(|jail| {
522+
jail.clear_env();
523+
jail.set_env("DD_URL", "custom_proxy:3128");
524+
525+
let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
526+
assert_eq!(config.url, "custom_proxy:3128".to_string());
527+
Ok(())
528+
});
529+
}
530+
531+
#[test]
532+
fn test_dd_dd_url_default() {
533+
figment::Jail::expect_with(|jail| {
534+
jail.clear_env();
535+
536+
let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
537+
assert_eq!(config.dd_url, String::new());
538+
Ok(())
539+
});
540+
}
541+
492542
#[test]
493543
fn test_allowed_but_disabled() {
494544
figment::Jail::expect_with(|jail| {
@@ -572,6 +622,9 @@ pub mod tests {
572622
TracePropagationStyle::Datadog,
573623
TracePropagationStyle::TraceContext
574624
],
625+
logs_config_logs_dd_url: "https://http-intake.logs.datadoghq.com".to_string(),
626+
apm_config_apm_dd_url: trace_intake_url("datadoghq.com").to_string(),
627+
dd_url: String::new(), // We add the prefix in main.rs
575628
..Config::default()
576629
}
577630
);

bottlecap/src/logs/flusher.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,16 @@ use zstd::stream::write::Encoder;
1414

1515
#[derive(Debug, Clone)]
1616
pub struct Flusher {
17-
fqdn_site: String,
1817
client: reqwest::Client,
1918
aggregator: Arc<Mutex<Aggregator>>,
2019
config: Arc<config::Config>,
2120
headers: HeaderMap,
2221
}
2322

24-
#[inline]
25-
#[must_use]
26-
pub fn build_fqdn_logs(site: String) -> String {
27-
format!("https://http-intake.logs.{site}")
28-
}
29-
3023
impl Flusher {
3124
pub fn new(
3225
api_key: String,
3326
aggregator: Arc<Mutex<Aggregator>>,
34-
site: String,
3527
config: Arc<config::Config>,
3628
) -> Self {
3729
let client = http_client::get_client(config.clone());
@@ -57,7 +49,6 @@ impl Flusher {
5749
}
5850

5951
Flusher {
60-
fqdn_site: site,
6152
client,
6253
aggregator,
6354
config,
@@ -94,7 +85,7 @@ impl Flusher {
9485
}
9586

9687
fn create_request(&self, data: Vec<u8>) -> reqwest::RequestBuilder {
97-
let url = format!("{}/api/v2/logs", self.fqdn_site);
88+
let url = format!("{}/api/v2/logs", self.config.logs_config_logs_dd_url);
9889
let body = self.compress(data);
9990
self.client
10091
.post(&url)

bottlecap/src/traces/trace_flusher.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ impl TraceFlusher for ServerlessTraceFlusher {
6363
Ok(_) => debug!("Successfully flushed traces"),
6464
Err(e) => {
6565
error!("Error sending trace: {e:?}");
66-
// TODO: Retries
6766
}
6867
}
6968
}

0 commit comments

Comments
 (0)