diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index bacd782d8..79b5fe4b5 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -19,10 +19,7 @@ use bottlecap::{ listener::Listener as LifecycleListener, }, logger, - logs::{ - agent::LogsAgent, - flusher::{build_fqdn_logs, Flusher as LogsFlusher}, - }, + logs::{agent::LogsAgent, flusher::Flusher as LogsFlusher}, secrets::decrypt, tags::{ lambda::{self, tags::EXTENSION_VERSION}, @@ -50,7 +47,9 @@ use decrypt::resolve_secrets; use dogstatsd::{ aggregator::Aggregator as MetricsAggregator, constants::CONTEXTS, - datadog::{MetricsIntakeUrlPrefix, Site as MetricsSite}, + datadog::{ + DdDdUrl, DdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride, Site as MetricsSite, + }, dogstatsd::{DogStatsD, DogStatsDConfig}, flusher::{Flusher as MetricsFlusher, FlusherConfig as MetricsFlusherConfig}, metric::{SortedTags, EMPTY_TAGS}, @@ -315,17 +314,8 @@ async fn extension_loop_active( .expect("failed to create aggregator"), )); - let metrics_site = MetricsSite::new(config.site.clone()).expect("can't parse site"); - let flusher_config = MetricsFlusherConfig { - api_key: resolved_api_key.clone(), - aggregator: Arc::clone(&metrics_aggr), - metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(Some(metrics_site), None) - .expect("can't parse metrics intake URL from site"), - https_proxy: config.https_proxy.clone(), - timeout: Duration::from_secs(config.flush_timeout), - }; - let mut metrics_flusher = MetricsFlusher::new(flusher_config); - + let mut metrics_flusher = + start_metrics_flusher(resolved_api_key.clone(), &metrics_aggr, config); // Lifecycle Invocation Processor let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new( Arc::clone(&tags_provider), @@ -713,7 +703,6 @@ fn start_logs_agent( let logs_flusher = LogsFlusher::new( resolved_api_key, Arc::clone(&logs_agent.aggregator), - build_fqdn_logs(config.site.clone()), config.clone(), ); tokio::spawn(async move { @@ -722,6 +711,37 @@ fn start_logs_agent( (logs_agent_channel, logs_flusher) } +fn start_metrics_flusher( + resolved_api_key: String, + metrics_aggr: &Arc>, + config: &Arc, +) -> MetricsFlusher { + let metrics_intake_url = if !config.dd_url.is_empty() { + let dd_dd_url = DdDdUrl::new(config.dd_url.clone()).expect("can't parse DD_DD_URL"); + + let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(None, Some(dd_dd_url)); + MetricsIntakeUrlPrefix::new(None, prefix_override) + } else if !config.url.is_empty() { + let dd_url = DdUrl::new(config.url.clone()).expect("can't parse DD_URL"); + + let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(Some(dd_url), None); + MetricsIntakeUrlPrefix::new(None, prefix_override) + } else { + // use site + let metrics_site = MetricsSite::new(config.site.clone()).expect("can't parse site"); + MetricsIntakeUrlPrefix::new(Some(metrics_site), None) + }; + + let flusher_config = MetricsFlusherConfig { + api_key: resolved_api_key, + aggregator: Arc::clone(metrics_aggr), + metrics_intake_url_prefix: metrics_intake_url.expect("can't parse site or override"), + https_proxy: config.https_proxy.clone(), + timeout: Duration::from_secs(config.flush_timeout), + }; + MetricsFlusher::new(flusher_config) +} + fn start_trace_agent( config: &Arc, resolved_api_key: String, diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 76ce1da98..4cd0ebe70 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -4,6 +4,7 @@ pub mod processing_rule; pub mod service_mapping; pub mod trace_propagation_style; +use datadog_trace_utils::config_utils::{trace_intake_url, trace_intake_url_prefixed}; use std::collections::HashMap; use std::path::Path; use std::time::Instant; @@ -38,12 +39,6 @@ pub struct FallbackConfig { trace_otel_enabled: bool, otlp_config_receiver_protocols_http_endpoint: Option, otlp_config_receiver_protocols_grpc_endpoint: Option, - // intake urls - url: Option, - dd_url: Option, - logs_config_logs_dd_url: Option, - // APM, as opposed to logs, does not use the `apm_config` prefix for env vars - apm_dd_url: Option, } /// `FallbackYamlConfig` is a struct that represents fields in `datadog.yaml` not yet supported in the extension yet. @@ -52,8 +47,6 @@ pub struct FallbackConfig { #[serde(default)] #[allow(clippy::module_name_repetitions)] pub struct FallbackYamlConfig { - logs_config: Option, - apm_config: Option, otlp_config: Option, } #[derive(Debug, PartialEq, Deserialize, Clone, Default)] @@ -102,6 +95,7 @@ pub struct Config { pub logs_config_processing_rules: Option>, pub logs_config_use_compression: bool, pub logs_config_compression_level: i32, + pub logs_config_logs_dd_url: String, pub serverless_flush_strategy: FlushStrategy, pub enhanced_metrics: bool, //flush timeout in seconds @@ -119,6 +113,10 @@ pub struct Config { pub trace_propagation_style_extract: Vec, pub trace_propagation_extract_first: bool, pub trace_propagation_http_baggage_enabled: bool, + pub apm_config_apm_dd_url: String, + // Metrics overrides + pub dd_url: String, + pub url: String, } impl Default for Config { @@ -141,6 +139,7 @@ impl Default for Config { logs_config_processing_rules: None, logs_config_use_compression: true, logs_config_compression_level: 6, + logs_config_logs_dd_url: String::default(), // Metrics enhanced_metrics: true, https_proxy: None, @@ -156,6 +155,9 @@ impl Default for Config { trace_propagation_style_extract: vec![], trace_propagation_extract_first: false, trace_propagation_http_baggage_enabled: false, + apm_config_apm_dd_url: String::default(), + dd_url: String::default(), + url: String::default(), } } } @@ -221,22 +223,6 @@ fn fallback(figment: &Figment, yaml_figment: &Figment, region: &str) -> Result<( return Err(ConfigError::UnsupportedField("otel".to_string())); } - // Intake URLs - if config.url.is_some() - || config.dd_url.is_some() - || config.logs_config_logs_dd_url.is_some() - || config.apm_dd_url.is_some() - || yaml_config - .logs_config - .is_some_and(|c| c.get("logs_dd_url").is_some()) - || yaml_config - .apm_config - .is_some_and(|c| c.get("apm_dd_url").is_some()) - { - log_fallback_reason("intake_urls"); - return Err(ConfigError::UnsupportedField("intake_urls".to_string())); - } - // Govcloud Regions if region.starts_with("us-gov-") { log_fallback_reason("gov_region"); @@ -314,10 +300,27 @@ pub fn get_config(config_directory: &Path, region: &str) -> Result String { + format!("https://http-intake.logs.{site}") +} + fn deserialize_string_or_int<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de>, @@ -452,43 +455,90 @@ pub mod tests { } #[test] - fn test_fallback_on_intake_urls() { + fn test_default_logs_intake_url() { figment::Jail::expect_with(|jail| { jail.clear_env(); - jail.set_env("DD_APM_DD_URL", "some_url"); - let config = - get_config(Path::new(""), MOCK_REGION).expect_err("should reject unknown fields"); + let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config"); assert_eq!( - config, - ConfigError::UnsupportedField("intake_urls".to_string()) + config.logs_config_logs_dd_url, + "https://http-intake.logs.datadoghq.com".to_string() ); Ok(()) }); } #[test] - fn test_fallback_on_intake_urls_yaml() { + fn test_support_pci_logs_intake_url() { figment::Jail::expect_with(|jail| { jail.clear_env(); - jail.create_file( - "datadog.yaml", - r" - apm_config: - apm_dd_url: some_url - ", - )?; + jail.set_env( + "DD_LOGS_CONFIG_LOGS_DD_URL", + "agent-http-intake-pci.logs.datadoghq.com:443", + ); - let config = - get_config(Path::new(""), MOCK_REGION).expect_err("should reject unknown fields"); + let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config"); assert_eq!( - config, - ConfigError::UnsupportedField("intake_urls".to_string()) + config.logs_config_logs_dd_url, + "agent-http-intake-pci.logs.datadoghq.com:443".to_string() + ); + Ok(()) + }); + } + + #[test] + fn test_support_pci_traces_intake_url() { + figment::Jail::expect_with(|jail| { + jail.clear_env(); + jail.set_env( + "DD_APM_CONFIG_APM_DD_URL", + "https://trace-pci.agent.datadoghq.com", + ); + + let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config"); + assert_eq!( + config.apm_config_apm_dd_url, + "https://trace-pci.agent.datadoghq.com/api/v0.2/traces".to_string() ); Ok(()) }); } + #[test] + fn test_support_dd_dd_url() { + figment::Jail::expect_with(|jail| { + jail.clear_env(); + jail.set_env("DD_DD_URL", "custom_proxy:3128"); + + let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config"); + assert_eq!(config.dd_url, "custom_proxy:3128".to_string()); + Ok(()) + }); + } + + #[test] + fn test_support_dd_url() { + figment::Jail::expect_with(|jail| { + jail.clear_env(); + jail.set_env("DD_URL", "custom_proxy:3128"); + + let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config"); + assert_eq!(config.url, "custom_proxy:3128".to_string()); + Ok(()) + }); + } + + #[test] + fn test_dd_dd_url_default() { + figment::Jail::expect_with(|jail| { + jail.clear_env(); + + let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config"); + assert_eq!(config.dd_url, String::new()); + Ok(()) + }); + } + #[test] fn test_allowed_but_disabled() { figment::Jail::expect_with(|jail| { @@ -572,6 +622,9 @@ pub mod tests { TracePropagationStyle::Datadog, TracePropagationStyle::TraceContext ], + logs_config_logs_dd_url: "https://http-intake.logs.datadoghq.com".to_string(), + apm_config_apm_dd_url: trace_intake_url("datadoghq.com").to_string(), + dd_url: String::new(), // We add the prefix in main.rs ..Config::default() } ); diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs index 5dc0952f7..ffa49010c 100644 --- a/bottlecap/src/logs/flusher.rs +++ b/bottlecap/src/logs/flusher.rs @@ -14,24 +14,16 @@ use zstd::stream::write::Encoder; #[derive(Debug, Clone)] pub struct Flusher { - fqdn_site: String, client: reqwest::Client, aggregator: Arc>, config: Arc, headers: HeaderMap, } -#[inline] -#[must_use] -pub fn build_fqdn_logs(site: String) -> String { - format!("https://http-intake.logs.{site}") -} - impl Flusher { pub fn new( api_key: String, aggregator: Arc>, - site: String, config: Arc, ) -> Self { let client = http_client::get_client(config.clone()); @@ -57,7 +49,6 @@ impl Flusher { } Flusher { - fqdn_site: site, client, aggregator, config, @@ -94,7 +85,7 @@ impl Flusher { } fn create_request(&self, data: Vec) -> reqwest::RequestBuilder { - let url = format!("{}/api/v2/logs", self.fqdn_site); + let url = format!("{}/api/v2/logs", self.config.logs_config_logs_dd_url); let body = self.compress(data); self.client .post(&url) diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index c89834b09..d3d3f862a 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -63,7 +63,6 @@ impl TraceFlusher for ServerlessTraceFlusher { Ok(_) => debug!("Successfully flushed traces"), Err(e) => { error!("Error sending trace: {e:?}"); - // TODO: Retries } } } diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index f526f1a77..bd5dcb95c 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -13,7 +13,6 @@ use datadog_trace_obfuscation::obfuscate::obfuscate_span; use datadog_trace_obfuscation::obfuscation_config; use datadog_trace_protobuf::pb; use datadog_trace_protobuf::pb::Span; -use datadog_trace_utils::config_utils::trace_intake_url; use datadog_trace_utils::send_data::{RetryBackoffType, RetryStrategy}; use datadog_trace_utils::trace_utils::SendData; use datadog_trace_utils::trace_utils::{self}; @@ -158,9 +157,9 @@ impl TraceProcessor for ServerlessTraceProcessor { } } } - let intake_url = trace_intake_url(&config.site); let endpoint = Endpoint { - url: hyper::Uri::from_str(&intake_url).expect("can't parse trace intake URL, exiting"), + url: hyper::Uri::from_str(&config.apm_config_apm_dd_url) + .expect("can't parse trace intake URL, exiting"), api_key: Some(self.resolved_api_key.clone().into()), timeout_ms: config.flush_timeout * 1_000, test_token: None, @@ -205,6 +204,7 @@ mod tests { fn create_test_config() -> Arc { Arc::new(Config { + apm_config_apm_dd_url: "https://trace.agent.datadoghq.com".to_string(), service: Some("test-service".to_string()), tags: Some("test:tag,env:test-env".to_string()), ..Config::default() diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 7396d635e..114d57361 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -39,6 +39,7 @@ async fn test_logs() { let arc_conf = Arc::new(Config { logs_config_use_compression: false, + logs_config_logs_dd_url: server.url(""), ..Config::default() }); let tags_provider = Arc::new(Provider::new( @@ -53,7 +54,6 @@ async fn test_logs() { let logs_flusher = LogsFlusher::new( dd_api_key.to_string(), Arc::clone(&logs_agent.aggregator), - server.base_url(), arc_conf.clone(), );