diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 5df29c4682..0285d09dad 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2449,11 +2449,15 @@ dependencies = [ "numaflow-daemon", "numaflow-monitor", "numaflow-sideinput", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "rustls 0.23.31", "serving", "tokio", "tokio-util", "tracing", + "tracing-opentelemetry", "tracing-subscriber", ] @@ -2516,6 +2520,8 @@ dependencies = [ "numaflow-sqs", "numaflow-testing", "numaflow-throttling", + "opentelemetry", + "opentelemetry_sdk", "parking_lot", "pin-project", "prometheus-client", @@ -2538,6 +2544,7 @@ dependencies = [ "tonic", "tower", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "trait-variant", "zstd", @@ -2850,6 +2857,82 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.17", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +dependencies = [ + "async-trait", + "bytes", + "http 1.3.1", + "opentelemetry", + "reqwest 0.12.22", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f69cd6acbb9af919df949cd1ec9e5e7fdc2ef15d234b6b795aaa525cc02f71f" +dependencies = [ + "http 1.3.1", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost 0.14.1", + "reqwest 0.12.22", + "thiserror 2.0.17", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost 0.14.1", + "tonic", + "tonic-prost", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand 0.9.4", + "thiserror 2.0.17", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -3575,6 +3658,7 @@ checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531" dependencies = [ "base64 0.22.1", "bytes", + "futures-channel", "futures-core", "futures-util", "h2 0.4.11", @@ -4760,9 +4844,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.41" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "log", "pin-project-lite", @@ -4772,9 +4856,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.30" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", @@ -4783,9 +4867,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.34" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", "valuable", @@ -4802,6 +4886,22 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac28f2d093c6c477eaa76b23525478f38de514fa9aeb1285738d4b97a9552fc" +dependencies = [ + "js-sys", + "opentelemetry", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" @@ -4814,9 +4914,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.20" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" dependencies = [ "matchers", "nu-ansi-term", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index dd9048f708..ff88f973e8 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -83,6 +83,10 @@ tokio-stream = "0.1.17" tokio-util = { version = "0.7", features = ["rt"] } bytes = "1.11.1" tracing = "0.1.41" +tracing-opentelemetry = "0.32" +opentelemetry = "0.31" +opentelemetry_sdk = "0.31" +opentelemetry-otlp = { version = "0.31", features = ["grpc-tonic"] } async-nats = "0.44.2" thiserror = "2.0.12" axum = { version = "0.8.4", default-features = false, features = [ diff --git a/rust/numaflow-core/Cargo.toml b/rust/numaflow-core/Cargo.toml index c620804703..09da949bfc 100644 --- a/rust/numaflow-core/Cargo.toml +++ b/rust/numaflow-core/Cargo.toml @@ -19,6 +19,9 @@ tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true tracing.workspace = true +tracing-opentelemetry.workspace = true +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } numaflow-pulsar.workspace = true numaflow-monitor.workspace = true numaflow-nats.workspace = true diff --git a/rust/numaflow-core/src/shared.rs b/rust/numaflow-core/src/shared.rs index 07e3e0bedc..568c73d39f 100644 --- a/rust/numaflow-core/src/shared.rs +++ b/rust/numaflow-core/src/shared.rs @@ -11,6 +11,11 @@ pub(crate) mod create_components; /// Shared methods for forwarding messages. pub(crate) mod forward; +/// OpenTelemetry propagation helpers for distributed tracing. +/// Functions in this module are consumed by span creation code in source, mapper, and sinker. +#[allow(dead_code)] +pub(crate) mod otel; + /// Test server framework: helpers for spinning up numaflow SDK gRPC servers in tests. #[cfg(test)] pub(crate) mod test_utils; diff --git a/rust/numaflow-core/src/shared/otel.rs b/rust/numaflow-core/src/shared/otel.rs new file mode 100644 index 0000000000..d08915cdff --- /dev/null +++ b/rust/numaflow-core/src/shared/otel.rs @@ -0,0 +1,478 @@ +//! OpenTelemetry propagation helpers for Numaflow message metadata. +//! +//! Adapts `KeyValueGroup` (sys_metadata) as a carrier for W3C Trace Context. +//! The propagator uses these getter/setter implementations to read and write +//! `traceparent` and `tracestate` without knowing about our protobuf types. + +use std::collections::HashMap; +use std::sync::Arc; + +use opentelemetry::global; +use opentelemetry::propagation::{Extractor, Injector}; + +use crate::metadata::KeyValueGroup; + +/// Key under which W3C trace context is stored in `sys_metadata`. +/// Always holds the shared `platform.process` parent context for downstream +/// platform spans (for example source, map, and sink spans). +pub const TRACING_METADATA_KEY: &str = "tracing"; + +/// Key under which the current stage's span context is stored for UDF consumption. +/// The UDF reads this key to see the platform stage (e.g., map) as its parent. +/// Written before calling the UDF, removed after the UDF returns. +pub const TRACING_UDF_METADATA_KEY: &str = "tracing_udf"; + +/// Wraps `KeyValueGroup` for **extraction**: the propagator calls `get` / `keys` +/// to read `traceparent` and `tracestate` from the carrier. +pub struct MetadataExtractor<'a>(pub &'a KeyValueGroup); + +impl Extractor for MetadataExtractor<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0 + .key_value + .get(key) + .and_then(|b| std::str::from_utf8(b.as_ref()).ok()) + } + + fn keys(&self) -> Vec<&str> { + self.0.key_value.keys().map(String::as_str).collect() + } +} + +/// Wraps `KeyValueGroup` for **injection**: the propagator calls `set` to write +/// `traceparent` and `tracestate` into the carrier. +pub struct MetadataInjector<'a>(pub &'a mut KeyValueGroup); + +impl Injector for MetadataInjector<'_> { + fn set(&mut self, key: &str, value: String) { + self.0 + .key_value + .insert(key.to_string(), bytes::Bytes::from(value.into_bytes())); + } +} + +/// Extracts an OpenTelemetry [`Context`] from a message's sys_metadata. +/// Returns a fresh root context if the tracing key is absent. +pub(crate) fn extract_trace_context( + metadata: &crate::metadata::Metadata, +) -> opentelemetry::Context { + match metadata.sys_metadata.get(TRACING_METADATA_KEY) { + Some(kvg) => { + let extractor = MetadataExtractor(kvg); + global::get_text_map_propagator(|prop| prop.extract(&extractor)) + } + // No propagated trace: return a fresh root context rather than + // `Context::current()`, which would inherit the ambient context of the + // current task (e.g. a surrounding span) and graft this message onto + // an unrelated parent. + None => opentelemetry::Context::new(), + } +} + +/// Injects a specific OpenTelemetry [`Context`] into a named sys_metadata key. +/// +/// Used to inject `platform.process` context into `"tracing"` (preserving the root) +/// and stage-specific context into `"tracing_udf"` (for UDF parent). +pub(crate) fn inject_context_into_metadata( + metadata: &mut crate::metadata::Metadata, + key: &str, + cx: &opentelemetry::Context, +) { + let kvg = metadata + .sys_metadata + .entry(key.to_string()) + .or_insert_with(|| KeyValueGroup { + key_value: HashMap::new(), + }); + let mut injector = MetadataInjector(kvg); + global::get_text_map_propagator(|prop| { + prop.inject_context(cx, &mut injector); + }); +} + +/// Wraps `HashMap` (message headers) for **extraction**. +/// Used to extract W3C trace context from incoming message headers +/// (e.g., Kafka headers passed through by source connectors). +pub struct HeaderExtractor<'a>(pub &'a HashMap); + +impl Extractor for HeaderExtractor<'_> { + fn get(&self, key: &str) -> Option<&str> { + // Try exact match first, then case-insensitive (Kafka headers are case-sensitive + // but upstream producers may use varying cases for B3 headers) + self.0.get(key).map(String::as_str).or_else(|| { + let lower = key.to_lowercase(); + self.0 + .iter() + .find(|(k, _)| k.to_lowercase() == lower) + .map(|(_, v)| v.as_str()) + }) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(String::as_str).collect() + } +} + +/// Extracts an OpenTelemetry [`Context`] from incoming message headers. +/// +/// Checks for W3C `traceparent` first, then B3 multi-headers +/// (`X-B3-TraceId`, `X-B3-SpanId`, `X-B3-Sampled`, `X-B3-Flags`). If B3 headers are found, +/// converts them to W3C `traceparent` format before extraction. +/// +/// Returns a fresh root context if no trace headers are present. +pub(crate) fn extract_trace_context_from_headers( + headers: &Arc>, +) -> opentelemetry::Context { + // Check for W3C traceparent header first + if get_header_case_insensitive(headers, "traceparent").is_some() { + let extractor = HeaderExtractor(headers.as_ref()); + return global::get_text_map_propagator(|prop| prop.extract(&extractor)); + } + + // Check for B3 multi-headers and convert to W3C traceparent + let trace_id = get_header_case_insensitive(headers, "X-B3-TraceId"); + let span_id = get_header_case_insensitive(headers, "X-B3-SpanId"); + + if let (Some(trace_id), Some(span_id)) = (trace_id, span_id) { + let sampled = get_header_case_insensitive(headers, "X-B3-Sampled"); + // B3 debug is carried in X-B3-Flags: 1 (not in X-B3-Sampled). + // If present, it forces recording regardless of the sampled header. + let debug = get_header_case_insensitive(headers, "X-B3-Flags") == Some("1"); + let traceparent = b3_to_traceparent(trace_id, span_id, sampled, debug); + + let mut synthetic = HashMap::new(); + synthetic.insert("traceparent".to_string(), traceparent); + let extractor = HeaderExtractor(&synthetic); + return global::get_text_map_propagator(|prop| prop.extract(&extractor)); + } + + // No upstream trace headers: fresh root context, not the ambient one. + opentelemetry::Context::new() +} + +/// Case-insensitive header lookup. +fn get_header_case_insensitive<'a>( + headers: &'a HashMap, + key: &str, +) -> Option<&'a str> { + headers.get(key).map(String::as_str).or_else(|| { + let lower = key.to_lowercase(); + headers + .iter() + .find(|(k, _)| k.to_lowercase() == lower) + .map(|(_, v)| v.as_str()) + }) +} + +/// Converts B3 multi-header values to W3C traceparent format. +/// +/// Format: `{version}-{trace_id}-{span_id}-{trace_flags}` +/// - Pads 64-bit trace IDs to 128-bit (left-pads with zeros) +/// - Maps sampled: "1"/"true" -> "01", "0"/"false"/absent -> "00" +/// - Maps debug (`X-B3-Flags: 1`) -> "01" +/// +/// B3 treats an absent `X-B3-Sampled` header as "deferred" (downstream decides), +/// which has no equivalent in W3C's two-state trace-flags. We collapse absent to +/// `00` (not sampled) rather than `01` to avoid unexpectedly force-sampling every +/// B3-without-sampled trace and amplifying trace volume past the configured +/// sampler ratio. +fn b3_to_traceparent(trace_id: &str, span_id: &str, sampled: Option<&str>, debug: bool) -> String { + let padded_trace_id = if trace_id.len() <= 16 { + format!("{:0>32}", trace_id) + } else { + trace_id.to_string() + }; + + let flags = if debug { + "01" // B3 debug forces recording + } else { + match sampled { + Some("1" | "true") => "01", + Some("0" | "false") => "00", + _ => "00", // absent: B3 defers; don't force-sample + } + }; + + format!("00-{}-{}-{}", padded_trace_id, span_id, flags) +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use opentelemetry::propagation::{Extractor, Injector}; + use opentelemetry::trace::TraceContextExt; + use std::sync::Once; + + /// Install the W3C propagator once per test process. Tests that rely on + /// `extract`/`inject` going through the real propagator must call this. + fn init_propagator() { + static INIT: Once = Once::new(); + INIT.call_once(|| { + opentelemetry::global::set_text_map_propagator( + opentelemetry_sdk::propagation::TraceContextPropagator::new(), + ); + }); + } + + fn kvg_with(pairs: &[(&str, &str)]) -> KeyValueGroup { + let mut kv = HashMap::new(); + for (k, v) in pairs { + kv.insert((*k).to_string(), Bytes::from(v.as_bytes().to_vec())); + } + KeyValueGroup { key_value: kv } + } + + fn context_from_traceparent(traceparent: &str) -> opentelemetry::Context { + let carrier = kvg_with(&[("traceparent", traceparent)]); + global::get_text_map_propagator(|prop| prop.extract(&MetadataExtractor(&carrier))) + } + + #[test] + fn metadata_extractor_reads_utf8_values() { + let kvg = kvg_with(&[("traceparent", "abc"), ("tracestate", "k=v")]); + let ex = MetadataExtractor(&kvg); + assert_eq!(ex.get("traceparent"), Some("abc")); + assert_eq!(ex.get("tracestate"), Some("k=v")); + assert_eq!(ex.get("missing"), None); + + let mut keys = ex.keys(); + keys.sort(); + assert_eq!(keys, vec!["traceparent", "tracestate"]); + } + + #[test] + fn metadata_extractor_returns_none_for_non_utf8() { + let mut kv = HashMap::new(); + kv.insert("traceparent".to_string(), Bytes::from(vec![0xff, 0xfe])); + let kvg = KeyValueGroup { key_value: kv }; + assert_eq!(MetadataExtractor(&kvg).get("traceparent"), None); + } + + #[test] + fn metadata_injector_sets_value() { + let mut kvg = KeyValueGroup { + key_value: HashMap::new(), + }; + MetadataInjector(&mut kvg).set("traceparent", "tp-value".to_string()); + assert_eq!( + kvg.key_value.get("traceparent").map(|b| b.as_ref()), + Some(b"tp-value".as_slice()) + ); + } + + #[test] + fn extract_trace_context_absent_key_returns_fresh_root() { + init_propagator(); + let ambient_cx = + context_from_traceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"); + let ambient_sc = ambient_cx.span().span_context().clone(); + let _ambient_guard = ambient_cx.attach(); + + let meta = crate::metadata::Metadata::default(); + let cx = extract_trace_context(&meta); + let span = cx.span(); + let sc = span.span_context(); + + // Fresh root: no valid remote span context and no inheritance from the + // ambient test context. + assert!(!sc.is_valid()); + assert_ne!(sc.trace_id(), ambient_sc.trace_id()); + assert_ne!(sc.span_id(), ambient_sc.span_id()); + } + + #[test] + fn extract_trace_context_roundtrips_with_inject() { + init_propagator(); + + // Build a parent context from a known traceparent so we can assert on specific IDs. + let parent_tp = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"; + let parent_kvg = kvg_with(&[("traceparent", parent_tp)]); + let parent_cx = + global::get_text_map_propagator(|prop| prop.extract(&MetadataExtractor(&parent_kvg))); + let parent_sc = { + let parent_span = parent_cx.span(); + parent_span.span_context().clone() + }; + assert!(parent_sc.is_valid(), "parent must parse to valid context"); + + // Inject into a fresh Metadata under the "tracing" key, then extract back. + let mut meta = crate::metadata::Metadata::default(); + inject_context_into_metadata(&mut meta, TRACING_METADATA_KEY, &parent_cx); + assert!(meta.sys_metadata.contains_key(TRACING_METADATA_KEY)); + + let extracted_cx = extract_trace_context(&meta); + let extracted_span = extracted_cx.span(); + let extracted_sc = extracted_span.span_context(); + assert!(extracted_sc.is_valid()); + assert_eq!(extracted_sc.trace_id(), parent_sc.trace_id()); + assert_eq!(extracted_sc.span_id(), parent_sc.span_id()); + } + + #[test] + fn header_extractor_case_insensitive_fallback() { + let mut h = HashMap::new(); + h.insert("X-B3-TraceId".to_string(), "abc".to_string()); + + let ex = HeaderExtractor(&h); + assert_eq!(ex.get("X-B3-TraceId"), Some("abc")); // exact match + assert_eq!(ex.get("x-b3-traceid"), Some("abc")); // case-insensitive + assert_eq!(ex.get("missing"), None); + } + + #[test] + fn extract_from_headers_no_trace_returns_fresh_root() { + init_propagator(); + let ambient_cx = + context_from_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"); + let ambient_sc = ambient_cx.span().span_context().clone(); + let _ambient_guard = ambient_cx.attach(); + + let headers = Arc::new(HashMap::new()); + let cx = extract_trace_context_from_headers(&headers); + let span = cx.span(); + let sc = span.span_context(); + + assert!(!sc.is_valid()); + assert_ne!(sc.trace_id(), ambient_sc.trace_id()); + assert_ne!(sc.span_id(), ambient_sc.span_id()); + } + + #[test] + fn extract_from_headers_w3c_traceparent() { + init_propagator(); + let mut h = HashMap::new(); + h.insert( + "traceparent".to_string(), + "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".to_string(), + ); + let headers = Arc::new(h); + + let cx = extract_trace_context_from_headers(&headers); + let span = cx.span(); + let sc = span.span_context(); + assert!(sc.is_valid()); + assert_eq!( + sc.trace_id().to_string(), + "0af7651916cd43dd8448eb211c80319c" + ); + assert_eq!(sc.span_id().to_string(), "b7ad6b7169203331"); + assert!(sc.is_sampled()); + } + + #[test] + fn extract_from_headers_b3_multi_header_sampled() { + init_propagator(); + let mut h = HashMap::new(); + h.insert( + "X-B3-TraceId".to_string(), + "0af7651916cd43dd8448eb211c80319c".to_string(), + ); + h.insert("X-B3-SpanId".to_string(), "b7ad6b7169203331".to_string()); + h.insert("X-B3-Sampled".to_string(), "1".to_string()); + let headers = Arc::new(h); + + let cx = extract_trace_context_from_headers(&headers); + let span = cx.span(); + let sc = span.span_context(); + assert!(sc.is_valid()); + assert_eq!( + sc.trace_id().to_string(), + "0af7651916cd43dd8448eb211c80319c" + ); + assert_eq!(sc.span_id().to_string(), "b7ad6b7169203331"); + assert!(sc.is_sampled()); + } + + #[test] + fn extract_from_headers_b3_debug_flag_forces_sampled() { + init_propagator(); + let mut h = HashMap::new(); + h.insert( + "X-B3-TraceId".to_string(), + "0af7651916cd43dd8448eb211c80319c".to_string(), + ); + h.insert("X-B3-SpanId".to_string(), "b7ad6b7169203331".to_string()); + h.insert("X-B3-Sampled".to_string(), "0".to_string()); + h.insert("X-B3-Flags".to_string(), "1".to_string()); + let headers = Arc::new(h); + + let cx = extract_trace_context_from_headers(&headers); + let span = cx.span(); + let sc = span.span_context(); + assert!(sc.is_valid()); + assert_eq!( + sc.trace_id().to_string(), + "0af7651916cd43dd8448eb211c80319c" + ); + assert_eq!(sc.span_id().to_string(), "b7ad6b7169203331"); + assert!(sc.is_sampled()); + } + + #[test] + fn extract_from_headers_b3_missing_span_id_falls_through() { + init_propagator(); + let mut h = HashMap::new(); + h.insert("X-B3-TraceId".to_string(), "abc".to_string()); + // No X-B3-SpanId. + let headers = Arc::new(h); + + let cx = extract_trace_context_from_headers(&headers); + assert!(!cx.span().span_context().is_valid()); + } + + #[test] + fn b3_pads_64bit_trace_id_to_128bit() { + let tp = b3_to_traceparent("b7ad6b7169203331", "b7ad6b7169203331", Some("1"), false); + assert_eq!( + tp, + "00-0000000000000000b7ad6b7169203331-b7ad6b7169203331-01" + ); + } + + #[test] + fn b3_preserves_128bit_trace_id() { + let tp = b3_to_traceparent( + "0af7651916cd43dd8448eb211c80319c", + "b7ad6b7169203331", + Some("1"), + false, + ); + assert_eq!( + tp, + "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01" + ); + } + + #[test] + fn b3_sampled_flag_mapping() { + let tid = "0af7651916cd43dd8448eb211c80319c"; + let sid = "b7ad6b7169203331"; + + // Sampled=1/true -> 01. + assert!(b3_to_traceparent(tid, sid, Some("1"), false).ends_with("-01")); + assert!(b3_to_traceparent(tid, sid, Some("true"), false).ends_with("-01")); + + // Sampled=0/false -> 00. + assert!(b3_to_traceparent(tid, sid, Some("0"), false).ends_with("-00")); + assert!(b3_to_traceparent(tid, sid, Some("false"), false).ends_with("-00")); + + // Absent -> 00 (B3 deferral collapsed to not-sampled to avoid volume blowup). + assert!(b3_to_traceparent(tid, sid, None, false).ends_with("-00")); + + // Unknown value -> 00 (same default as absent). + assert!(b3_to_traceparent(tid, sid, Some("weird"), false).ends_with("-00")); + } + + #[test] + fn b3_debug_flag_forces_sampled_regardless_of_sampled_header() { + let tid = "0af7651916cd43dd8448eb211c80319c"; + let sid = "b7ad6b7169203331"; + + // debug=true beats sampled=0/absent. + assert!(b3_to_traceparent(tid, sid, Some("0"), true).ends_with("-01")); + assert!(b3_to_traceparent(tid, sid, None, true).ends_with("-01")); + // debug=false + sampled=1 still samples. + assert!(b3_to_traceparent(tid, sid, Some("1"), false).ends_with("-01")); + } +} diff --git a/rust/numaflow/Cargo.toml b/rust/numaflow/Cargo.toml index 8b7af39c18..360431fdd5 100644 --- a/rust/numaflow/Cargo.toml +++ b/rust/numaflow/Cargo.toml @@ -19,6 +19,10 @@ numaflow-daemon.workspace = true tokio.workspace = true tokio-util.workspace = true tracing.workspace = true +tracing-opentelemetry.workspace = true +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } +opentelemetry-otlp = { workspace = true } rustls.workspace = true tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt", "json"] } clap = "4.5.40" diff --git a/rust/numaflow/src/main.rs b/rust/numaflow/src/main.rs index e807c652cf..c0b6ef5fd0 100644 --- a/rust/numaflow/src/main.rs +++ b/rust/numaflow/src/main.rs @@ -15,8 +15,8 @@ mod cmdline; const VERSION_INFO: &str = env!("NUMAFLOW_VERSION_INFO"); const ENV_MONO_VERTEX_NAME: &str = "NUMAFLOW_MONO_VERTEX_NAME"; -fn main() { - setup_tracing::register(); +/// Returns `std::process::ExitCode` so destructors run (e.g., the tracer guard flushes buffered spans). +fn main() -> std::process::ExitCode { // Setup the CryptoProvider (controls core cryptography used by rustls) for the process rustls::crypto::aws_lc_rs::default_provider() .install_default() @@ -35,16 +35,31 @@ fn main() { // User may specify a higher value by setting NUMAFLOW_CPU_REQUEST in `containerTemplate.env` // section for the vertex. let cpu_core_count = env::var("NUMAFLOW_CPU_REQUEST").unwrap_or_else(|_| "1".into()); - let worker_thread_count = cpu_core_count.parse::().inspect_err(|e| { - warn!(integer_conversion_error=?e, "The value of NUMAFLOW_CPU_REQUEST environment variable should be a valid unsigned integer. Worker thread count will be set to 1"); - }).unwrap_or(1).max(1); - + // Parse the CPU request; defer logging any error until after the tracing + // subscriber is installed below so it flows through the normal log pipeline. + let cpu_parse_result = cpu_core_count.parse::(); + let worker_thread_count = cpu_parse_result.as_ref().copied().unwrap_or(1).max(1); + + // Build the Tokio runtime before initializing tracing. The OTLP tonic/gRPC + // exporter needs a Tokio runtime for its background tasks. By entering the + // runtime context first, the tonic channel binds to this runtime. let rt = runtime::Builder::new_multi_thread() .enable_all() .worker_threads(worker_thread_count) .build() .unwrap(); + // Enter the runtime context so that tracing initialization (which may create + // a tonic gRPC channel for OTLP export) can use this runtime. + // Drop order at end of main is reverse of declaration: _tracer_guard drops first + // (flushing spans via the still-live runtime), then _rt_guard, then rt. + let _rt_guard = rt.enter(); + let _tracer_guard = setup_tracing::register(); + + if let Err(ref e) = cpu_parse_result { + warn!(integer_conversion_error=?e, "NUMAFLOW_CPU_REQUEST is not a valid unsigned integer. Worker thread count will be set to 1"); + } + info!( VERSION_INFO, tokio_worker_threads = worker_thread_count, @@ -53,14 +68,18 @@ fn main() { let cli = cmdline::root_cli(); - rt.block_on(async move { - if let Err(e) = run(cli).await { + let run_result = rt.block_on(async move { run(cli).await }); + + match run_result { + Err(e) => { error!("{e:?}"); - std::process::exit(1); + std::process::ExitCode::FAILURE } - }); - - info!("Exited."); + Ok(()) => { + info!("Exited."); + std::process::ExitCode::SUCCESS + } + } } async fn run(cli: clap::Command) -> Result<(), Box> { diff --git a/rust/numaflow/src/setup_tracing.rs b/rust/numaflow/src/setup_tracing.rs index afffb0c0b0..7a19d8ad1f 100644 --- a/rust/numaflow/src/setup_tracing.rs +++ b/rust/numaflow/src/setup_tracing.rs @@ -1,3 +1,4 @@ +use opentelemetry_sdk::trace::Sampler; use tracing::Level; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -53,10 +54,153 @@ fn report_panic(panic_info: &PanicHookInfo<'_>) { }; } -pub fn register() { - // Set up the tracing subscriber. RUST_LOG can be used to set the log level. - // The default log level is `info`. The `axum::rejection=trace` enables showing - // rejections from built-in extractors at `TRACE` level. +/// Build a sampler from standard OTel environment variables. +/// +/// - `OTEL_TRACES_SAMPLER`: sampler type (default: `parentbased_always_on`) +/// - `OTEL_TRACES_SAMPLER_ARG`: sampler argument (e.g., `0.1` for 10%) +/// +/// Supported samplers: `always_on`, `always_off`, `traceidratio`, +/// `parentbased_always_on`, `parentbased_always_off`, `parentbased_traceidratio`. +fn build_sampler() -> opentelemetry_sdk::trace::Sampler { + let sampler_name = + std::env::var("OTEL_TRACES_SAMPLER").unwrap_or_else(|_| "parentbased_always_on".into()); + let sampler_arg_raw = std::env::var("OTEL_TRACES_SAMPLER_ARG").ok(); + build_sampler_from(&sampler_name, sampler_arg_raw.as_deref()) +} + +/// Pure helper that builds a [`Sampler`] from already-resolved inputs. +/// Extracted from `build_sampler` so it can be unit-tested without mutating +/// the process env. +fn build_sampler_from( + sampler_name: &str, + sampler_arg_raw: Option<&str>, +) -> opentelemetry_sdk::trace::Sampler { + let sampler_arg = sampler_arg_raw.and_then(|v| v.parse::().ok()); + + let ratio_or_default = |sampler_kind: &str| { + sampler_arg.unwrap_or_else(|| { + if let Some(raw) = sampler_arg_raw { + eprintln!( + "[setup_tracing] Invalid OTEL_TRACES_SAMPLER_ARG='{raw}' for sampler '{sampler_kind}', defaulting ratio to 1.0" + ); + } + 1.0 + }) + }; + + let sampler = match sampler_name { + "always_on" => Sampler::AlwaysOn, + "always_off" => Sampler::AlwaysOff, + "traceidratio" => Sampler::TraceIdRatioBased(ratio_or_default("traceidratio")), + "parentbased_always_on" => Sampler::ParentBased(Box::new(Sampler::AlwaysOn)), + "parentbased_always_off" => Sampler::ParentBased(Box::new(Sampler::AlwaysOff)), + "parentbased_traceidratio" => Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + ratio_or_default("parentbased_traceidratio"), + ))), + _ => { + eprintln!( + "[setup_tracing] Unknown sampler '{sampler_name}', defaulting to parentbased_always_on" + ); + Sampler::ParentBased(Box::new(Sampler::AlwaysOn)) + } + }; + + eprintln!( + "[setup_tracing] Sampler: {sampler_name}{}", + sampler_arg + .map(|r| format!(", ratio={r}")) + .unwrap_or_default() + ); + + sampler +} + +/// Initialize the OTLP tracing layer if `OTEL_EXPORTER_OTLP_ENDPOINT` is set. +/// Returns `None` if tracing is not configured (no env var), in which case +/// the subscriber runs with logging only. +fn init_otlp_layer( + service_name: String, +) -> Option<( + tracing_opentelemetry::OpenTelemetryLayer, + opentelemetry_sdk::trace::SdkTracerProvider, +)> +where + S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>, +{ + use opentelemetry_otlp::WithExportConfig; + + let otlp_endpoint = std::env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") + .or_else(|_| std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT")) + .ok()?; + + eprintln!( + "[setup_tracing] Configuring OTLP exporter: endpoint={otlp_endpoint}, service_name={service_name}" + ); + + let exporter = match opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(&otlp_endpoint) + .build() + { + Ok(e) => e, + Err(e) => { + eprintln!("[setup_tracing] Failed to create OTLP exporter: {e}"); + return None; + } + }; + + let sampler = build_sampler(); + let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_batch_exporter(exporter) + .with_sampler(sampler) + .with_resource( + opentelemetry_sdk::Resource::builder() + .with_service_name(service_name) + .build(), + ) + .build(); + + use opentelemetry::trace::TracerProvider as _; + let tracer = tracer_provider.tracer("numaflow"); + + // Set the global tracer provider so OTel API users (e.g., per-message sink.write + // spans created via the OTel API directly) can access it. + // `.clone()` is an Arc refcount bump: the global handle and the returned handle + // share the same underlying span queue and exporter, so `shutdown()` on either + // (done via `TracerProviderGuard`'s drop) flushes and tears down both. + opentelemetry::global::set_tracer_provider(tracer_provider.clone()); + + // Set W3C Trace Context propagator for context propagation via sys_metadata. + opentelemetry::global::set_text_map_propagator( + opentelemetry_sdk::propagation::TraceContextPropagator::new(), + ); + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + eprintln!("[setup_tracing] OTLP tracing ENABLED"); + + Some((otel_layer, tracer_provider)) +} + +/// RAII guard that flushes buffered spans by shutting down the provider +/// on drop. Must be dropped while the Tokio runtime is still alive. +#[must_use = "binding drops the guard immediately and flushes; hold it with `let _guard = ...` for the duration of the program"] +pub struct TracerProviderGuard(Option); + +impl Drop for TracerProviderGuard { + fn drop(&mut self) { + if let Some(provider) = self.0.take() + && let Err(e) = provider.shutdown() + { + eprintln!("[setup_tracing] Failed to shut down tracer provider: {e}"); + } + } +} + +/// Initialize the tracing subscriber with optional OTLP export. +/// Returns a `TracerProviderGuard` that will flush buffered spans on drop. +/// Callers must bind it (e.g., `let _guard = register();`) rather than +/// discard it, or the provider will shut down immediately. +pub fn register() -> TracerProviderGuard { let debug_mode = std::env::var("NUMAFLOW_DEBUG").is_ok_and(|v| v.to_lowercase() == "true"); let default_log_level = if debug_mode { "debug,h2::codec=info" // "h2::codec" is too noisy @@ -64,15 +208,16 @@ pub fn register() { "info" }; + // Build filtering from default directives and allow `RUST_LOG` environment variable to override. let filter = EnvFilter::builder() .with_default_directive(default_log_level.parse().unwrap_or(Level::INFO.into())) - .from_env_lossy(); // Read RUST_LOG environment variable + .from_env_lossy(); - let layer = if debug_mode { - // Text format + let fmt_layer = if debug_mode { + // Log in a human-readable format for local debugging/development. fmt::layer().boxed() } else { - // JSON format, flattened + // Log in a JSON format with flattened event fields. fmt::layer() .with_ansi(false) .json() @@ -80,10 +225,241 @@ pub fn register() { .boxed() }; + let service_name = std::env::var("OTEL_SERVICE_NAME").unwrap_or_else(|_| "platform".into()); + let (otel_layer, tracer_provider) = match init_otlp_layer(service_name) { + Some((layer, provider)) => (Some(layer), Some(provider)), + None => (None, None), + }; + + // Only export spans (info_span!, tracing::Span) to the OTel layer, not log + // events (info!, error!, warn!). Without this filter, every log statement in the + // codebase becomes an OTel event, overwhelming the batch exporter at high throughput. + let otel_filter = tracing_subscriber::filter::filter_fn(|metadata| metadata.is_span()); + tracing_subscriber::registry() .with(filter) - .with(layer) + .with(fmt_layer) + .with(otel_layer.with_filter(otel_filter)) .init(); std::panic::set_hook(Box::new(report_panic)); + + TracerProviderGuard(tracer_provider) +} + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry::{ + Context, + trace::{ + SamplingDecision, SpanContext, SpanId, SpanKind, TraceContextExt, TraceFlags, TraceId, + TraceState, + }, + }; + use opentelemetry_sdk::trace::{Sampler, SdkTracerProvider, ShouldSample}; + + fn sampling_decision( + sampler: &Sampler, + parent_context: Option<&Context>, + trace_id: TraceId, + ) -> SamplingDecision { + sampler + .should_sample( + parent_context, + trace_id, + "test-span", + &SpanKind::Internal, + &[], + &[], + ) + .decision + } + + fn trace_id_at_probability_boundary(prob: f64, just_below_boundary: bool) -> TraceId { + let upper_bound = (prob.max(0.0) * (1u64 << 63) as f64) as u64; + let rnd = if just_below_boundary { + upper_bound.saturating_sub(1) + } else { + upper_bound + }; + + TraceId::from((rnd as u128) << 1) + } + + fn parent_context(sampled: bool) -> Context { + let trace_flags = if sampled { + TraceFlags::SAMPLED + } else { + TraceFlags::default() + }; + + Context::new().with_remote_span_context(SpanContext::new( + TraceId::from(1u128), + SpanId::from(1u64), + trace_flags, + true, + TraceState::default(), + )) + } + + #[test] + fn sampler_always_on() { + assert!(matches!( + build_sampler_from("always_on", None), + Sampler::AlwaysOn + )); + } + + #[test] + fn sampler_always_off() { + assert!(matches!( + build_sampler_from("always_off", None), + Sampler::AlwaysOff + )); + } + + #[test] + fn sampler_traceidratio_parses_arg() { + let sampler = build_sampler_from("traceidratio", Some("0.25")); + assert!(matches!(&sampler, Sampler::TraceIdRatioBased(_))); + assert_eq!( + sampling_decision(&sampler, None, trace_id_at_probability_boundary(0.25, true)), + SamplingDecision::RecordAndSample + ); + assert_eq!( + sampling_decision( + &sampler, + None, + trace_id_at_probability_boundary(0.25, false) + ), + SamplingDecision::Drop + ); + } + + #[test] + fn sampler_traceidratio_defaults_to_1_when_arg_missing() { + let sampler = build_sampler_from("traceidratio", None); + assert!(matches!(&sampler, Sampler::TraceIdRatioBased(_))); + assert_eq!( + sampling_decision(&sampler, None, TraceId::from(u128::MAX)), + SamplingDecision::RecordAndSample + ); + } + + #[test] + fn sampler_traceidratio_defaults_to_1_when_arg_unparsable() { + let sampler = build_sampler_from("traceidratio", Some("not-a-number")); + assert!(matches!(&sampler, Sampler::TraceIdRatioBased(_))); + assert_eq!( + sampling_decision(&sampler, None, TraceId::from(u128::MAX)), + SamplingDecision::RecordAndSample + ); + } + + #[test] + fn sampler_parentbased_always_on() { + let sampler = build_sampler_from("parentbased_always_on", None); + let sampled_parent = parent_context(true); + let unsampled_parent = parent_context(false); + + assert!(matches!(&sampler, Sampler::ParentBased(_))); + assert_eq!( + sampling_decision(&sampler, None, TraceId::from(u128::MAX)), + SamplingDecision::RecordAndSample + ); + assert_eq!( + sampling_decision(&sampler, Some(&sampled_parent), TraceId::from(u128::MAX)), + SamplingDecision::RecordAndSample + ); + assert_eq!( + sampling_decision(&sampler, Some(&unsampled_parent), TraceId::from(u128::MAX)), + SamplingDecision::Drop + ); + } + + #[test] + fn sampler_parentbased_always_off() { + let sampler = build_sampler_from("parentbased_always_off", None); + let sampled_parent = parent_context(true); + + assert!(matches!(&sampler, Sampler::ParentBased(_))); + assert_eq!( + sampling_decision(&sampler, None, TraceId::from(1u128)), + SamplingDecision::Drop + ); + assert_eq!( + sampling_decision(&sampler, Some(&sampled_parent), TraceId::from(1u128)), + SamplingDecision::RecordAndSample + ); + } + + #[test] + fn sampler_parentbased_traceidratio_with_arg() { + let sampler = build_sampler_from("parentbased_traceidratio", Some("0.1")); + let sampled_parent = parent_context(true); + let unsampled_parent = parent_context(false); + + assert!(matches!(&sampler, Sampler::ParentBased(_))); + assert_eq!( + sampling_decision(&sampler, None, trace_id_at_probability_boundary(0.1, true)), + SamplingDecision::RecordAndSample + ); + assert_eq!( + sampling_decision(&sampler, None, trace_id_at_probability_boundary(0.1, false)), + SamplingDecision::Drop + ); + assert_eq!( + sampling_decision( + &sampler, + Some(&sampled_parent), + trace_id_at_probability_boundary(0.1, false), + ), + SamplingDecision::RecordAndSample + ); + assert_eq!( + sampling_decision( + &sampler, + Some(&unsampled_parent), + trace_id_at_probability_boundary(0.1, true), + ), + SamplingDecision::Drop + ); + } + + #[test] + fn sampler_unknown_name_falls_back_to_parentbased_always_on() { + let sampler = build_sampler_from("nonsense", None); + let unsampled_parent = parent_context(false); + + assert!(matches!(&sampler, Sampler::ParentBased(_))); + assert_eq!( + sampling_decision(&sampler, None, TraceId::from(u128::MAX)), + SamplingDecision::RecordAndSample + ); + assert_eq!( + sampling_decision(&sampler, Some(&unsampled_parent), TraceId::from(u128::MAX)), + SamplingDecision::Drop + ); + } + + #[test] + fn tracer_provider_guard_drop_without_provider_is_noop() { + // Guard holding None must not panic on drop (the path taken when + // OTLP is not configured). + let guard = TracerProviderGuard(None); + drop(guard); + } + + #[test] + fn tracer_provider_guard_drop_with_provider_shuts_down_cleanly() { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("tokio runtime should build"); + let _runtime_guard = runtime.enter(); + + let guard = TracerProviderGuard(Some(SdkTracerProvider::builder().build())); + drop(guard); + } }