Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ use bottlecap::{
listener::Listener as LifecycleListener,
},
logger,
logs::{
agent::LogsAgent,
flusher::{build_fqdn_logs, Flusher as LogsFlusher},
},
logs::{agent::LogsAgent, flusher::Flusher as LogsFlusher},
secrets::decrypt,
tags::{
lambda::{self, tags::EXTENSION_VERSION},
Expand Down Expand Up @@ -50,7 +47,9 @@ use decrypt::resolve_secrets;
use dogstatsd::{
aggregator::Aggregator as MetricsAggregator,
constants::CONTEXTS,
datadog::{MetricsIntakeUrlPrefix, Site as MetricsSite},
datadog::{
DdDdUrl, DdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride, Site as MetricsSite,
},
dogstatsd::{DogStatsD, DogStatsDConfig},
flusher::{Flusher as MetricsFlusher, FlusherConfig as MetricsFlusherConfig},
metric::{SortedTags, EMPTY_TAGS},
Expand Down Expand Up @@ -315,17 +314,8 @@ async fn extension_loop_active(
.expect("failed to create aggregator"),
));

let metrics_site = MetricsSite::new(config.site.clone()).expect("can't parse site");
let flusher_config = MetricsFlusherConfig {
api_key: resolved_api_key.clone(),
aggregator: Arc::clone(&metrics_aggr),
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(Some(metrics_site), None)
.expect("can't parse metrics intake URL from site"),
https_proxy: config.https_proxy.clone(),
timeout: Duration::from_secs(config.flush_timeout),
};
let mut metrics_flusher = MetricsFlusher::new(flusher_config);

let mut metrics_flusher =
start_metrics_flusher(resolved_api_key.clone(), &metrics_aggr, config);
// Lifecycle Invocation Processor
let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
Arc::clone(&tags_provider),
Expand Down Expand Up @@ -713,7 +703,6 @@ fn start_logs_agent(
let logs_flusher = LogsFlusher::new(
resolved_api_key,
Arc::clone(&logs_agent.aggregator),
build_fqdn_logs(config.site.clone()),
config.clone(),
);
tokio::spawn(async move {
Expand All @@ -722,6 +711,37 @@ fn start_logs_agent(
(logs_agent_channel, logs_flusher)
}

fn start_metrics_flusher(
resolved_api_key: String,
metrics_aggr: &Arc<Mutex<MetricsAggregator>>,
config: &Arc<Config>,
) -> MetricsFlusher {
let metrics_intake_url = if !config.dd_url.is_empty() {
let dd_dd_url = DdDdUrl::new(config.dd_url.clone()).expect("can't parse DD_DD_URL");

let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(None, Some(dd_dd_url));
MetricsIntakeUrlPrefix::new(None, prefix_override)
} else if !config.url.is_empty() {
let dd_url = DdUrl::new(config.url.clone()).expect("can't parse DD_URL");

let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(Some(dd_url), None);
MetricsIntakeUrlPrefix::new(None, prefix_override)
} else {
// use site
let metrics_site = MetricsSite::new(config.site.clone()).expect("can't parse site");
MetricsIntakeUrlPrefix::new(Some(metrics_site), None)
};

let flusher_config = MetricsFlusherConfig {
api_key: resolved_api_key,
aggregator: Arc::clone(metrics_aggr),
metrics_intake_url_prefix: metrics_intake_url.expect("can't parse site or override"),
https_proxy: config.https_proxy.clone(),
timeout: Duration::from_secs(config.flush_timeout),
};
MetricsFlusher::new(flusher_config)
}

fn start_trace_agent(
config: &Arc<Config>,
resolved_api_key: String,
Expand Down
137 changes: 95 additions & 42 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod processing_rule;
pub mod service_mapping;
pub mod trace_propagation_style;

use datadog_trace_utils::config_utils::{trace_intake_url, trace_intake_url_prefixed};
use std::collections::HashMap;
use std::path::Path;
use std::time::Instant;
Expand Down Expand Up @@ -38,12 +39,6 @@ pub struct FallbackConfig {
trace_otel_enabled: bool,
otlp_config_receiver_protocols_http_endpoint: Option<String>,
otlp_config_receiver_protocols_grpc_endpoint: Option<String>,
// intake urls
url: Option<String>,
dd_url: Option<String>,
logs_config_logs_dd_url: Option<String>,
// APM, as opposed to logs, does not use the `apm_config` prefix for env vars
apm_dd_url: Option<String>,
}

/// `FallbackYamlConfig` is a struct that represents fields in `datadog.yaml` not yet supported in the extension yet.
Expand All @@ -52,8 +47,6 @@ pub struct FallbackConfig {
#[serde(default)]
#[allow(clippy::module_name_repetitions)]
pub struct FallbackYamlConfig {
logs_config: Option<Value>,
apm_config: Option<Value>,
otlp_config: Option<Value>,
}
#[derive(Debug, PartialEq, Deserialize, Clone, Default)]
Expand Down Expand Up @@ -102,6 +95,7 @@ pub struct Config {
pub logs_config_processing_rules: Option<Vec<ProcessingRule>>,
pub logs_config_use_compression: bool,
pub logs_config_compression_level: i32,
pub logs_config_logs_dd_url: String,
pub serverless_flush_strategy: FlushStrategy,
pub enhanced_metrics: bool,
//flush timeout in seconds
Expand All @@ -119,6 +113,10 @@ pub struct Config {
pub trace_propagation_style_extract: Vec<TracePropagationStyle>,
pub trace_propagation_extract_first: bool,
pub trace_propagation_http_baggage_enabled: bool,
pub apm_config_apm_dd_url: String,
// Metrics overrides
pub dd_url: String,
pub url: String,
}

impl Default for Config {
Expand All @@ -141,6 +139,7 @@ impl Default for Config {
logs_config_processing_rules: None,
logs_config_use_compression: true,
logs_config_compression_level: 6,
logs_config_logs_dd_url: String::default(),
Comment thread
astuyve marked this conversation as resolved.
// Metrics
enhanced_metrics: true,
https_proxy: None,
Expand All @@ -156,6 +155,9 @@ impl Default for Config {
trace_propagation_style_extract: vec![],
trace_propagation_extract_first: false,
trace_propagation_http_baggage_enabled: false,
apm_config_apm_dd_url: String::default(),
dd_url: String::default(),
url: String::default(),
}
}
}
Expand Down Expand Up @@ -221,22 +223,6 @@ fn fallback(figment: &Figment, yaml_figment: &Figment, region: &str) -> Result<(
return Err(ConfigError::UnsupportedField("otel".to_string()));
}

// Intake URLs
if config.url.is_some()
|| config.dd_url.is_some()
|| config.logs_config_logs_dd_url.is_some()
|| config.apm_dd_url.is_some()
|| yaml_config
.logs_config
.is_some_and(|c| c.get("logs_dd_url").is_some())
|| yaml_config
.apm_config
.is_some_and(|c| c.get("apm_dd_url").is_some())
{
log_fallback_reason("intake_urls");
return Err(ConfigError::UnsupportedField("intake_urls".to_string()));
}

// Govcloud Regions
if region.starts_with("us-gov-") {
log_fallback_reason("gov_region");
Expand Down Expand Up @@ -314,10 +300,27 @@ pub fn get_config(config_directory: &Path, region: &str) -> Result<Config, Confi
.trace_propagation_style_extract
.clone_from(&config.trace_propagation_style);
}
if config.logs_config_logs_dd_url.is_empty() {
config.logs_config_logs_dd_url = build_fqdn_logs(config.site.clone());
Comment thread
astuyve marked this conversation as resolved.
}

if config.apm_config_apm_dd_url.is_empty() {
config.apm_config_apm_dd_url = trace_intake_url(config.site.clone().as_str());
} else {
config.apm_config_apm_dd_url =
trace_intake_url_prefixed(config.apm_config_apm_dd_url.as_str());
}

// Metrics are handled by dogstatsd in Main
Ok(config)
}

#[inline]
#[must_use]
fn build_fqdn_logs(site: String) -> String {
format!("https://http-intake.logs.{site}")
}

fn deserialize_string_or_int<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
where
D: Deserializer<'de>,
Expand Down Expand Up @@ -452,43 +455,90 @@ pub mod tests {
}

#[test]
fn test_fallback_on_intake_urls() {
fn test_default_logs_intake_url() {
figment::Jail::expect_with(|jail| {
jail.clear_env();
jail.set_env("DD_APM_DD_URL", "some_url");

let config =
get_config(Path::new(""), MOCK_REGION).expect_err("should reject unknown fields");
let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
assert_eq!(
config,
ConfigError::UnsupportedField("intake_urls".to_string())
config.logs_config_logs_dd_url,
"https://http-intake.logs.datadoghq.com".to_string()
);
Ok(())
});
}

#[test]
fn test_fallback_on_intake_urls_yaml() {
fn test_support_pci_logs_intake_url() {
figment::Jail::expect_with(|jail| {
jail.clear_env();
jail.create_file(
"datadog.yaml",
r"
apm_config:
apm_dd_url: some_url
",
)?;
jail.set_env(
"DD_LOGS_CONFIG_LOGS_DD_URL",
"agent-http-intake-pci.logs.datadoghq.com:443",
);

let config =
get_config(Path::new(""), MOCK_REGION).expect_err("should reject unknown fields");
let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
assert_eq!(
config,
ConfigError::UnsupportedField("intake_urls".to_string())
config.logs_config_logs_dd_url,
"agent-http-intake-pci.logs.datadoghq.com:443".to_string()
);
Ok(())
});
}

#[test]
fn test_support_pci_traces_intake_url() {
figment::Jail::expect_with(|jail| {
jail.clear_env();
jail.set_env(
"DD_APM_CONFIG_APM_DD_URL",
"https://trace-pci.agent.datadoghq.com",
);

let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
assert_eq!(
config.apm_config_apm_dd_url,
"https://trace-pci.agent.datadoghq.com/api/v0.2/traces".to_string()
);
Ok(())
});
}

#[test]
fn test_support_dd_dd_url() {
figment::Jail::expect_with(|jail| {
jail.clear_env();
jail.set_env("DD_DD_URL", "custom_proxy:3128");

let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
assert_eq!(config.dd_url, "custom_proxy:3128".to_string());
Ok(())
});
}

#[test]
fn test_support_dd_url() {
figment::Jail::expect_with(|jail| {
jail.clear_env();
jail.set_env("DD_URL", "custom_proxy:3128");

let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
assert_eq!(config.url, "custom_proxy:3128".to_string());
Ok(())
});
}

#[test]
fn test_dd_dd_url_default() {
figment::Jail::expect_with(|jail| {
jail.clear_env();

let config = get_config(Path::new(""), MOCK_REGION).expect("should parse config");
assert_eq!(config.dd_url, String::new());
Ok(())
});
}

#[test]
fn test_allowed_but_disabled() {
figment::Jail::expect_with(|jail| {
Expand Down Expand Up @@ -572,6 +622,9 @@ pub mod tests {
TracePropagationStyle::Datadog,
TracePropagationStyle::TraceContext
],
logs_config_logs_dd_url: "https://http-intake.logs.datadoghq.com".to_string(),
apm_config_apm_dd_url: trace_intake_url("datadoghq.com").to_string(),
dd_url: String::new(), // We add the prefix in main.rs
..Config::default()
}
);
Expand Down
11 changes: 1 addition & 10 deletions bottlecap/src/logs/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,16 @@ use zstd::stream::write::Encoder;

#[derive(Debug, Clone)]
pub struct Flusher {
fqdn_site: String,
client: reqwest::Client,
aggregator: Arc<Mutex<Aggregator>>,
config: Arc<config::Config>,
headers: HeaderMap,
}

#[inline]
#[must_use]
pub fn build_fqdn_logs(site: String) -> String {
format!("https://http-intake.logs.{site}")
}

impl Flusher {
pub fn new(
api_key: String,
aggregator: Arc<Mutex<Aggregator>>,
site: String,
config: Arc<config::Config>,
) -> Self {
let client = http_client::get_client(config.clone());
Expand All @@ -57,7 +49,6 @@ impl Flusher {
}

Flusher {
fqdn_site: site,
client,
aggregator,
config,
Expand Down Expand Up @@ -94,7 +85,7 @@ impl Flusher {
}

fn create_request(&self, data: Vec<u8>) -> reqwest::RequestBuilder {
let url = format!("{}/api/v2/logs", self.fqdn_site);
let url = format!("{}/api/v2/logs", self.config.logs_config_logs_dd_url);
let body = self.compress(data);
self.client
.post(&url)
Expand Down
1 change: 0 additions & 1 deletion bottlecap/src/traces/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ impl TraceFlusher for ServerlessTraceFlusher {
Ok(_) => debug!("Successfully flushed traces"),
Err(e) => {
error!("Error sending trace: {e:?}");
// TODO: Retries
}
}
}
Expand Down
Loading
Loading