diff --git a/Cargo.lock b/Cargo.lock index fa5647e590..513b2d80c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1353,8 +1353,10 @@ checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" name = "datadog-ffe" version = "1.0.0" dependencies = [ + "anyhow", "chrono", "criterion", + "datadog-remote-config", "derive_more", "env_logger", "faststr", @@ -1432,6 +1434,7 @@ dependencies = [ "anyhow", "bytes", "constcat", + "datadog-remote-config", "http", "http-body-util", "libdd-common", @@ -1479,8 +1482,6 @@ version = "0.0.1" dependencies = [ "anyhow", "base64 0.22.1", - "datadog-ffe", - "datadog-live-debugger", "datadog-remote-config", "futures", "futures-util", diff --git a/datadog-ffe/Cargo.toml b/datadog-ffe/Cargo.toml index 30dcad9231..dc0aa2751e 100644 --- a/datadog-ffe/Cargo.toml +++ b/datadog-ffe/Cargo.toml @@ -11,6 +11,8 @@ publish = false bench = false [dependencies] +anyhow = { version = "1.0" } +datadog-remote-config = { path = "../datadog-remote-config", default-features = false } faststr = { version = "0.2.23", default-features = false, features = ["serde"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_json = { version = "1.0", default-features = false, features = ["std", "raw_value"] } diff --git a/datadog-ffe/src/lib.rs b/datadog-ffe/src/lib.rs index a32b8b757c..3182c5a329 100644 --- a/datadog-ffe/src/lib.rs +++ b/datadog-ffe/src/lib.rs @@ -3,6 +3,7 @@ mod flag_type; +pub mod remote_config; pub mod rules_based; pub use flag_type::{ExpectedFlagType, FlagType}; diff --git a/datadog-ffe/src/remote_config.rs b/datadog-ffe/src/remote_config.rs new file mode 100644 index 0000000000..4ed636a2cd --- /dev/null +++ b/datadog-ffe/src/remote_config.rs @@ -0,0 +1,13 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::rules_based::UniversalFlagConfig; +use datadog_remote_config::{RemoteConfigContent, RemoteConfigProduct}; + +impl RemoteConfigContent for UniversalFlagConfig { + const PRODUCT: RemoteConfigProduct = RemoteConfigProduct::FfeFlags; + + fn parse(data: &[u8]) -> anyhow::Result { + Ok(UniversalFlagConfig::from_json(data.to_vec())?) + } +} diff --git a/datadog-live-debugger/Cargo.toml b/datadog-live-debugger/Cargo.toml index c305dc179d..ec5ec690ff 100644 --- a/datadog-live-debugger/Cargo.toml +++ b/datadog-live-debugger/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] anyhow = "1.0" +datadog-remote-config = { path = "../datadog-remote-config", default-features = false } libdd-common = { path = "../libdd-common" } libdd-data-pipeline = { path = "../libdd-data-pipeline" } http-body-util = "0.1" diff --git a/datadog-live-debugger/src/lib.rs b/datadog-live-debugger/src/lib.rs index fca585e432..69f09d0ee3 100644 --- a/datadog-live-debugger/src/lib.rs +++ b/datadog-live-debugger/src/lib.rs @@ -13,6 +13,7 @@ mod probe_defs; pub mod debugger_defs; mod redacted_names; +pub mod remote_config; pub mod sender; pub use expr_eval::*; diff --git a/datadog-live-debugger/src/remote_config.rs b/datadog-live-debugger/src/remote_config.rs new file mode 100644 index 0000000000..a5e2f6e953 --- /dev/null +++ b/datadog-live-debugger/src/remote_config.rs @@ -0,0 +1,13 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::probe_defs::LiveDebuggingData; +use datadog_remote_config::{RemoteConfigContent, RemoteConfigProduct}; + +impl RemoteConfigContent for LiveDebuggingData { + const PRODUCT: RemoteConfigProduct = RemoteConfigProduct::LiveDebugger; + + fn parse(data: &[u8]) -> anyhow::Result { + crate::parse_json::parse(&String::from_utf8_lossy(data)) + } +} diff --git a/datadog-remote-config/Cargo.toml b/datadog-remote-config/Cargo.toml index e4102603cb..b5d5f9d093 100644 --- a/datadog-remote-config/Cargo.toml +++ b/datadog-remote-config/Cargo.toml @@ -21,17 +21,14 @@ client = [ "time", "tracing" ] -live-debugger = ["datadog-live-debugger"] + regex-lite = ["libdd-common/regex-lite"] -ffe = ["datadog-ffe"] test = ["hyper/server", "hyper-util"] [dependencies] anyhow = { version = "1.0" } libdd-common = { path = "../libdd-common"} libdd-trace-protobuf = { path = "../libdd-trace-protobuf", optional = true } -datadog-live-debugger = { path = "../datadog-live-debugger", optional = true } -datadog-ffe = { path = "../datadog-ffe", optional = true } hyper = { workspace = true, optional = true, default-features = false } http-body-util = {version = "0.1", optional = true } http = { version = "1.1", optional = true } diff --git a/datadog-remote-config/examples/remote_config_fetch.rs b/datadog-remote-config/examples/remote_config_fetch.rs index d53146a3e6..a6e849516f 100644 --- a/datadog-remote-config/examples/remote_config_fetch.rs +++ b/datadog-remote-config/examples/remote_config_fetch.rs @@ -5,7 +5,7 @@ use datadog_remote_config::fetch::{ConfigInvariants, ConfigOptions, SingleChange use datadog_remote_config::file_change_tracker::{Change, FilePath}; use datadog_remote_config::file_storage::ParsedFileStorage; use datadog_remote_config::RemoteConfigProduct::ApmTracing; -use datadog_remote_config::{RemoteConfigData, Target}; +use datadog_remote_config::{RemoteConfigParsedData, Target}; use libdd_common::tag::Tag; use libdd_common::Endpoint; use std::time::Duration; @@ -86,12 +86,15 @@ async fn main() { } } -fn print_file_contents(contents: &anyhow::Result) { +fn print_file_contents(contents: &anyhow::Result>>) { // Note: these contents may be large. Do not actually print it fully in a non-dev env. match contents { - Ok(data) => { + Ok(Some(data)) => { println!("File contents: {data:?}"); } + Ok(None) => { + println!("Unregistered product, no parsed data"); + } Err(e) => { println!("Failed parsing file: {e:?}"); } diff --git a/datadog-remote-config/src/config/dynamic.rs b/datadog-remote-config/src/config/dynamic.rs index 96e32371d6..c0fa77940c 100644 --- a/datadog-remote-config/src/config/dynamic.rs +++ b/datadog-remote-config/src/config/dynamic.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Deserialize)] #[cfg_attr(feature = "test", derive(Default, Serialize))] pub struct DynamicConfigTarget { #[serde(default)] @@ -13,7 +13,7 @@ pub struct DynamicConfigTarget { pub env: Option, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Deserialize)] #[cfg_attr(feature = "test", derive(Serialize))] pub struct DynamicConfigFile { pub action: String, @@ -76,7 +76,7 @@ pub struct TracingSamplingRule { pub sample_rate: f64, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Deserialize)] #[cfg_attr(feature = "test", derive(Default, Serialize))] pub struct DynamicConfig { pub(crate) tracing_header_tags: Option>, diff --git a/datadog-remote-config/src/fetch/fetcher.rs b/datadog-remote-config/src/fetch/fetcher.rs index 9378c8e5a6..4f985ff58b 100644 --- a/datadog-remote-config/src/fetch/fetcher.rs +++ b/datadog-remote-config/src/fetch/fetcher.rs @@ -220,6 +220,9 @@ pub struct ConfigClientState { targets_version: u64, root_version: u64, last_error: Option, + /// Services discovered at runtime. Sent to the agent on each poll so it can route configs + /// targeting those services to this client. Updated out-of-band by the consumer + extra_services: Vec, } impl Default for ConfigClientState { @@ -231,10 +234,17 @@ impl Default for ConfigClientState { targets_version: 0, root_version: 1, last_error: None, + extra_services: vec![], } } } +impl ConfigClientState { + pub fn set_extra_services(&mut self, services: Vec) { + self.extra_services = services; + } +} + impl ConfigFetcher { pub fn new(file_storage: S, state: Arc>) -> Self { ConfigFetcher { @@ -299,6 +309,7 @@ impl ConfigFetcher { } } } + let extra_services = opaque_state.extra_services.clone(); let config_req = ClientGetConfigsRequest { client: Some(libdd_trace_protobuf::remoteconfig::Client { @@ -322,7 +333,7 @@ impl ConfigFetcher { language: self.state.invariants.language.to_string(), tracer_version: self.state.invariants.tracer_version.clone(), service, - extra_services: vec![], + extra_services, env, app_version, tags: tags.iter().map(|t| t.to_string()).collect(), @@ -914,6 +925,100 @@ pub mod tests { } } + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn test_extra_services_forwarded_in_client_tracer() { + let server: Arc = RemoteConfigServer::spawn(); + server.files.lock().unwrap().insert( + PATH_FIRST.clone(), + (vec![DUMMY_TARGET.clone()], 1, "v1".to_string()), + ); + + let storage = Arc::new(Storage::default()); + let mut fetcher = ConfigFetcher::new( + storage, + Arc::new(ConfigFetcherState::new(server.dummy_options().invariants)), + ); + let mut opaque_state = ConfigClientState::default(); + + // Default: nothing set, agent receives an empty list. + fetcher + .fetch_once( + DUMMY_RUNTIME_ID, + DUMMY_TARGET.clone(), + &server.dummy_product_capabilities(), + "foo", + &mut opaque_state, + ) + .await + .unwrap(); + { + let req = server.last_request.lock().unwrap(); + let tracer = req + .as_ref() + .unwrap() + .client + .as_ref() + .unwrap() + .client_tracer + .as_ref() + .unwrap(); + assert!(tracer.extra_services.is_empty()); + } + + // After set_extra_services, the next poll forwards them to the agent. + opaque_state.set_extra_services(vec!["svc-a".to_string(), "svc-b".to_string()]); + fetcher + .fetch_once( + DUMMY_RUNTIME_ID, + DUMMY_TARGET.clone(), + &server.dummy_product_capabilities(), + "foo", + &mut opaque_state, + ) + .await + .unwrap(); + { + let req = server.last_request.lock().unwrap(); + let tracer = req + .as_ref() + .unwrap() + .client + .as_ref() + .unwrap() + .client_tracer + .as_ref() + .unwrap(); + assert_eq!(tracer.extra_services, &["svc-a", "svc-b"]); + } + + // Replace-semantics: a subsequent set fully overrides the previous list. + opaque_state.set_extra_services(vec!["svc-c".to_string()]); + fetcher + .fetch_once( + DUMMY_RUNTIME_ID, + DUMMY_TARGET.clone(), + &server.dummy_product_capabilities(), + "foo", + &mut opaque_state, + ) + .await + .unwrap(); + { + let req = server.last_request.lock().unwrap(); + let tracer = req + .as_ref() + .unwrap() + .client + .as_ref() + .unwrap() + .client_tracer + .as_ref() + .unwrap(); + assert_eq!(tracer.extra_services, &["svc-c"]); + } + } + #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_process_tags_forwarded_in_client_tracer() { diff --git a/datadog-remote-config/src/fetch/single.rs b/datadog-remote-config/src/fetch/single.rs index 66d8120638..9a0d87fd86 100644 --- a/datadog-remote-config/src/fetch/single.rs +++ b/datadog-remote-config/src/fetch/single.rs @@ -70,6 +70,13 @@ impl SingleFetcher { pub fn set_config_state(&self, file: &RemoteConfigPath, state: ConfigApplyState) { self.fetcher.set_config_state(file, state) } + + /// Update the set of services discovered at runtime + /// Sent to the agent on each subsequent poll so it can route configs targeting those + /// services to this client. Replace-semantics: the new vec fully overrides the previous one. + pub fn set_extra_services(&mut self, services: Vec) { + self.opaque_state.set_extra_services(services); + } } pub struct SingleChangesFetcher @@ -117,4 +124,9 @@ where pub fn set_config_state(&self, file: &S::StoredFile, state: ConfigApplyState) { self.fetcher.set_config_state(file.path(), state) } + + /// See [`SingleFetcher::set_extra_services`]. + pub fn set_extra_services(&mut self, services: Vec) { + self.fetcher.set_extra_services(services); + } } diff --git a/datadog-remote-config/src/fetch/test_server.rs b/datadog-remote-config/src/fetch/test_server.rs index 1a02add2d7..1ed16ad0f3 100644 --- a/datadog-remote-config/src/fetch/test_server.rs +++ b/datadog-remote-config/src/fetch/test_server.rs @@ -216,9 +216,6 @@ impl RemoteConfigServer { tracer_version: "1.2.3".to_string(), endpoint: self.endpoint.clone(), }, - #[cfg(not(feature = "live-debugger"))] - products: vec![RemoteConfigProduct::ApmTracing], - #[cfg(feature = "live-debugger")] products: vec![ RemoteConfigProduct::ApmTracing, RemoteConfigProduct::LiveDebugger, diff --git a/datadog-remote-config/src/file_storage.rs b/datadog-remote-config/src/file_storage.rs index 6c9428d7ac..bdb8221f80 100644 --- a/datadog-remote-config/src/file_storage.rs +++ b/datadog-remote-config/src/file_storage.rs @@ -3,62 +3,73 @@ use crate::fetch::FileStorage; use crate::file_change_tracker::{FilePath, UpdatedFiles}; -use crate::{RemoteConfigData, RemoteConfigPath}; +use crate::parse::{default_registry, ParserRegistry, RemoteConfigParsedData}; +use crate::RemoteConfigPath; use libdd_common::MutexExt; use std::ops::Deref; use std::sync::{Arc, Mutex, MutexGuard}; /// A trivial local storage for remote config files. pub struct RawFileStorage { - updated: Mutex>, P)>>, + parser: P, + #[allow(clippy::type_complexity)] + updated: Mutex>, P::Parsed)>>, } -impl Default for RawFileStorage

{ +impl Default for RawFileStorage

{ fn default() -> Self { + Self::new(P::default()) + } +} + +impl RawFileStorage

{ + pub fn new(parser: P) -> Self { RawFileStorage { + parser, updated: Mutex::default(), } } } -pub trait ParseFile -where - Self: Sized, -{ - fn parse(path: &RemoteConfigPath, contents: Vec) -> Self; +/// Instance-based file parser. Implementations may carry state (e.g. an [`Arc`]). +pub trait ParseFile: Clone + Send + Sync { + /// The type of the parsed/stored content. + type Parsed: Send; + + fn parse(&self, path: &RemoteConfigPath, contents: Vec) -> Self::Parsed; } -impl UpdatedFiles, P> for RawFileStorage

{ - fn updated(&self) -> Vec<(Arc>, P)> { +impl UpdatedFiles, P::Parsed> for RawFileStorage

{ + fn updated(&self) -> Vec<(Arc>, P::Parsed)> { std::mem::take(&mut *self.updated.lock_or_panic()) } } /// Mutable data: version and contents. -struct RawFileData

{ +struct RawFileData { version: u64, - contents: P, + contents: T, } /// File contents and file metadata -pub struct RawFile

{ +pub struct RawFile { path: Arc, - data: Mutex>, + data: Mutex>, } -pub struct RawFileContentsGuard<'a, P>(MutexGuard<'a, RawFileData

>); +pub struct RawFileContentsGuard<'a, T>(MutexGuard<'a, RawFileData>); -impl

Deref for RawFileContentsGuard<'_, P> { - type Target = P; +impl Deref for RawFileContentsGuard<'_, T> { + type Target = T; fn deref(&self) -> &Self::Target { &self.0.contents } } -impl

RawFile

{ +impl RawFile { /// Gets the contents behind a Deref impl (guarding a Mutex). - pub fn contents(&self) -> RawFileContentsGuard<'_, P> { + pub fn contents(&self) -> RawFileContentsGuard<'_, T> { RawFileContentsGuard(self.data.lock_or_panic()) } @@ -67,14 +78,14 @@ impl

RawFile

{ } } -impl

FilePath for RawFile

{ +impl FilePath for RawFile { fn path(&self) -> &RemoteConfigPath { &self.path } } impl FileStorage for RawFileStorage

{ - type StoredFile = RawFile

; + type StoredFile = RawFile; fn store( &self, @@ -85,7 +96,7 @@ impl FileStorage for RawFileStorage

{ Ok(Arc::new(RawFile { data: Mutex::new(RawFileData { version, - contents: P::parse(&path, contents), + contents: self.parser.parse(&path, contents), }), path, })) @@ -97,7 +108,7 @@ impl FileStorage for RawFileStorage

{ version: u64, contents: Vec, ) -> anyhow::Result<()> { - let mut contents = P::parse(&file.path, contents); + let mut contents = self.parser.parse(&file.path, contents); let mut data = file.data.lock_or_panic(); std::mem::swap(&mut data.contents, &mut contents); self.updated.lock_or_panic().push((file.clone(), contents)); @@ -106,20 +117,49 @@ impl FileStorage for RawFileStorage

{ } } -/// It simply stores the raw remote config file contents. -pub type SimpleFileStorage = RawFileStorage>; +// ── RawBytesParser ──────────────────────────────────────────────────────────── + +/// Stores raw remote config file bytes without parsing. +#[derive(Clone, Default)] +pub struct RawBytesParser; -impl ParseFile for Vec { - fn parse(_path: &RemoteConfigPath, contents: Vec) -> Self { +impl ParseFile for RawBytesParser { + type Parsed = Vec; + + fn parse(&self, _path: &RemoteConfigPath, contents: Vec) -> Vec { contents } } -/// Storing the remote config file contents in parsed form -pub type ParsedFileStorage = RawFileStorage>; +/// Stores the remote config file contents in raw (unparsed) form. +pub type SimpleFileStorage = RawFileStorage; + +// ── RegistryParser ──────────────────────────────────────────────────────────── + +/// A [`ParseFile`] implementation that dispatches to a [`ParserRegistry`]. +#[derive(Clone)] +pub struct RegistryParser(pub Arc); + +impl ParseFile for RegistryParser { + type Parsed = anyhow::Result>>; -impl ParseFile for anyhow::Result { - fn parse(path: &RemoteConfigPath, contents: Vec) -> Self { - RemoteConfigData::try_parse(path.product, contents.as_slice()) + fn parse(&self, path: &RemoteConfigPath, contents: Vec) -> Self::Parsed { + self.0.parse(path.product, &contents) + } +} + +/// Stores the remote config file contents in parsed form using a [`ParserRegistry`]. +pub type ParsedFileStorage = RawFileStorage; + +impl ParsedFileStorage { + /// Create a storage backed by the given registry. + pub fn with_registry(registry: Arc) -> Self { + RawFileStorage::new(RegistryParser(registry)) + } +} + +impl Default for ParsedFileStorage { + fn default() -> Self { + Self::with_registry(Arc::new(default_registry())) } } diff --git a/datadog-remote-config/src/lib.rs b/datadog-remote-config/src/lib.rs index c92fd75710..462362d160 100644 --- a/datadog-remote-config/src/lib.rs +++ b/datadog-remote-config/src/lib.rs @@ -21,6 +21,7 @@ mod targets; pub use parse::*; pub use path::*; + use { libdd_common::tag::Tag, serde::{Deserialize, Serialize}, diff --git a/datadog-remote-config/src/parse.rs b/datadog-remote-config/src/parse.rs index 48ebf55ce9..f0b9b35a48 100644 --- a/datadog-remote-config/src/parse.rs +++ b/datadog-remote-config/src/parse.rs @@ -3,89 +3,217 @@ use crate::{ config::{ - self, agent_config::AgentConfigFile, agent_task::AgentTaskFile, dynamic::DynamicConfigFile, + agent_config::{self, AgentConfigFile}, + agent_task::{self, AgentTaskFile}, + dynamic::{self, DynamicConfigFile}, }, RemoteConfigPath, RemoteConfigProduct, RemoteConfigSource, }; -#[cfg(feature = "ffe")] -use datadog_ffe::rules_based::UniversalFlagConfig; -#[cfg(feature = "live-debugger")] -use datadog_live_debugger::LiveDebuggingData; +use std::any::Any; +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter, Result}; +/// Opaque parsed payload of a remote config product. Implemented by every type that impls +/// [`RemoteConfigContent`]; product crates do not implement this trait directly. +/// +/// Use `parsed.as_any().downcast_ref::()` to recover the concrete product type. +pub trait RemoteConfigParsedData: Any + Debug + Send + Sync + 'static { + fn as_any(&self) -> &dyn Any; +} +impl RemoteConfigParsedData for T { + fn as_any(&self) -> &dyn Any { + self + } +} + +/// Typed contract a product crate provides so the registry can build a parser for [`Self`]. +pub trait RemoteConfigContent: Any + Debug + Send + Sync + 'static { + const PRODUCT: RemoteConfigProduct; + fn parse(data: &[u8]) -> anyhow::Result + where + Self: Sized; +} + +/// A product-specific parser: converts raw bytes into a parsed payload. +pub type ProductParser = + Box anyhow::Result> + Send + Sync>; + +/// Returned by [`ParserRegistry::register`] when a parser is already registered for a product. #[derive(Debug)] -#[allow(clippy::large_enum_variant)] -pub enum RemoteConfigData { - DynamicConfig(DynamicConfigFile), - #[cfg(feature = "live-debugger")] - LiveDebugger(LiveDebuggingData), - TracerFlareConfig(AgentConfigFile), - TracerFlareTask(AgentTaskFile), - #[cfg(feature = "ffe")] - FfeFlags(UniversalFlagConfig), - Ignored(RemoteConfigProduct), +pub struct AlreadyRegistered(pub RemoteConfigProduct); + +impl Display for AlreadyRegistered { + fn fmt(&self, f: &mut Formatter<'_>) -> Result { + write!(f, "parser already registered for product {}", self.0) + } } -impl RemoteConfigData { - pub fn try_parse( +impl std::error::Error for AlreadyRegistered {} + +/// Maps [`RemoteConfigProduct`] variants to their parser functions. +/// +/// Build a registry (optionally starting from [`default_registry`]) and inject it into the file +/// storage or fetcher. Products with no registered parser yield `Ok(None)` so callers can still +/// track the config without processing its contents. +pub struct ParserRegistry { + parsers: HashMap, +} + +impl ParserRegistry { + pub fn new() -> Self { + ParserRegistry { + parsers: HashMap::new(), + } + } + + /// Register `parser` for `product`. Errors if a parser is already registered for `product` — + /// silent overwrites would mask configuration mistakes. + pub fn register( + &mut self, + product: RemoteConfigProduct, + parser: ProductParser, + ) -> std::result::Result<(), AlreadyRegistered> { + if self.parsers.contains_key(&product) { + return Err(AlreadyRegistered(product)); + } + self.parsers.insert(product, parser); + Ok(()) + } + + /// Builder-style registration of a typed [`RemoteConfigContent`] implementor. + /// Panics if `T::PRODUCT` is already registered — intended for one-shot setup at the start of + /// a process where a collision is a programmer error worth surfacing immediately. + #[must_use] + pub fn with(mut self) -> Self { + let parser: ProductParser = Box::new(|data: &[u8]| { + let parsed = T::parse(data)?; + Ok(Box::new(parsed) as Box) + }); + #[allow(clippy::expect_used)] + self.register(T::PRODUCT, parser) + .expect("ParserRegistry::with: parser already registered"); + self + } + + /// Parse `data` for `product`. Returns `Ok(None)` (not an error) when no parser is + /// registered, so callers can still track the config in their bookkeeping structures. + pub fn parse( + &self, product: RemoteConfigProduct, data: &[u8], - ) -> anyhow::Result { - Ok(match product { - RemoteConfigProduct::AgentConfig => { - RemoteConfigData::TracerFlareConfig(config::agent_config::parse_json(data)?) - } - RemoteConfigProduct::AgentTask => { - RemoteConfigData::TracerFlareTask(config::agent_task::parse_json(data)?) - } - RemoteConfigProduct::ApmTracing => { - RemoteConfigData::DynamicConfig(config::dynamic::parse_json(data)?) - } - #[cfg(feature = "live-debugger")] - RemoteConfigProduct::LiveDebugger => { - let parsed = datadog_live_debugger::parse_json(&String::from_utf8_lossy(data))?; - RemoteConfigData::LiveDebugger(parsed) - } - #[cfg(feature = "ffe")] - RemoteConfigProduct::FfeFlags => { - RemoteConfigData::FfeFlags(UniversalFlagConfig::from_json(data.to_vec())?) - } - _ => RemoteConfigData::Ignored(product), - }) + ) -> anyhow::Result>> { + match self.parsers.get(&product) { + Some(parser) => parser(data).map(Some), + None => Ok(None), + } } } -impl From<&RemoteConfigData> for RemoteConfigProduct { - fn from(value: &RemoteConfigData) -> Self { - match value { - RemoteConfigData::DynamicConfig(_) => RemoteConfigProduct::ApmTracing, - #[cfg(feature = "live-debugger")] - RemoteConfigData::LiveDebugger(_) => RemoteConfigProduct::LiveDebugger, - RemoteConfigData::TracerFlareConfig(_) => RemoteConfigProduct::AgentConfig, - RemoteConfigData::TracerFlareTask(_) => RemoteConfigProduct::AgentTask, - #[cfg(feature = "ffe")] - RemoteConfigData::FfeFlags(_) => RemoteConfigProduct::FfeFlags, - RemoteConfigData::Ignored(product) => *product, - } +impl Default for ParserRegistry { + fn default() -> Self { + Self::new() } } -#[derive(Debug)] +// ── RemoteConfigContent impls for RC-internal product types ─────────────────── + +impl RemoteConfigContent for AgentConfigFile { + const PRODUCT: RemoteConfigProduct = RemoteConfigProduct::AgentConfig; + + fn parse(data: &[u8]) -> anyhow::Result { + Ok(agent_config::parse_json(data)?) + } +} + +impl RemoteConfigContent for AgentTaskFile { + const PRODUCT: RemoteConfigProduct = RemoteConfigProduct::AgentTask; + + fn parse(data: &[u8]) -> anyhow::Result { + Ok(agent_task::parse_json(data)?) + } +} + +impl RemoteConfigContent for DynamicConfigFile { + const PRODUCT: RemoteConfigProduct = RemoteConfigProduct::ApmTracing; + + fn parse(data: &[u8]) -> anyhow::Result { + Ok(dynamic::parse_json(data)?) + } +} + +/// Returns a registry pre-loaded with parsers for the RC-internal products. +/// +/// Consumers that need additional product parsers (live-debugger, FFE, …) should chain +/// [`ParserRegistry::with`] on the returned registry. +pub fn default_registry() -> ParserRegistry { + ParserRegistry::new() + .with::() + .with::() + .with::() +} + +// ── RemoteConfigValue ───────────────────────────────────────────────────────── + pub struct RemoteConfigValue { pub source: RemoteConfigSource, - pub data: RemoteConfigData, + pub product: RemoteConfigProduct, + pub data: Option>, pub config_id: String, pub name: String, } +impl Debug for RemoteConfigValue { + fn fmt(&self, f: &mut Formatter<'_>) -> Result { + f.debug_struct("RemoteConfigValue") + .field("source", &self.source) + .field("product", &self.product) + .field("config_id", &self.config_id) + .field("name", &self.name) + .finish() + } +} + impl RemoteConfigValue { - pub fn try_parse(path: &str, data: &[u8]) -> anyhow::Result { + pub fn try_parse(path: &str, data: &[u8], registry: &ParserRegistry) -> anyhow::Result { let path = RemoteConfigPath::try_parse(path)?; - let data = RemoteConfigData::try_parse(path.product, data)?; + let data = registry.parse(path.product, data)?; Ok(RemoteConfigValue { source: path.source, + product: path.product, data, config_id: path.config_id.to_string(), name: path.name.to_string(), }) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn noop_parser() -> ProductParser { + Box::new(|_data: &[u8]| anyhow::bail!("not invoked in this test")) + } + + #[test] + fn parse_returns_none_for_unregistered_product() { + let registry = ParserRegistry::new(); + let parsed = registry + .parse(RemoteConfigProduct::AsmFeatures, b"{}") + .expect("parse must not error for unregistered products"); + assert!(parsed.is_none()); + } + + #[test] + fn register_rejects_duplicate_product() { + let mut registry = ParserRegistry::new(); + registry + .register(RemoteConfigProduct::AgentTask, noop_parser()) + .expect("first registration succeeds"); + + let err = registry + .register(RemoteConfigProduct::AgentTask, noop_parser()) + .expect_err("second registration for the same product must fail"); + assert_eq!(err.0, RemoteConfigProduct::AgentTask); + } +} diff --git a/datadog-remote-config/src/path.rs b/datadog-remote-config/src/path.rs index 35bcae8f29..68af47ac08 100644 --- a/datadog-remote-config/src/path.rs +++ b/datadog-remote-config/src/path.rs @@ -24,7 +24,6 @@ pub enum RemoteConfigProduct { AsmDD, AsmFeatures, FfeFlags, - #[cfg(feature = "live-debugger")] LiveDebugger, } @@ -39,7 +38,6 @@ impl Display for RemoteConfigProduct { RemoteConfigProduct::AsmDD => "ASM_DD", RemoteConfigProduct::AsmFeatures => "ASM_FEATURES", RemoteConfigProduct::FfeFlags => "FFE_FLAGS", - #[cfg(feature = "live-debugger")] RemoteConfigProduct::LiveDebugger => "LIVE_DEBUGGING", }; write!(f, "{str}") @@ -90,7 +88,6 @@ impl RemoteConfigPath { "ASM_DD" => RemoteConfigProduct::AsmDD, "ASM_FEATURES" => RemoteConfigProduct::AsmFeatures, "FFE_FLAGS" => RemoteConfigProduct::FfeFlags, - #[cfg(feature = "live-debugger")] "LIVE_DEBUGGING" => RemoteConfigProduct::LiveDebugger, product => anyhow::bail!("Unknown product {}", product), }, diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index 5c4ffffbf0..80f2662a7d 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -25,7 +25,7 @@ libdd-telemetry = { path = "../libdd-telemetry", features = ["tracing"] } libdd-data-pipeline = { path = "../libdd-data-pipeline" } libdd-trace-utils = { path = "../libdd-trace-utils" } libdd-trace-stats = { path = "../libdd-trace-stats" } -datadog-remote-config = { path = "../datadog-remote-config" , features = ["live-debugger"]} +datadog-remote-config = { path = "../datadog-remote-config" } datadog-live-debugger = { path = "../datadog-live-debugger" } libdd-crashtracker = { path = "../libdd-crashtracker" } libdd-dogstatsd-client = { path = "../libdd-dogstatsd-client" } diff --git a/datadog-sidecar/src/shm_remote_config.rs b/datadog-sidecar/src/shm_remote_config.rs index 98114fe1e2..63610cb563 100644 --- a/datadog-sidecar/src/shm_remote_config.rs +++ b/datadog-sidecar/src/shm_remote_config.rs @@ -11,12 +11,16 @@ use base64::prelude::BASE64_URL_SAFE_NO_PAD; use base64::Engine; use datadog_ipc::platform::{FileBackedHandle, MappedMem, NamedShmHandle}; use datadog_ipc::rate_limiter::ShmLimiter; +use datadog_live_debugger::LiveDebuggingData; use datadog_remote_config::config::dynamic::{parse_json, Configs}; use datadog_remote_config::fetch::{ ConfigInvariants, FileRefcountData, FileStorage, MultiTargetFetcher, MultiTargetHandlers, MultiTargetStats, NotifyTarget, ProductCapabilities, RefcountedFile, }; -use datadog_remote_config::{RemoteConfigPath, RemoteConfigProduct, RemoteConfigValue, Target}; +use datadog_remote_config::{ + default_registry, ParserRegistry, RemoteConfigPath, RemoteConfigProduct, RemoteConfigValue, + Target, +}; use libdd_common::{tag::Tag, MutexExt}; use priority_queue::PriorityQueue; use sha2::{Digest, Sha224}; @@ -526,7 +530,7 @@ impl ShmRemoteConfigs { } } -fn read_config(path: &str) -> anyhow::Result<(RemoteConfigValue, u32)> { +fn read_config(path: &str, registry: &ParserRegistry) -> anyhow::Result<(RemoteConfigValue, u32)> { if let [shm_path, limiter, rc_path] = &path.split(':').collect::>()[..] { let mapped = NamedShmHandle::open(&CString::new(*shm_path)?)?.map()?; let rc_path = String::from_utf8(BASE64_URL_SAFE_NO_PAD.decode(rc_path)?)?; @@ -534,7 +538,7 @@ fn read_config(path: &str) -> anyhow::Result<(RemoteConfigValue, u32)> { #[cfg(windows)] let data = &data[4..(4 + u32::from_ne_bytes((&data[0..4]).try_into()?) as usize)]; Ok(( - RemoteConfigValue::try_parse(&rc_path, data)?, + RemoteConfigValue::try_parse(&rc_path, data, registry)?, u32::from_str(limiter)?, )) } else { @@ -553,6 +557,7 @@ fn read_config(path: &str) -> anyhow::Result<(RemoteConfigValue, u32)> { /// once. They will always be Remove()d first, then Add()ed again upon update. pub struct RemoteConfigManager { invariants: ConfigInvariants, + registry: Arc, active_target: Option>, pub active_reader: Option, encountered_targets: HashMap, (RemoteConfigReader, Vec)>, @@ -576,8 +581,17 @@ pub enum RemoteConfigUpdate { impl RemoteConfigManager { pub fn new(invariants: ConfigInvariants) -> RemoteConfigManager { + let registry = default_registry().with::(); + Self::new_with_registry(invariants, Arc::new(registry)) + } + + pub fn new_with_registry( + invariants: ConfigInvariants, + registry: Arc, + ) -> RemoteConfigManager { RemoteConfigManager { invariants, + registry, active_target: None, active_reader: None, encountered_targets: Default::default(), @@ -662,12 +676,12 @@ impl RemoteConfigManager { while let Some(config) = self.last_read_configs.pop() { if let Entry::Vacant(entry) = self.active_configs.entry(config) { - match read_config(entry.key()) { + match read_config(entry.key(), &self.registry) { Ok((parsed, limiter_index)) => { trace!("Adding remote config file {}: {:?}", entry.key(), parsed); entry.insert(RemoteConfigPath { source: parsed.source, - product: (&parsed.data).into(), + product: parsed.product, config_id: parsed.config_id.clone(), name: parsed.name.clone(), }); @@ -756,9 +770,11 @@ impl RemoteConfigManager { #[cfg(test)] mod tests { use super::*; - use datadog_remote_config::config::dynamic::{tests::dummy_dynamic_config, Configs}; + use datadog_remote_config::config::dynamic::{ + tests::dummy_dynamic_config, Configs, DynamicConfigFile, + }; use datadog_remote_config::fetch::test_server::RemoteConfigServer; - use datadog_remote_config::{RemoteConfigData, RemoteConfigProduct, RemoteConfigSource}; + use datadog_remote_config::{RemoteConfigProduct, RemoteConfigSource}; use manual_future::ManualFuture; use std::sync::LazyLock; @@ -776,6 +792,13 @@ mod tests { name: "config".to_string(), }); + static PATH_LIVE_DEBUGGER: LazyLock = LazyLock::new(|| RemoteConfigPath { + source: RemoteConfigSource::Employee, + product: RemoteConfigProduct::LiveDebugger, + config_id: "ld-1".to_string(), + name: "config".to_string(), + }); + static DUMMY_TARGET: LazyLock> = LazyLock::new(|| { Arc::new(Target { service: "service".to_string(), @@ -868,9 +891,10 @@ mod tests { assert_eq!(value.config_id, PATH_FIRST.config_id); assert_eq!(value.source, PATH_FIRST.source); assert_eq!(value.name, PATH_FIRST.name); - if let RemoteConfigData::DynamicConfig(data) = value.data { + let parsed = value.data.as_deref().expect("dynamic config must parse"); + if let Some(cfg) = parsed.as_any().downcast_ref::() { assert!(matches!( - >::from(data.lib_config)[0], + >::from(cfg.lib_config.clone())[0], Configs::TracingEnabled(true) )); } else { @@ -999,4 +1023,73 @@ mod tests { assert!(matches!(manager.fetch_update(), RemoteConfigUpdate::None)); } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn test_live_debugger_config_parsed() { + use datadog_live_debugger::LiveDebuggingData; + + let server = RemoteConfigServer::spawn(); + + // The callback is mandatory but irrelevant here — we don't synchronize on teardown. + let shm = ShmRemoteConfigs::new( + server.dummy_options().invariants, + Box::new(|| {}), + Duration::from_millis(10), + ); + + let mut manager = RemoteConfigManager::new(server.dummy_options().invariants); + + // Minimal valid `ServiceConfiguration` payload — `LiveDebuggingData::ServiceConfiguration` + // only requires `id` and `type` to parse. + let live_debugger_json = r#"{"id":"ld-1","type":"SERVICE_CONFIGURATION"}"#; + server.files.lock().unwrap().insert( + PATH_LIVE_DEBUGGER.clone(), + ( + vec![DUMMY_TARGET.clone()], + 1, + live_debugger_json.to_string(), + ), + ); + + manager.track_target(&DUMMY_TARGET); + + let (sender, mut receiver) = tokio::sync::mpsc::channel(1); + let _shm_guard = shm.add_runtime( + "3b43524b-a70c-45dc-921d-34504e50c5eb".to_string(), + NotifyDummy(Arc::new(sender)), + DUMMY_TARGET.env.to_string(), + DUMMY_TARGET.service.to_string(), + DUMMY_TARGET.app_version.to_string(), + DUMMY_TARGET.tags.clone(), + ProductCapabilities { + products: server.dummy_options().products, + capabilities: server.dummy_options().capabilities, + }, + DynamicInstrumentationConfigState::Enabled, + DUMMY_TARGET.process_tags.clone(), + ); + + receiver.recv().await; + + if let RemoteConfigUpdate::Add { value, .. } = manager.fetch_update() { + assert_eq!(value.config_id, PATH_LIVE_DEBUGGER.config_id); + assert_eq!( + value.product, + RemoteConfigProduct::LiveDebugger, + "must be parsed as LiveDebugger, not skipped" + ); + let data = value.data.as_deref().expect("LiveDebugger must parse"); + let parsed = data + .as_any() + .downcast_ref::() + .expect("downcast to LiveDebuggingData should succeed"); + match parsed { + LiveDebuggingData::ServiceConfiguration(sc) => assert_eq!(sc.id, "ld-1"), + LiveDebuggingData::Probe(_) => unreachable!("expected ServiceConfiguration"), + } + } else { + unreachable!("expected RemoteConfigUpdate::Add for the LiveDebugger config"); + } + } } diff --git a/libdd-tracer-flare/src/lib.rs b/libdd-tracer-flare/src/lib.rs index 3867dc3525..bb23dccb20 100644 --- a/libdd-tracer-flare/src/lib.rs +++ b/libdd-tracer-flare/src/lib.rs @@ -17,7 +17,10 @@ use std::{ }, }; -use datadog_remote_config::{config::agent_task::AgentTaskFile, RemoteConfigData}; +use datadog_remote_config::{ + config::{agent_config::AgentConfigFile, agent_task::AgentTaskFile}, + RemoteConfigParsedData, +}; use crate::error::FlareError; #[cfg(feature = "listener")] @@ -25,7 +28,7 @@ use { datadog_remote_config::{ fetch::{ConfigInvariants, ConfigOptions, SingleChangesFetcher}, file_change_tracker::Change, - file_storage::{ParsedFileStorage, RawFile, RawFileStorage}, + file_storage::{ParsedFileStorage, RawFile}, RemoteConfigProduct, Target, }, libdd_common::Endpoint, @@ -216,7 +219,7 @@ impl TracerFlareManager { /// * `FlareError(msg)` - If something fails. pub fn handle_remote_config_data( &self, - data: &RemoteConfigData, + data: &dyn RemoteConfigParsedData, ) -> Result { let action = data.try_into(); if let Ok(FlareAction::Set(_)) = action { @@ -250,7 +253,8 @@ impl TracerFlareManager { file: RemoteConfigFile, ) -> Result { match file.contents().as_ref() { - Ok(data) => self.handle_remote_config_data(data), + Ok(Some(data)) => self.handle_remote_config_data(data.as_ref()), + Ok(None) => Ok(FlareAction::None), Err(e) => { // If encounter an error we need to stop collecting self.collecting.store(false, Ordering::Relaxed); @@ -360,9 +364,10 @@ impl TryFrom<&str> for LogLevel { } #[cfg(feature = "listener")] -pub type RemoteConfigFile = std::sync::Arc>>; +pub type RemoteConfigFile = + std::sync::Arc>>>>; #[cfg(feature = "listener")] -pub type Listener = SingleChangesFetcher>>; +pub type Listener = SingleChangesFetcher; #[cfg(feature = "listener")] impl TryFrom for FlareAction { @@ -381,44 +386,29 @@ impl TryFrom for FlareAction { /// * `FlareError(msg)` - If something fail. fn try_from(file: RemoteConfigFile) -> Result { match file.contents().as_ref() { - Ok(data) => data.try_into(), + Ok(Some(data)) => data.as_ref().try_into(), + Ok(None) => Ok(FlareAction::None), Err(e) => Err(FlareError::ParsingError(e.to_string())), } } } -impl TryFrom<&RemoteConfigData> for FlareAction { +impl TryFrom<&dyn RemoteConfigParsedData> for FlareAction { type Error = FlareError; - /// Check the `&RemoteConfigData` and return the action the tracer flare - /// needs to perform. - /// - /// # Arguments - /// - /// * `data` - &RemoteConfigData - /// - /// # Returns - /// - /// * `Ok(FlareAction)` - If successful - /// * `FlareError(msg)` - If something fails - fn try_from(data: &RemoteConfigData) -> Result { - match data { - RemoteConfigData::TracerFlareConfig(agent_config) => { - if agent_config.name.starts_with("flare-log-level.") { - if let Some(log_level) = &agent_config.config.log_level { - let log_level = log_level.as_str().try_into()?; - return Ok(FlareAction::Set(log_level)); - } + fn try_from(data: &dyn RemoteConfigParsedData) -> Result { + if let Some(agent_config) = data.as_any().downcast_ref::() { + if agent_config.name.starts_with("flare-log-level.") { + if let Some(log_level) = &agent_config.config.log_level { + let log_level = log_level.as_str().try_into()?; + return Ok(FlareAction::Set(log_level)); } } - RemoteConfigData::TracerFlareTask(agent_task) => { - if agent_task.task_type.eq("tracer_flare") { - return Ok(FlareAction::Send(agent_task.to_owned())); - } + } else if let Some(agent_task) = data.as_any().downcast_ref::() { + if agent_task.task_type.eq("tracer_flare") { + return Ok(FlareAction::Send(agent_task.to_owned())); } - _ => return Ok(FlareAction::None), } - Ok(FlareAction::None) } } @@ -495,14 +485,14 @@ pub async fn run_remote_config_listener( } } else if let Change::Remove(file) = change { match file.contents().as_ref() { - Ok(data) => match data { - RemoteConfigData::TracerFlareConfig(_) => { - if state == FlareAction::None { - state = FlareAction::Unset; - } + Ok(Some(data)) => { + if data.as_any().downcast_ref::().is_some() + && state == FlareAction::None + { + state = FlareAction::Unset; } - _ => continue, - }, + } + Ok(None) => {} Err(e) => { return Err(FlareError::ParsingError(e.to_string())); } @@ -528,7 +518,7 @@ pub async fn run_remote_config_listener( mod tests { #[cfg(feature = "listener")] use crate::FlareAction; - use crate::{FlareError, LogLevel, RemoteConfigData, TracerFlareManager}; + use crate::{FlareError, LogLevel, TracerFlareManager}; #[cfg(feature = "listener")] use datadog_remote_config::{ config::{ @@ -722,7 +712,8 @@ mod tests { #[test] fn test_remote_config_task_with_wrong_type_returns_none() { - let data = RemoteConfigData::TracerFlareTask(AgentTaskFile { + use datadog_remote_config::config::agent_task::{AgentTask, AgentTaskFile}; + let data = AgentTaskFile { args: AgentTask { case_id: "123".to_string(), hostname: "test-host".to_string(), @@ -730,19 +721,21 @@ mod tests { }, task_type: "not_tracer_flare".to_string(), uuid: "test-uuid".to_string(), - }); - - let result = FlareAction::try_from(&data); + }; + let tracer_flare = TracerFlareManager::new("http://localhost:8126", "rust"); + let result = tracer_flare.handle_remote_config_data(&data); assert!(result.is_ok()); assert_eq!(result.unwrap(), FlareAction::None); } #[test] fn test_handle_remote_config_data_send_stops_collecting() { + use datadog_remote_config::config::agent_task::{AgentTask, AgentTaskFile}; + use std::sync::atomic::Ordering; let tracer_flare = TracerFlareManager::new("http://localhost:8126", "rust"); tracer_flare.collecting.store(true, Ordering::Relaxed); - let data = RemoteConfigData::TracerFlareTask(AgentTaskFile { + let data = AgentTaskFile { args: AgentTask { case_id: "123".to_string(), hostname: "test-host".to_string(), @@ -750,7 +743,7 @@ mod tests { }, task_type: "tracer_flare".to_string(), uuid: "test-uuid".to_string(), - }); + }; let result = tracer_flare.handle_remote_config_data(&data).unwrap(); assert!(matches!(result, FlareAction::Send(_)));