diff --git a/Cargo.lock b/Cargo.lock index 8970e45..2023ad4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -487,6 +487,15 @@ dependencies = [ "zstd", ] +[[package]] +name = "datadog-metrics-collector" +version = "0.1.0" +dependencies = [ + "dogstatsd", + "libdd-common 3.0.1", + "tracing", +] + [[package]] name = "datadog-opentelemetry" version = "0.3.0" @@ -540,6 +549,7 @@ version = "0.1.0" dependencies = [ "datadog-fips", "datadog-logs-agent", + "datadog-metrics-collector", "datadog-trace-agent", "dogstatsd", "libdd-trace-utils 3.0.0", @@ -2196,7 +2206,7 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ - "heck 0.5.0", + "heck 0.4.1", "itertools 0.14.0", "log", "multimap", @@ -2676,9 +2686,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.10" +version = "0.103.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" +checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" dependencies = [ "aws-lc-rs", "ring", diff --git a/crates/datadog-metrics-collector/Cargo.toml b/crates/datadog-metrics-collector/Cargo.toml new file mode 100644 index 0000000..7e6d0b9 --- /dev/null +++ b/crates/datadog-metrics-collector/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "datadog-metrics-collector" +version = "0.1.0" +edition.workspace = true +license.workspace = true +description = "Collector to read, compute, and submit enhanced metrics in Serverless environments" + +[dependencies] +dogstatsd = { path = "../dogstatsd", default-features = true } +tracing = { version = "0.1", default-features = false } +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad", default-features = false } diff --git a/crates/datadog-metrics-collector/src/instance.rs 
b/crates/datadog-metrics-collector/src/instance.rs new file mode 100644 index 0000000..18f3a48 --- /dev/null +++ b/crates/datadog-metrics-collector/src/instance.rs @@ -0,0 +1,100 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Instance identity metric collector for Azure Functions. +//! +//! Submits `azure.functions.enhanced.instance` with value 1.0 on each +//! collection tick, tagged with the instance identifier. + +use dogstatsd::aggregator::AggregatorHandle; +use dogstatsd::metric::{Metric, MetricValue, SortedTags}; +use std::env; +use tracing::{error, warn}; + +const INSTANCE_METRIC: &str = "azure.functions.enhanced.instance"; + +/// Resolves the instance ID from explicit values (used by tests). +fn resolve_instance_id_from( + website_instance_id: Option<&str>, + website_pod_name: Option<&str>, + container_name: Option<&str>, +) -> Option<String> { + website_instance_id + .or(website_pod_name) + .or(container_name) + .map(String::from) +} + +/// Resolves the instance ID from environment variables. +/// +/// Checks in order: +/// 1. `WEBSITE_INSTANCE_ID` (Elastic Premium / Premium plans) +/// 2. `WEBSITE_POD_NAME` (Flex Consumption / Consumption plans) +/// 3. `CONTAINER_NAME` (Flex Consumption / Consumption plans) +fn resolve_instance_id() -> Option<String> { + resolve_instance_id_from( + env::var("WEBSITE_INSTANCE_ID").ok().as_deref(), + env::var("WEBSITE_POD_NAME").ok().as_deref(), + env::var("CONTAINER_NAME").ok().as_deref(), + ) +} + +pub struct InstanceMetricsCollector { + aggregator: AggregatorHandle, + tags: Option<SortedTags>, +} + +impl InstanceMetricsCollector { + /// Creates a new collector, returning `None` if no instance ID is found. 
+ pub fn new(aggregator: AggregatorHandle, tags: Option<SortedTags>) -> Option<Self> { + let instance_id = resolve_instance_id(); + let Some(instance_id) = instance_id else { + warn!("No instance ID found, instance metric will not be submitted"); + return None; + }; + + // Precompute tags: enhanced metrics tags + instance tag + let instance_tag = format!("instance:{}", instance_id); + let tags = match tags { + Some(mut existing) => { + if let Ok(id_tag) = SortedTags::parse(&instance_tag) { + existing.extend(&id_tag); + } + Some(existing) + } + None => SortedTags::parse(&instance_tag).ok(), + }; + + Some(Self { aggregator, tags }) + } + + pub fn collect_and_submit(&self) { + let metric = Metric::new( + INSTANCE_METRIC.into(), + MetricValue::gauge(1.0), + self.tags.clone(), + None, + ); + + if let Err(e) = self.aggregator.insert_batch(vec![metric]) { + error!("Failed to insert instance metric: {}", e); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_resolve_instance_id_falls_back_to_pod_name() { + let id = resolve_instance_id_from(None, Some("pod-xyz"), Some("container-123")); + assert_eq!(id, Some("pod-xyz".to_string())); + } + + #[test] + fn test_resolve_instance_id_falls_back_to_container_name() { + let id = resolve_instance_id_from(None, None, Some("container-123")); + assert_eq!(id, Some("container-123".to_string())); + } +} diff --git a/crates/datadog-metrics-collector/src/lib.rs b/crates/datadog-metrics-collector/src/lib.rs new file mode 100644 index 0000000..5f22d37 --- /dev/null +++ b/crates/datadog-metrics-collector/src/lib.rs @@ -0,0 +1,11 @@ +// Copyright 2023-Present Datadog, Inc. 
https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(not(test), deny(clippy::panic))] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] +#![cfg_attr(not(test), deny(clippy::expect_used))] +#![cfg_attr(not(test), deny(clippy::todo))] +#![cfg_attr(not(test), deny(clippy::unimplemented))] + +pub mod instance; +pub mod tags; diff --git a/crates/datadog-metrics-collector/src/tags.rs b/crates/datadog-metrics-collector/src/tags.rs new file mode 100644 index 0000000..c6db691 --- /dev/null +++ b/crates/datadog-metrics-collector/src/tags.rs @@ -0,0 +1,55 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Shared tag builder for enhanced metrics. +//! +//! Tags are attached to all enhanced metrics submitted by the metrics collector. + +use dogstatsd::metric::SortedTags; +use libdd_common::azure_app_services; +use std::env; + +/// Builds the common tags for all enhanced metrics. +/// +/// Sources: +/// - Azure metadata (resource_group, subscription_id, name) from libdd_common +/// - Environment variables (region, plan_tier, service, env, version, serverless_compat_version) +/// +/// The DogStatsD origin tag (e.g. `origin:azurefunction`) is added by the metrics aggregator, +/// not here. 
+pub fn build_enhanced_metrics_tags() -> Option<SortedTags> { + let mut tag_parts = Vec::new(); + + if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION { + let aas_tags = [ + ("resource_group", aas_metadata.get_resource_group()), + ("subscription_id", aas_metadata.get_subscription_id()), + ("name", aas_metadata.get_site_name()), + ]; + for (name, value) in aas_tags { + if value != "unknown" { + tag_parts.push(format!("{}:{}", name, value)); + } + } + } + + for (tag_name, env_var) in [ + ("region", "REGION_NAME"), + ("plan_tier", "WEBSITE_SKU"), + ("service", "DD_SERVICE"), + ("env", "DD_ENV"), + ("version", "DD_VERSION"), + ("serverless_compat_version", "DD_SERVERLESS_COMPAT_VERSION"), + ] { + if let Ok(val) = env::var(env_var) + && !val.is_empty() + { + tag_parts.push(format!("{}:{}", tag_name, val)); + } + } + + if tag_parts.is_empty() { + return None; + } + SortedTags::parse(&tag_parts.join(",")).ok() +} diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index b84bb15..215d401 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -11,6 +11,7 @@ windows-pipes = ["datadog-trace-agent/windows-pipes", "dogstatsd/windows-pipes"] [dependencies] datadog-logs-agent = { path = "../datadog-logs-agent" } +datadog-metrics-collector = { path = "../datadog-metrics-collector" } datadog-trace-agent = { path = "../datadog-trace-agent" } libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } datadog-fips = { path = "../datadog-fips", default-features = false } diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 8c20c41..c3f00ab 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -40,10 +40,12 @@ use dogstatsd::{ util::parse_metric_namespace, }; +use 
datadog_metrics_collector::instance::InstanceMetricsCollector; use dogstatsd::metric::{EMPTY_TAGS, SortedTags}; use tokio_util::sync::CancellationToken; const DOGSTATSD_FLUSH_INTERVAL: u64 = 10; +const INSTANCE_METRICS_COLLECTION_INTERVAL_SECS: u64 = 3; const DOGSTATSD_TIMEOUT_DURATION: Duration = Duration::from_secs(5); const DEFAULT_DOGSTATSD_PORT: u16 = 8125; const DEFAULT_LOG_INTAKE_PORT: u16 = 10517; @@ -119,6 +121,12 @@ pub async fn main() { .ok() .and_then(|v| v.parse::<u16>().ok()) .unwrap_or(DEFAULT_LOG_INTAKE_PORT); + + let dd_enhanced_metrics = env_type == EnvironmentType::AzureFunction + && env::var("DD_ENHANCED_METRICS_ENABLED") + .map(|val| val.to_lowercase() != "false") + .unwrap_or(true); + debug!("Starting serverless trace mini agent"); let env_filter = format!("h2=off,hyper=off,rustls=off,{}", log_level); @@ -182,30 +190,60 @@ pub async fn main() { } }); - let (metrics_flusher, _aggregator_handle) = if dd_use_dogstatsd { - debug!("Starting dogstatsd"); - let (_, metrics_flusher, aggregator_handle) = start_dogstatsd( - dd_dogstatsd_port, + let needs_aggregator = dd_use_dogstatsd || dd_enhanced_metrics; + + // The aggregator is shared between dogstatsd and enhanced metrics. + // It is started independently so that either can be enabled without the other. 
+ // Only dogstatsd needs the dogstatsd listener + let (metrics_flusher, aggregator_handle) = if needs_aggregator { + debug!("Creating metrics flusher and aggregator"); + + let (flusher, handle) = start_aggregator( dd_api_key.clone(), dd_site, https_proxy.clone(), dogstatsd_tags, - dd_statsd_metric_namespace, - #[cfg(all(windows, feature = "windows-pipes"))] - dd_dogstatsd_windows_pipe_name.clone(), ) .await; - if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { - info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); + + if dd_use_dogstatsd { + debug!("Starting dogstatsd"); + let _ = start_dogstatsd_listener( + dd_dogstatsd_port, + handle.clone(), + dd_statsd_metric_namespace, + #[cfg(all(windows, feature = "windows-pipes"))] + dd_dogstatsd_windows_pipe_name.clone(), + ) + .await; + if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { + info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); + } else { + info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + } } else { - info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + info!("dogstatsd disabled"); } - (metrics_flusher, Some(aggregator_handle)) + (flusher, Some(handle)) } else { - info!("dogstatsd disabled"); + info!("dogstatsd and enhanced metrics disabled"); (None, None) }; + let instance_collector = if dd_enhanced_metrics && metrics_flusher.is_some() { + aggregator_handle.as_ref().and_then(|handle| { + let tags = datadog_metrics_collector::tags::build_enhanced_metrics_tags(); + InstanceMetricsCollector::new(handle.clone(), tags) + }) + } else { + if !dd_enhanced_metrics { + info!("Enhanced metrics disabled"); + } else { + info!("Enhanced metrics enabled but metrics flusher not found"); + } + None + }; + let (log_flusher, _log_aggregator_handle): (Option, Option) = if dd_logs_enabled { debug!("Starting log agent"); @@ -225,48 +263,60 @@ pub async fn main() { }; let mut flush_interval = 
interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); + let mut instance_metrics_collection_interval = interval(Duration::from_secs( + INSTANCE_METRICS_COLLECTION_INTERVAL_SECS, + )); flush_interval.tick().await; // discard first tick, which is instantaneous + instance_metrics_collection_interval.tick().await; // Builders for log batches that failed transiently in the previous flush // cycle. They are redriven on the next cycle before new batches are sent. let mut pending_log_retries: Vec = Vec::new(); loop { - flush_interval.tick().await; - - if let Some(metrics_flusher) = metrics_flusher.as_ref() { - debug!("Flushing dogstatsd metrics"); - metrics_flusher.flush().await; - } + tokio::select! { + _ = flush_interval.tick() => { + if let Some(metrics_flusher) = metrics_flusher.clone() { + debug!("Flushing dogstatsd metrics"); + tokio::spawn(async move { + metrics_flusher.flush().await; + }); + } - if let Some(log_flusher) = log_flusher.as_ref() { - debug!("Flushing log agent"); - let retry_in = std::mem::take(&mut pending_log_retries); - let failed = log_flusher.flush(retry_in).await; - if !failed.is_empty() { - // TODO: surface flush failures into health/metrics telemetry so - // operators have a durable signal beyond log lines when logs are - // being dropped (e.g. increment a statsd counter or set a gauge). - warn!( - "log agent flush failed for {} batch(es); will retry next cycle", - failed.len() - ); - pending_log_retries = failed; + if let Some(log_flusher) = log_flusher.as_ref() { + debug!("Flushing log agent"); + let retry_in = std::mem::take(&mut pending_log_retries); + let failed = log_flusher.flush(retry_in).await; + if !failed.is_empty() { + // TODO: surface flush failures into health/metrics telemetry so + // operators have a durable signal beyond log lines when logs are + // being dropped (e.g. increment a statsd counter or set a gauge). 
+ warn!( + "log agent flush failed for {} batch(es); will retry next cycle", + failed.len() + ); + pending_log_retries = failed; + } + } + } + _ = instance_metrics_collection_interval.tick(), if instance_collector.is_some() => { + if let Some(ref collector) = instance_collector { + collector.collect_and_submit(); + } + } + } + } +} + -async fn start_dogstatsd( - port: u16, +/// Starts the metrics aggregator service and creates a flusher to send +/// aggregated metrics to the Datadog intake. +async fn start_aggregator( dd_api_key: Option<String>, dd_site: String, https_proxy: Option<String>, dogstatsd_tags: &str, - metric_namespace: Option<String>, - #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option<String>, -) -> (CancellationToken, Option<Flusher>, AggregatorHandle) { - // 1. Create the aggregator service +) -> (Option<Flusher>, AggregatorHandle) { + // Create the aggregator service #[allow(clippy::expect_used)] let (service, handle) = AggregatorService::new( SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS), @@ -274,53 +324,18 @@ ) .expect("Failed to create aggregator service"); - // 2. Start the aggregator service in the background + // Start the aggregator service in the background tokio::spawn(service.run()); - #[cfg(all(windows, feature = "windows-pipes"))] - let dogstatsd_config = DogStatsDConfig { - host: AGENT_HOST.to_string(), - port, - metric_namespace, - windows_pipe_name, - so_rcvbuf: None, - buffer_size: None, - queue_size: None, - }; - - #[cfg(not(all(windows, feature = "windows-pipes")))] - let dogstatsd_config = DogStatsDConfig { - host: AGENT_HOST.to_string(), - port, - metric_namespace, - so_rcvbuf: None, - buffer_size: None, - queue_size: None, - }; - let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); - - // 3. 
Use handle in DogStatsD (cheap to clone) - let dogstatsd_client = DogStatsD::new( - &dogstatsd_config, - handle.clone(), - dogstatsd_cancel_token.clone(), - ) - .await; - - tokio::spawn(async move { - dogstatsd_client.spin().await; - }); - let metrics_flusher = match dd_api_key { Some(dd_api_key) => { let client = match build_metrics_client(https_proxy, DOGSTATSD_TIMEOUT_DURATION) { Ok(client) => client, Err(e) => { error!("Failed to build HTTP client: {e}, won't flush metrics"); - return (dogstatsd_cancel_token, None, handle); + return (None, handle); } }; - let metrics_intake_url_prefix = match Site::new(dd_site) .map_err(|e| e.to_string()) .and_then(|site| { @@ -329,7 +344,7 @@ Ok(prefix) => prefix, Err(e) => { error!("Failed to create metrics intake URL: {e}, won't flush metrics"); - return (dogstatsd_cancel_token, None, handle); + return (None, handle); } }; @@ -349,7 +364,50 @@ } }; - (dogstatsd_cancel_token, metrics_flusher, handle) + (metrics_flusher, handle) +} + +async fn start_dogstatsd_listener( + port: u16, + handle: AggregatorHandle, + metric_namespace: Option<String>, + #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option<String>, +) -> CancellationToken { + #[cfg(all(windows, feature = "windows-pipes"))] + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + metric_namespace, + windows_pipe_name, + so_rcvbuf: None, + buffer_size: None, + queue_size: None, + }; + + #[cfg(not(all(windows, feature = "windows-pipes")))] + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + metric_namespace, + so_rcvbuf: None, + buffer_size: None, + queue_size: None, + }; + let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); + + // Use handle in DogStatsD (cheap to clone) + let dogstatsd_client = DogStatsD::new( + &dogstatsd_config, + handle.clone(), + dogstatsd_cancel_token.clone(), + ) + .await; + + tokio::spawn(async move { 
dogstatsd_client.spin().await; + }); + + dogstatsd_cancel_token } fn build_metrics_client( diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index fc025b9..61705fd 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -18,6 +18,7 @@ const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction"; const DATADOG_PREFIX: &str = "datadog."; const AWS_LAMBDA_PREFIX: &str = "aws.lambda"; const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run"; +const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions"; const JVM_PREFIX: &str = "jvm."; const RUNTIME_PREFIX: &str = "runtime."; @@ -83,15 +84,17 @@ impl Metric { .join("."); // Determine the service based on metric prefix first - let service = if metric_name.starts_with(JVM_PREFIX) - || metric_name.starts_with(RUNTIME_PREFIX) - { - OriginService::ServerlessRuntime - } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX { - OriginService::ServerlessEnhanced - } else { - OriginService::ServerlessCustom - }; + let service = + if metric_name.starts_with(JVM_PREFIX) || metric_name.starts_with(RUNTIME_PREFIX) { + OriginService::ServerlessRuntime + } else if metric_prefix == AWS_LAMBDA_PREFIX + || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX + || metric_prefix == AZURE_FUNCTIONS_PREFIX + { + OriginService::ServerlessEnhanced + } else { + OriginService::ServerlessCustom + }; // Then determine the category based on tags let category = if has_tag_value(&tags, AWS_LAMBDA_TAG_KEY, "") { @@ -297,7 +300,32 @@ mod tests { } #[test] - fn test_find_metric_origin_azure_functions() { + fn test_find_metric_origin_azure_functions_enhanced() { + let tags = SortedTags::parse("origin:azurefunction").unwrap(); + let metric = Metric { + id: 0, + name: "azure.functions.enhanced.instance".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + 
OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::AzureFunctionsMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); + } + + #[test] + fn test_find_metric_origin_azure_functions_custom() { let tags = SortedTags::parse("origin:azurefunction").unwrap(); let metric = Metric { id: 0,