diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 5df29c4682..0285d09dad 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2449,11 +2449,15 @@ dependencies = [ "numaflow-daemon", "numaflow-monitor", "numaflow-sideinput", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "rustls 0.23.31", "serving", "tokio", "tokio-util", "tracing", + "tracing-opentelemetry", "tracing-subscriber", ] @@ -2516,6 +2520,8 @@ dependencies = [ "numaflow-sqs", "numaflow-testing", "numaflow-throttling", + "opentelemetry", + "opentelemetry_sdk", "parking_lot", "pin-project", "prometheus-client", @@ -2538,6 +2544,7 @@ dependencies = [ "tonic", "tower", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "trait-variant", "zstd", @@ -2850,6 +2857,82 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.17", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +dependencies = [ + "async-trait", + "bytes", + "http 1.3.1", + "opentelemetry", + "reqwest 0.12.22", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f69cd6acbb9af919df949cd1ec9e5e7fdc2ef15d234b6b795aaa525cc02f71f" +dependencies = [ + "http 1.3.1", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost 0.14.1", + "reqwest 0.12.22", + "thiserror 2.0.17", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost 0.14.1", + "tonic", + "tonic-prost", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand 0.9.4", + "thiserror 2.0.17", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -3575,6 +3658,7 @@ checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531" dependencies = [ "base64 0.22.1", "bytes", + "futures-channel", "futures-core", "futures-util", "h2 0.4.11", @@ -4760,9 +4844,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.41" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "log", "pin-project-lite", @@ -4772,9 +4856,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.30" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", @@ -4783,9 +4867,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.34" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +checksum = 
"db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", "valuable", @@ -4802,6 +4886,22 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac28f2d093c6c477eaa76b23525478f38de514fa9aeb1285738d4b97a9552fc" +dependencies = [ + "js-sys", + "opentelemetry", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" @@ -4814,9 +4914,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.20" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" dependencies = [ "matchers", "nu-ansi-term", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index dd9048f708..ff88f973e8 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -83,6 +83,10 @@ tokio-stream = "0.1.17" tokio-util = { version = "0.7", features = ["rt"] } bytes = "1.11.1" tracing = "0.1.41" +tracing-opentelemetry = "0.32" +opentelemetry = "0.31" +opentelemetry_sdk = "0.31" +opentelemetry-otlp = { version = "0.31", features = ["grpc-tonic"] } async-nats = "0.44.2" thiserror = "2.0.12" axum = { version = "0.8.4", default-features = false, features = [ diff --git a/rust/numaflow-core/Cargo.toml b/rust/numaflow-core/Cargo.toml index c620804703..09da949bfc 100644 --- a/rust/numaflow-core/Cargo.toml +++ b/rust/numaflow-core/Cargo.toml @@ -19,6 +19,9 @@ tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true tracing.workspace = true +tracing-opentelemetry.workspace = true +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } numaflow-pulsar.workspace = true 
numaflow-monitor.workspace = true numaflow-nats.workspace = true diff --git a/rust/numaflow-core/src/mapper/map.rs b/rust/numaflow-core/src/mapper/map.rs index 3fb4c4af5c..8de79e07ea 100644 --- a/rust/numaflow-core/src/mapper/map.rs +++ b/rust/numaflow-core/src/mapper/map.rs @@ -1003,7 +1003,7 @@ mod tests { offset: i.to_string().into(), index: i, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() }; input_tx.send(message).await.unwrap(); @@ -1099,7 +1099,7 @@ mod tests { offset: "0".to_string().into(), index: 0, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx1))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx1, None))), ..Default::default() }, Message { @@ -1115,7 +1115,7 @@ mod tests { offset: "1".to_string().into(), index: 1, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx2))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx2, None))), ..Default::default() }, ]; @@ -1224,7 +1224,7 @@ mod tests { offset: i.to_string().into(), index: i, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() }; ack_rxs.push(ack_rx); @@ -1269,7 +1269,7 @@ mod tests { offset: i.to_string().into(), index: i, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() } } @@ -1361,7 +1361,7 @@ mod tests { offset: "0".to_string().into(), index: 0, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx1))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx1, None))), ..Default::default() }, Message { @@ -1377,7 +1377,7 @@ mod tests { offset: "1".to_string().into(), index: 1, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx2))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx2, None))), ..Default::default() }, ]; diff --git a/rust/numaflow-core/src/mapper/map/batch.rs b/rust/numaflow-core/src/mapper/map/batch.rs index 
b4bd426469..7f13a73159 100644 --- a/rust/numaflow-core/src/mapper/map/batch.rs +++ b/rust/numaflow-core/src/mapper/map/batch.rs @@ -7,6 +7,7 @@ use crate::config::pipeline::VERTEX_TYPE_MAP_UDF; use crate::error::{Error, Result}; use crate::message::Message; use crate::monovertex::bypass_router::MvtxBypassRouter; +use crate::shared::otel; use crate::tracker::Tracker; use numaflow_pb::clients::map::{self, MapRequest, MapResponse, map_client::MapClient}; use std::collections::HashMap; @@ -53,7 +54,61 @@ pub(in crate::mapper) struct MapBatchTask { impl MapBatchTask { /// Executes the batch map operation. /// Returns an error if any message in the batch fails to be processed. - pub async fn execute(self) -> Result<()> { + pub async fn execute(mut self) -> Result<()> { + // Distributed tracing (MonoVertex only): create per-message `numaflow.monovertex.map` + // spans via the OTel SDK API (not tracing::Span because we need them to stay alive across + // the batch UDF `.await` without being tied to a thread-local enter/exit guard). Each + // span's parent is that message's own `platform.process` (extracted from + // sys_metadata["tracing"]). Each span's context is injected into sys_metadata["tracing_udf"] + // so the UDF creates `udf.map.process` as its child. Spans are closed via an RAII guard + // after the batch UDF returns so they are ended on every exit path. + // + // Invariant: tracing_udf is removed from result messages below; on error, input messages + // are dropped, so tracing_udf never propagates further. 
+ let _span_guard = if is_mono_vertex() { + use opentelemetry::trace::{SpanKind, TraceContextExt, Tracer}; + let tracer = opentelemetry::global::tracer("numaflow-core"); + let mut contexts: Vec = Vec::with_capacity(self.batch.len()); + for message in self.batch.iter_mut() { + let parent_cx = message + .metadata + .as_deref() + .map(otel::extract_trace_context) + .unwrap_or_else(opentelemetry::Context::current); + let msg_id = message.offset.to_string(); + let map_span = tracer + .span_builder("numaflow.monovertex.map") + .with_kind(SpanKind::Internal) + .with_attributes(vec![ + opentelemetry::KeyValue::new(otel::ATTR_MESSAGING_SYSTEM, "numaflow"), + opentelemetry::KeyValue::new(otel::ATTR_MESSAGING_OPERATION_NAME, "map"), + opentelemetry::KeyValue::new(otel::ATTR_MESSAGING_MESSAGE_ID, msg_id), + opentelemetry::KeyValue::new(otel::ATTR_NUMAFLOW_TOPOLOGY, "monovertex"), + opentelemetry::KeyValue::new( + otel::ATTR_NUMAFLOW_PIPELINE_NAME, + crate::config::get_pipeline_name(), + ), + opentelemetry::KeyValue::new( + otel::ATTR_NUMAFLOW_VERTEX_NAME, + crate::config::get_vertex_name(), + ), + ]) + .start_with_context(&tracer, &parent_cx); + let map_cx = opentelemetry::Context::current().with_span(map_span); + if let Some(ref mut metadata) = message.metadata { + otel::inject_context_into_metadata( + Arc::make_mut(metadata), + otel::TRACING_UDF_METADATA_KEY, + &map_cx, + ); + } + contexts.push(map_cx); + } + Some(SpanCloser(contexts)) + } else { + None + }; + // Store parent message info for each message before sending to UDF let parent_infos: Vec = self.batch.iter().map(|m| m.into()).collect(); @@ -68,15 +123,29 @@ impl MapBatchTask { // Call the UDF and get results directly let results = self.mapper.batch(requests, self.cln_token).await; + // Drop span guard now so spans are closed before returning. They will also close on any + // early-return from the error branch below. 
+ drop(_span_guard); + for (result, parent_info) in results.into_iter().zip(parent_infos) { match result { Ok(results) => { - // Convert raw results to Messages using parent info + // Convert raw results to Messages using parent info. + // Remove tracing_udf from each result (map stage is done). let mapped_messages: Vec = results .into_iter() .enumerate() .map(|(i, result)| { - UserDefinedMessage(result, &parent_info, i as i32).into() + let mut mapped_msg: Message = + UserDefinedMessage(result, &parent_info, i as i32).into(); + if is_mono_vertex() + && let Some(ref mut metadata) = mapped_msg.metadata + { + Arc::make_mut(metadata) + .sys_metadata + .remove(otel::TRACING_UDF_METADATA_KEY); + } + mapped_msg }) .collect(); @@ -123,6 +192,19 @@ impl MapBatchTask { } } +/// RAII guard that closes OTel spans on drop. Used by batch/stream mappers and sinker to +/// guarantee per-message spans are ended on every exit path (success, error, panic). +struct SpanCloser(Vec); + +impl Drop for SpanCloser { + fn drop(&mut self) { + use opentelemetry::trace::TraceContextExt as _; + for cx in self.0.drain(..) { + cx.span().end(); + } + } +} + /// UserDefinedBatchMap is a grpc client that sends batch requests to the map server /// and forwards the responses. 
#[derive(Clone)] diff --git a/rust/numaflow-core/src/mapper/map/stream.rs b/rust/numaflow-core/src/mapper/map/stream.rs index 170a5a2f6e..045982433b 100644 --- a/rust/numaflow-core/src/mapper/map/stream.rs +++ b/rust/numaflow-core/src/mapper/map/stream.rs @@ -6,6 +6,7 @@ use std::sync::atomic::Ordering; use crate::config::is_mono_vertex; use crate::error::{Error, Result}; use crate::message::Message; +use crate::shared::otel; use numaflow_pb::clients::map::{self, MapRequest, MapResponse, map_client::MapClient}; use tokio::sync::{OwnedSemaphorePermit, mpsc}; use tokio_stream::StreamExt; @@ -13,7 +14,8 @@ use tokio_util::sync::CancellationToken; use tokio_util::task::AbortOnDropHandle; use tonic::Streaming; use tonic::transport::Channel; -use tracing::{error, warn}; +use tracing::{Instrument, error, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use super::{ ParentMessageInfo, STREAMING_MAP_RESP_CHANNEL_SIZE, SharedMapTaskContext, UserDefinedMessage, @@ -58,10 +60,53 @@ impl MapStreamTask { } /// Executes the stream map operation. + /// + /// For MonoVertex with distributed tracing: wraps the receive loop with a + /// `numaflow.monovertex.map` span so its duration covers the full stream UDF interaction. 
async fn execute(self) { + if is_mono_vertex() { + let parent_cx = self + .message + .metadata + .as_deref() + .map(otel::extract_trace_context) + .unwrap_or_else(opentelemetry::Context::current); + let msg_id = self.message.offset.to_string(); + let map_span = tracing::info_span!( + "numaflow.monovertex.map", + otel.kind = "INTERNAL", + { otel::ATTR_MESSAGING_SYSTEM } = "numaflow", + { otel::ATTR_MESSAGING_OPERATION_NAME } = "map", + { otel::ATTR_MESSAGING_MESSAGE_ID } = %msg_id, + { otel::ATTR_NUMAFLOW_TOPOLOGY } = "monovertex", + { otel::ATTR_NUMAFLOW_PIPELINE_NAME } = crate::config::get_pipeline_name(), + { otel::ATTR_NUMAFLOW_VERTEX_NAME } = crate::config::get_vertex_name(), + ); + let _ = map_span.set_parent(parent_cx); + self.execute_inner().instrument(map_span).await; + } else { + self.execute_inner().await; + } + } + + async fn execute_inner(mut self) { // Hold the permit until the task completes let _permit = self.permit; + // Distributed tracing (MonoVertex): inject current `map` span context into + // sys_metadata["tracing_udf"] so the UDF creates `udf.map.process` as its child. + // sys_metadata["tracing"] remains unchanged (holds platform.process). + if is_mono_vertex() + && let Some(ref mut metadata) = self.message.metadata + { + let map_cx = tracing::Span::current().context(); + otel::inject_context_into_metadata( + Arc::make_mut(metadata), + otel::TRACING_UDF_METADATA_KEY, + &map_cx, + ); + } + // Store parent message info before sending to UDF // parent_info contains offset, so we don't need to clone it separately let mut parent_info: ParentMessageInfo = (&self.message).into(); @@ -88,12 +133,20 @@ impl MapStreamTask { let result = receiver.recv().await; match result { Some(Ok(results)) => { - // Convert raw results to Messages using parent info + // Convert raw results to Messages using parent info. + // Remove tracing_udf from each result (map stage is done). 
for result in results { - let mapped_message: Message = + let mut mapped_message: Message = UserDefinedMessage(result, &parent_info, parent_info.current_index) .into(); parent_info.current_index += 1; + if is_mono_vertex() + && let Some(ref mut metadata) = mapped_message.metadata + { + Arc::make_mut(metadata) + .sys_metadata + .remove(otel::TRACING_UDF_METADATA_KEY); + } update_udf_write_only_metric(self.shared_ctx.is_mono_vertex); diff --git a/rust/numaflow-core/src/mapper/map/unary.rs b/rust/numaflow-core/src/mapper/map/unary.rs index 66e0fdbc22..ae97776fa8 100644 --- a/rust/numaflow-core/src/mapper/map/unary.rs +++ b/rust/numaflow-core/src/mapper/map/unary.rs @@ -6,6 +6,7 @@ use std::sync::atomic::Ordering; use crate::config::is_mono_vertex; use crate::error::{Error, Result}; use crate::message::Message; +use crate::shared::otel; use numaflow_pb::clients::map::{self, MapRequest, MapResponse, map_client::MapClient}; use tokio::sync::{OwnedSemaphorePermit, mpsc, oneshot}; use tokio_stream::StreamExt; @@ -13,7 +14,8 @@ use tokio_util::sync::CancellationToken; use tokio_util::task::AbortOnDropHandle; use tonic::Streaming; use tonic::transport::Channel; -use tracing::{error, warn}; +use tracing::{Instrument, error, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use super::{ ParentMessageInfo, SharedMapTaskContext, UserDefinedMessage, create_response_stream, @@ -59,10 +61,65 @@ impl MapUnaryTask { } /// Executes the unary map operation. + /// + /// For MonoVertex with distributed tracing: extracts the `platform.process` context from + /// `sys_metadata["tracing"]`, creates a `numaflow.monovertex.map` span as its child, and + /// instruments the actual UDF call with that span so the span duration covers the map stage. async fn execute(self) { + // Only create spans when running in MonoVertex mode (Pipeline spans deferred to PR 3). 
+ if is_mono_vertex() { + let parent_cx = self + .message + .metadata + .as_deref() + .map(otel::extract_trace_context) + .unwrap_or_else(opentelemetry::Context::current); + + let msg_id = self.message.offset.to_string(); + let map_span = tracing::info_span!( + "numaflow.monovertex.map", + otel.kind = "INTERNAL", + { otel::ATTR_MESSAGING_SYSTEM } = "numaflow", + { otel::ATTR_MESSAGING_OPERATION_NAME } = "map", + { otel::ATTR_MESSAGING_MESSAGE_ID } = %msg_id, + { otel::ATTR_NUMAFLOW_TOPOLOGY } = "monovertex", + { otel::ATTR_NUMAFLOW_PIPELINE_NAME } = crate::config::get_pipeline_name(), + { otel::ATTR_NUMAFLOW_VERTEX_NAME } = crate::config::get_vertex_name(), + ); + let _ = map_span.set_parent(parent_cx); + self.execute_inner().instrument(map_span).await; + } else { + self.execute_inner().await; + } + } + + /// The core unary map execution. When called under `.instrument(map_span)`, the current + /// tracing context is the map span — we inject it into `sys_metadata["tracing_udf"]` so the + /// UDF sees the map span as its parent. The key is removed from result messages before they + /// leave the mapper. + async fn execute_inner(mut self) { // Hold the permit until the task completes let _permit = self.permit; + // Distributed tracing (MonoVertex only): inject the current `map` span's context into + // sys_metadata["tracing_udf"] so the UDF creates `udf.map.process` as its child. + // Note: `sys_metadata["tracing"]` is NOT overwritten — it still holds `platform.process`. + // + // Invariant: tracing_udf is written to the input message here and removed from every + // result message below. On UDF error, we return early without producing result messages, + // and the input message is dropped — so tracing_udf never propagates downstream. Preserve + // this property in future refactors. 
+ if is_mono_vertex() + && let Some(ref mut metadata) = self.message.metadata + { + let map_cx = tracing::Span::current().context(); + otel::inject_context_into_metadata( + Arc::make_mut(metadata), + otel::TRACING_UDF_METADATA_KEY, + &map_cx, + ); + } + // Store parent message info before sending to UDF // parent_info contains offset, so we don't need to clone it separately let parent_info: ParentMessageInfo = (&self.message).into(); @@ -90,12 +147,23 @@ impl MapUnaryTask { } }; - // Convert raw results to Messages using parent info - // Pre-allocate with exact capacity to avoid reallocations + // Convert raw results to Messages using parent info. + // The UserDefinedMessage -> Message conversion copies sys_metadata from parent (so + // platform.process context stays in "tracing"). We remove tracing_udf here because the + // map stage is done — downstream sink will inject its own tracing_udf. let results_len = results.len(); let mut mapped_messages: Vec = Vec::with_capacity(results_len); for (i, result) in results.into_iter().enumerate() { - mapped_messages.push(UserDefinedMessage(result, &parent_info, i as i32).into()); + let mut mapped_msg: Message = + UserDefinedMessage(result, &parent_info, i as i32).into(); + if is_mono_vertex() + && let Some(ref mut metadata) = mapped_msg.metadata + { + Arc::make_mut(metadata) + .sys_metadata + .remove(otel::TRACING_UDF_METADATA_KEY); + } + mapped_messages.push(mapped_msg); } update_udf_write_metric( diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index b42efb2d75..0806b338d7 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -60,18 +60,52 @@ pub(crate) struct Message { /// AckHandle is used to send the ack/nak to the source but it is reference counted and makes sure /// when it is dropped, we send the ack/nak to the source. 
-#[derive(Debug)] +/// +/// It also holds the per-message `platform.process` tracing span (when distributed tracing is +/// enabled). The span is set by the source after `AckHandle` is created (via +/// [`set_pipeline_span`]) so the span's lifetime is tied to the full message lifecycle: it is +/// dropped when the last `Arc` clone is released (after ack/nak fires), which ends +/// the span in the OTel backend with accurate duration. pub(crate) struct AckHandle { pub(crate) ack_handle: Option>, pub(crate) is_failed: AtomicBool, + /// Tracing span covering the full message lifecycle (created in source, dropped on ack). + /// Interior mutability is used because the span is set after construction. + pipeline_span: std::sync::Mutex>, +} + +impl fmt::Debug for AckHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AckHandle") + .field("ack_handle", &self.ack_handle) + .field("is_failed", &self.is_failed) + .finish() + } } impl AckHandle { /// create a new AckHandle for a message. - pub(crate) fn new(ack_handle: oneshot::Sender) -> Self { + /// + /// `pipeline_span`: optional `platform.process` span to end on ack. Pass `None` from + /// code paths that don't create the platform tracing root (e.g., tests, ISB reader/writer, + /// bypass router). The source sets this via [`set_pipeline_span`] after creation. + pub(crate) fn new( + ack_handle: oneshot::Sender, + pipeline_span: Option, + ) -> Self { Self { ack_handle: Some(ack_handle), is_failed: AtomicBool::new(false), + pipeline_span: std::sync::Mutex::new(pipeline_span), + } + } + + /// Store the `platform.process` tracing span so it is dropped when this AckHandle is dropped. + /// Called by the source after creating the span. + #[allow(dead_code)] // Used when distributed tracing is enabled (OTEL_EXPORTER_OTLP_ENDPOINT set). 
+ pub(crate) fn set_pipeline_span(&self, span: tracing::Span) { + if let Ok(mut guard) = self.pipeline_span.lock() { + *guard = Some(span); } } } diff --git a/rust/numaflow-core/src/monovertex/bypass_router.rs b/rust/numaflow-core/src/monovertex/bypass_router.rs index 0cf569daee..b211587d05 100644 --- a/rust/numaflow-core/src/monovertex/bypass_router.rs +++ b/rust/numaflow-core/src/monovertex/bypass_router.rs @@ -381,7 +381,7 @@ mod tests { ) -> (Message, Option>) { let (ack_handle, ack_rx) = if with_ack_handle { let (tx, rx) = oneshot::channel(); - (Some(Arc::new(AckHandle::new(tx))), Some(rx)) + (Some(Arc::new(AckHandle::new(tx, None))), Some(rx)) } else { (None, None) }; @@ -528,7 +528,7 @@ mod tests { offset: format!("offset_{}", i).into(), index: i as i32, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() } }) diff --git a/rust/numaflow-core/src/pipeline/isb/reader.rs b/rust/numaflow-core/src/pipeline/isb/reader.rs index b51b00dcce..f5b874c361 100644 --- a/rust/numaflow-core/src/pipeline/isb/reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/reader.rs @@ -504,7 +504,7 @@ impl ISBReaderOrchestrator { Self::publish_read_metrics(&self.metric_labels, &message); let (ack_tx, ack_rx) = oneshot::channel(); - message.ack_handle = Some(Arc::new(AckHandle::new(ack_tx))); + message.ack_handle = Some(Arc::new(AckHandle::new(ack_tx, None))); // Start message tracking and WIP loop self.start_message_tracking( diff --git a/rust/numaflow-core/src/pipeline/isb/writer.rs b/rust/numaflow-core/src/pipeline/isb/writer.rs index 9bbf3345fa..87e44ab8f2 100644 --- a/rust/numaflow-core/src/pipeline/isb/writer.rs +++ b/rust/numaflow-core/src/pipeline/isb/writer.rs @@ -678,7 +678,7 @@ mod tests { offset: format!("offset_{}", i).into(), index: i as i32, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() }; 
ack_rxs.push(ack_rx); @@ -789,7 +789,7 @@ mod tests { offset: format!("offset_{}", i).into(), index: i as i32, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() }; ack_rxs.push(ack_rx); @@ -964,7 +964,7 @@ mod tests { offset: format!("offset_{}", i).into(), index: i as i32, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() }; ack_rxs.push(ack_rx); @@ -1061,7 +1061,7 @@ mod simple_buffer_tests { headers: Arc::new(HashMap::new()), metadata: None, is_late: false, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), }; (message, ack_rx) } diff --git a/rust/numaflow-core/src/shared.rs b/rust/numaflow-core/src/shared.rs index 07e3e0bedc..568c73d39f 100644 --- a/rust/numaflow-core/src/shared.rs +++ b/rust/numaflow-core/src/shared.rs @@ -11,6 +11,11 @@ pub(crate) mod create_components; /// Shared methods for forwarding messages. pub(crate) mod forward; +/// OpenTelemetry propagation helpers for distributed tracing. +/// Functions in this module are consumed by span creation code in source, mapper, and sinker. +#[allow(dead_code)] +pub(crate) mod otel; + /// Test server framework: helpers for spinning up numaflow SDK gRPC servers in tests. #[cfg(test)] pub(crate) mod test_utils; diff --git a/rust/numaflow-core/src/shared/otel.rs b/rust/numaflow-core/src/shared/otel.rs new file mode 100644 index 0000000000..68fd5976ad --- /dev/null +++ b/rust/numaflow-core/src/shared/otel.rs @@ -0,0 +1,207 @@ +//! OpenTelemetry propagation helpers for Numaflow message metadata. +//! +//! Adapts `KeyValueGroup` (sys_metadata) as a carrier for W3C Trace Context. +//! The propagator uses these getter/setter implementations to read and write +//! `traceparent` and `tracestate` without knowing about our protobuf types. 
+ +use std::collections::HashMap; +use std::sync::Arc; + +use opentelemetry::propagation::{Extractor, Injector}; + +use crate::metadata::KeyValueGroup; + +/// Key under which W3C trace context is stored in `sys_metadata`. +/// Always holds the `platform.process` span context — the shared parent +/// that makes source.read, map, and sink.write siblings. +pub const TRACING_METADATA_KEY: &str = "tracing"; + +/// Key under which the current stage's span context is stored for UDF consumption. +/// The UDF reads this key to see the platform stage (e.g., map) as its parent. +/// Written before calling the UDF, removed after the UDF returns. +pub const TRACING_UDF_METADATA_KEY: &str = "tracing_udf"; + +/// Wraps `KeyValueGroup` for **extraction**: the propagator calls `get` / `keys` +/// to read `traceparent` and `tracestate` from the carrier. +pub struct MetadataExtractor<'a>(pub &'a KeyValueGroup); + +impl Extractor for MetadataExtractor<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0 + .key_value + .get(key) + .and_then(|b| std::str::from_utf8(b.as_ref()).ok()) + } + + fn keys(&self) -> Vec<&str> { + self.0.key_value.keys().map(String::as_str).collect() + } +} + +/// Wraps `KeyValueGroup` for **injection**: the propagator calls `set` to write +/// `traceparent` and `tracestate` into the carrier. +pub struct MetadataInjector<'a>(pub &'a mut KeyValueGroup); + +impl Injector for MetadataInjector<'_> { + fn set(&mut self, key: &str, value: String) { + self.0 + .key_value + .insert(key.to_string(), bytes::Bytes::from(value.into_bytes())); + } +} + +/// Ensures the global text map propagator is set to W3C Trace Context. +pub(crate) fn ensure_propagator() { + use opentelemetry::global; + let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new(); + global::set_text_map_propagator(propagator); + tracing::info!("W3C TraceContext propagator installed"); +} + +/// Extracts an OpenTelemetry [`Context`] from a message's sys_metadata. 
+/// Returns `Context::current()` (root) if the tracing key is absent. +pub(crate) fn extract_trace_context( + metadata: &crate::metadata::Metadata, +) -> opentelemetry::Context { + use opentelemetry::global; + match metadata.sys_metadata.get(TRACING_METADATA_KEY) { + Some(kvg) => { + let extractor = MetadataExtractor(kvg); + global::get_text_map_propagator(|prop| prop.extract(&extractor)) + } + None => opentelemetry::Context::current(), + } +} + +/// Injects a specific OpenTelemetry [`Context`] into a named sys_metadata key. +/// +/// Used to inject `platform.process` context into `"tracing"` (preserving the root) +/// and stage-specific context into `"tracing_udf"` (for UDF parent). +pub(crate) fn inject_context_into_metadata( + metadata: &mut crate::metadata::Metadata, + key: &str, + cx: &opentelemetry::Context, +) { + use opentelemetry::global; + + let kvg = metadata + .sys_metadata + .entry(key.to_string()) + .or_insert_with(|| KeyValueGroup { + key_value: HashMap::new(), + }); + let mut injector = MetadataInjector(kvg); + global::get_text_map_propagator(|prop| { + prop.inject_context(cx, &mut injector); + }); +} + +/// Wraps `HashMap` (message headers) for **extraction**. +/// Used to extract W3C trace context from incoming message headers +/// (e.g., Kafka headers passed through by source connectors). +pub struct HeaderExtractor<'a>(pub &'a HashMap); + +impl Extractor for HeaderExtractor<'_> { + fn get(&self, key: &str) -> Option<&str> { + // Try exact match first, then case-insensitive (Kafka headers are case-sensitive + // but upstream producers may use varying cases for B3 headers) + self.0.get(key).map(String::as_str).or_else(|| { + let lower = key.to_lowercase(); + self.0 + .iter() + .find(|(k, _)| k.to_lowercase() == lower) + .map(|(_, v)| v.as_str()) + }) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(String::as_str).collect() + } +} + +/// Extracts an OpenTelemetry [`Context`] from incoming message headers. 
+/// +/// Checks for W3C `traceparent` first, then B3 multi-headers +/// (`X-B3-TraceId`, `X-B3-SpanId`, `X-B3-Sampled`). If B3 headers are found, +/// converts them to W3C `traceparent` format before extraction. +/// +/// Returns `Context::current()` (root) if no trace headers are present. +pub(crate) fn extract_trace_context_from_headers( + headers: &Arc>, +) -> opentelemetry::Context { + use opentelemetry::global; + + // Check for W3C traceparent header first + if get_header_case_insensitive(headers, "traceparent").is_some() { + let extractor = HeaderExtractor(headers.as_ref()); + return global::get_text_map_propagator(|prop| prop.extract(&extractor)); + } + + // Check for B3 multi-headers and convert to W3C traceparent + let trace_id = get_header_case_insensitive(headers, "X-B3-TraceId"); + let span_id = get_header_case_insensitive(headers, "X-B3-SpanId"); + + if let (Some(trace_id), Some(span_id)) = (trace_id, span_id) { + let sampled = get_header_case_insensitive(headers, "X-B3-Sampled"); + let traceparent = b3_to_traceparent(trace_id, span_id, sampled); + + let mut synthetic = HashMap::new(); + synthetic.insert("traceparent".to_string(), traceparent); + let extractor = HeaderExtractor(&synthetic); + return global::get_text_map_propagator(|prop| prop.extract(&extractor)); + } + + opentelemetry::Context::current() +} + +/// Case-insensitive header lookup. +fn get_header_case_insensitive<'a>( + headers: &'a HashMap, + key: &str, +) -> Option<&'a str> { + headers.get(key).map(String::as_str).or_else(|| { + let lower = key.to_lowercase(); + headers + .iter() + .find(|(k, _)| k.to_lowercase() == lower) + .map(|(_, v)| v.as_str()) + }) +} + +/// Converts B3 multi-header values to W3C traceparent format. 
+/// +/// Format: `{version}-{trace_id}-{span_id}-{trace_flags}` +/// - Pads 64-bit trace IDs to 128-bit (left-pads with zeros) +/// - Maps sampled: "1"/"true" → "01", "0"/"false" → "00", default → "01" +fn b3_to_traceparent(trace_id: &str, span_id: &str, sampled: Option<&str>) -> String { + let padded_trace_id = if trace_id.len() <= 16 { + format!("{:0>32}", trace_id) + } else { + trace_id.to_string() + }; + + let flags = match sampled { + Some("1" | "true") => "01", + Some("0" | "false") => "00", + Some("d") => "01", // debug = sampled + _ => "01", // default to sampled + }; + + format!("00-{}-{}-{}", padded_trace_id, span_id, flags) +} + +// --------------------------------------------------------------------------- +// Platform span attribute keys +// --------------------------------------------------------------------------- +// +// Attribute keys applied to every Numaflow platform span (OTel messaging semantic +// conventions + Numaflow-specific). Values are set at each call site because +// `tracing::info_span!` requires static attribute expressions and the OTel SDK API +// (for batch/sink per-message spans) uses `KeyValue::new` at the call site. 
+ +pub const ATTR_MESSAGING_SYSTEM: &str = "messaging.system"; +pub const ATTR_MESSAGING_OPERATION_NAME: &str = "messaging.operation.name"; +pub const ATTR_MESSAGING_MESSAGE_ID: &str = "messaging.message.id"; +pub const ATTR_NUMAFLOW_TOPOLOGY: &str = "numaflow.topology"; +pub const ATTR_NUMAFLOW_PIPELINE_NAME: &str = "numaflow.pipeline.name"; +pub const ATTR_NUMAFLOW_VERTEX_NAME: &str = "numaflow.vertex.name"; diff --git a/rust/numaflow-core/src/sinker/sink.rs b/rust/numaflow-core/src/sinker/sink.rs index eb6c70b945..ddf542fea3 100644 --- a/rust/numaflow-core/src/sinker/sink.rs +++ b/rust/numaflow-core/src/sinker/sink.rs @@ -3,6 +3,8 @@ use crate::config::pipeline::VERTEX_TYPE_SINK; use crate::config::{get_vertex_name, is_mono_vertex}; use crate::error::Error; use crate::message::Message; +use crate::shared::otel; +use std::sync::Arc; use crate::metrics::{ PIPELINE_PARTITION_NAME_LABEL, monovertex_metrics, mvtx_forward_metric_labels, pipeline_drop_metric_labels, pipeline_metric_labels, pipeline_metrics, @@ -279,7 +281,7 @@ impl SinkWriter { /// Invokes the primary sink actor, handles fallback messages, serving messages, and errors. pub(crate) async fn write_to_sink( &mut self, - messages: Vec, + mut messages: Vec, cln_token: CancellationToken, ) -> Result<()> { if messages.is_empty() { @@ -290,11 +292,71 @@ impl SinkWriter { let messages_count = messages.len(); let messages_size: usize = messages.iter().map(|msg| msg.value.len()).sum(); + // Distributed tracing (MonoVertex only): per-message `numaflow.monovertex.sink.write` + // spans created via the OTel SDK API (not tracing::Span — we need spans to stay alive + // across the batch UDF `.await` without being tied to a thread-local enter/exit guard). + // Each span's parent is the message's own `platform.process`. Each span's context is + // injected into sys_metadata["tracing_udf"] so the sink UDF creates `udf.sink.process` + // as its child. 
Spans are closed via an RAII guard that fires on every exit path + // (Ok, Err via `?`, panic, cancellation). + let _span_guard = if is_mono_vertex() { + use opentelemetry::trace::{SpanKind, TraceContextExt, Tracer}; + let tracer = opentelemetry::global::tracer("numaflow-core"); + let mut contexts: Vec = Vec::with_capacity(messages.len()); + for message in messages.iter_mut() { + let parent_cx = message + .metadata + .as_deref() + .map(otel::extract_trace_context) + .unwrap_or_else(opentelemetry::Context::current); + let msg_id = message.offset.to_string(); + let sink_span = tracer + .span_builder("numaflow.monovertex.sink.write") + .with_kind(SpanKind::Client) + .with_attributes(vec![ + opentelemetry::KeyValue::new(otel::ATTR_MESSAGING_SYSTEM, "numaflow"), + opentelemetry::KeyValue::new( + otel::ATTR_MESSAGING_OPERATION_NAME, + "sink.write", + ), + opentelemetry::KeyValue::new(otel::ATTR_MESSAGING_MESSAGE_ID, msg_id), + opentelemetry::KeyValue::new(otel::ATTR_NUMAFLOW_TOPOLOGY, "monovertex"), + opentelemetry::KeyValue::new( + otel::ATTR_NUMAFLOW_PIPELINE_NAME, + crate::config::get_pipeline_name(), + ), + opentelemetry::KeyValue::new( + otel::ATTR_NUMAFLOW_VERTEX_NAME, + crate::config::get_vertex_name(), + ), + ]) + .start_with_context(&tracer, &parent_cx); + let sink_cx = opentelemetry::Context::current().with_span(sink_span); + if let Some(ref mut metadata) = message.metadata { + otel::inject_context_into_metadata( + Arc::make_mut(metadata), + otel::TRACING_UDF_METADATA_KEY, + &sink_cx, + ); + } + contexts.push(sink_cx); + } + Some(SinkSpanCloser(contexts)) + } else { + None + }; + // Invoke primary sink to write messages let response = self .write_to_primary_sink(messages, cln_token.clone()) .await?; + // Close per-message sink.write spans now so their duration covers only the primary + // sink UDF batch call — not the fallback/on_success/serving handling that follows. 
+ // The guard still runs on early-returns above (e.g., write_to_primary_sink error via `?`), + // so spans are always closed on every path. + drop(_span_guard); + if !response.failed.is_empty() { error!( "Failed to write messages after retries: {:?}", @@ -652,6 +714,19 @@ impl SinkWriter { } } +/// RAII guard that closes per-message sink OTel spans on drop. Ensures spans are ended on +/// every exit path from `write_to_sink` (success, early error via `?`, panic, cancellation). +struct SinkSpanCloser(Vec); + +impl Drop for SinkSpanCloser { + fn drop(&mut self) { + use opentelemetry::trace::TraceContextExt as _; + for cx in self.0.drain(..) { + cx.span().end(); + } + } +} + /// Sends count of messages marked for explicit drop by the user /// Currently pub(crate) to allow usage by the bypass_router. pub(crate) fn send_drop_metrics(is_mono_vertex: bool, dropped_messages_count: usize) { @@ -824,7 +899,7 @@ mod tests { offset: format!("offset_{}", i).into(), index: i as i32, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() } }) @@ -903,7 +978,7 @@ mod tests { offset: format!("offset_{}", i).into(), index: i as i32, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() } }) @@ -992,7 +1067,7 @@ mod tests { offset: format!("offset_{}", i).into(), index: i as i32, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() } }) @@ -1074,7 +1149,7 @@ mod tests { offset: format!("offset_{}", i).into(), index: i as i32, }, - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() } }) @@ -1188,7 +1263,7 @@ mod tests { index: i as i32, }, headers: Arc::new(headers), - ack_handle: Some(Arc::new(AckHandle::new(ack_tx))), + ack_handle: 
Some(Arc::new(AckHandle::new(ack_tx, None))), ..Default::default() } }) diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs index bf7d90827d..31cfaed33a 100644 --- a/rust/numaflow-core/src/source.rs +++ b/rust/numaflow-core/src/source.rs @@ -6,6 +6,7 @@ use crate::config::pipeline::VERTEX_TYPE_SOURCE; use crate::config::{get_vertex_name, is_mono_vertex}; +use crate::shared::otel; use crate::error::{Error, Result}; use crate::message::{AckHandle, ReadAck}; use crate::metrics::{ @@ -28,6 +29,7 @@ use numaflow_pb::clients::source::source_client::SourceClient; use numaflow_pulsar::source::PulsarSource; use numaflow_sqs::source::SqsSource; use numaflow_throttling::RateLimiter; +use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::Ordering; use tokio::sync::OwnedSemaphorePermit; @@ -74,6 +76,61 @@ const ACK_RETRY_INTERVAL: u64 = 100; const ACK_RETRY_ATTEMPTS: usize = usize::MAX; const NACK_RETRY_ATTEMPTS: usize = 1200; // 100ms apart, total=2 minutes +/// Tracks per-message `numaflow.monovertex.source.dispatch` OTel spans keyed by message offset. +/// +/// The source creates a dispatch span per input message before tracker insert, transform, and +/// downstream send. On the success path, each span is `end()`ed either when the last downstream +/// message for that input offset is bypassed/sent or immediately after transform if that input +/// produced no outputs. Any spans that remain in the map at end-of-iteration (e.g., due to a +/// transformer error that breaks the outer loop before all messages are dispatched) are closed +/// by the RAII `Drop` impl — ensuring no span is leaked. 
+struct SourceDispatchSpans { + spans: HashMap, +} + +impl SourceDispatchSpans { + fn new() -> Self { + Self { + spans: HashMap::new(), + } + } + + fn insert(&mut self, offset: crate::message::Offset, cx: opentelemetry::Context) { + self.spans.insert(offset, cx); + } + + /// End dispatch spans for input offsets that produced no downstream messages. + fn end_without_outputs(&mut self, output_counts: &HashMap) { + let offsets_without_outputs: Vec<_> = self + .spans + .keys() + .filter(|offset| !output_counts.contains_key(*offset)) + .cloned() + .collect(); + + for offset in offsets_without_outputs { + self.end(&offset); + } + } + + /// End the dispatch span for a specific message (called as message is dispatched downstream). + fn end(&mut self, offset: &crate::message::Offset) { + use opentelemetry::trace::TraceContextExt as _; + if let Some(cx) = self.spans.remove(offset) { + cx.span().end(); + } + } +} + +impl Drop for SourceDispatchSpans { + fn drop(&mut self) { + use opentelemetry::trace::TraceContextExt as _; + for (_, cx) in self.spans.drain() { + cx.span().end(); + } + } +} + /// Represents the partition information returned by a source. /// Contains both the active partitions being processed and optionally the total number of partitions. #[derive(Debug, Clone, Default)] @@ -519,6 +576,25 @@ impl Source { let mut ack_handles = vec![]; let mut ack_batch = Vec::with_capacity(msgs_len); + + // Distributed tracing (MonoVertex only): hold per-message `source.dispatch` + // span contexts keyed by source offset. Spans are created before tracker insert + // and closed only after the last downstream message for that source offset is + // bypassed/sent. This keeps the span honest even when the transformer fans one + // input message out into multiple outputs. + // + // Any messages whose dispatch spans are still in the map at end-of-iteration + // (e.g., transformer error that breaks the outer loop) have their spans + // closed by the RAII guard when the map is dropped. 
+ let mut dispatch_spans = SourceDispatchSpans::new(); + // Read-only parent contexts for `source.transform`; `dispatch_spans` remains the + // sole owner responsible for ending `source.dispatch`. + let mut dispatch_parent_contexts = if is_mono_vertex() && self.transformer.is_some() { + Some(HashMap::with_capacity(msgs_len)) + } else { + None + }; + for message in messages.iter_mut() { Self::record_partition_read_metrics( &pipeline_labels, @@ -528,7 +604,110 @@ impl Source { ); let (resp_ack_tx, resp_ack_rx) = oneshot::channel(); - message.ack_handle = Some(Arc::new(AckHandle::new(resp_ack_tx))); + + // Distributed tracing (MonoVertex only for now): + // - Create `numaflow.platform.process` root span for this message's full lifecycle. + // Parent: upstream trace context from message headers (W3C or B3), if present. + // The span is stored in AckHandle and dropped on ack, giving accurate duration. + // - Create `numaflow.monovertex.source.dispatch` child span covering per-message + // source-stage work (tracker insert + optional `source.transform` child span + + // watermark + downstream bypass/send). It is closed after the last message for + // this source offset leaves the source stage, or by the RAII guard on error. + // Note: this span measures the per-message source-stage dispatch work, NOT + // source read latency. Batch read duration is captured via metrics + // (record_batch_read_metrics above). + // - Inject `platform.process` context into sys_metadata["tracing"] so that map + // and sink become siblings of `source.dispatch` under `platform.process`. 
+ let platform_span = if is_mono_vertex() { + use opentelemetry::trace::{SpanKind, TraceContextExt, Tracer}; + + let upstream_cx = + otel::extract_trace_context_from_headers(&message.headers); + let msg_id = message.offset.to_string(); + let platform_span = tracing::info_span!( + "numaflow.platform.process", + otel.kind = "INTERNAL", + { otel::ATTR_MESSAGING_SYSTEM } = "numaflow", + { otel::ATTR_MESSAGING_OPERATION_NAME } = "pipeline.process", + { otel::ATTR_MESSAGING_MESSAGE_ID } = %msg_id, + { otel::ATTR_NUMAFLOW_TOPOLOGY } = "monovertex", + { otel::ATTR_NUMAFLOW_PIPELINE_NAME } = + crate::config::get_pipeline_name(), + { otel::ATTR_NUMAFLOW_VERTEX_NAME } = + crate::config::get_vertex_name(), + ); + { + use tracing_opentelemetry::OpenTelemetrySpanExt; + let _ = platform_span.set_parent(upstream_cx); + } + + // Inject platform.process context into sys_metadata["tracing"] so + // downstream stages (map, sink) become siblings under this root. + let platform_cx = { + use tracing_opentelemetry::OpenTelemetrySpanExt; + platform_span.context() + }; + let metadata = message.metadata.get_or_insert_with(|| { + Arc::new(crate::metadata::Metadata::default()) + }); + otel::inject_context_into_metadata( + Arc::make_mut(metadata), + otel::TRACING_METADATA_KEY, + &platform_cx, + ); + + // Create source.dispatch as an OTel SDK span (child of platform.process). + // It stays alive until we dispatch this message downstream (or the RAII + // guard drops on error). Tracked by offset. 
+ let tracer = opentelemetry::global::tracer("numaflow-core"); + let dispatch_span = tracer + .span_builder("numaflow.monovertex.source.dispatch") + .with_kind(SpanKind::Producer) + .with_attributes(vec![ + opentelemetry::KeyValue::new( + otel::ATTR_MESSAGING_SYSTEM, + "numaflow", + ), + opentelemetry::KeyValue::new( + otel::ATTR_MESSAGING_OPERATION_NAME, + "source.dispatch", + ), + opentelemetry::KeyValue::new( + otel::ATTR_MESSAGING_MESSAGE_ID, + msg_id, + ), + opentelemetry::KeyValue::new( + otel::ATTR_NUMAFLOW_TOPOLOGY, + "monovertex", + ), + opentelemetry::KeyValue::new( + otel::ATTR_NUMAFLOW_PIPELINE_NAME, + crate::config::get_pipeline_name(), + ), + opentelemetry::KeyValue::new( + otel::ATTR_NUMAFLOW_VERTEX_NAME, + crate::config::get_vertex_name(), + ), + ]) + .start_with_context(&tracer, &platform_cx); + let dispatch_cx = opentelemetry::Context::current().with_span(dispatch_span); + if let Some(ref mut parent_contexts) = dispatch_parent_contexts { + parent_contexts.insert(message.offset.clone(), dispatch_cx.clone()); + } + dispatch_spans.insert(message.offset.clone(), dispatch_cx); + + Some(platform_span) + } else { + None + }; + + let ack_handle = Arc::new(AckHandle::new(resp_ack_tx, None)); + if let Some(span) = platform_span { + // Store platform.process span in AckHandle so it ends when the message is + // fully processed (ack/nak fires on last Arc drop). + ack_handle.set_pipeline_span(span); + } + message.ack_handle = Some(ack_handle); // insert the offset and the ack one shot in the tracker. 
self.tracker.insert(message).await?; @@ -572,7 +751,11 @@ impl Source { let mut messages = match self.transformer.as_mut() { None => messages, Some(transformer) => match transformer - .transform_batch(messages, cln_token.clone()) + .transform_batch( + messages, + cln_token.clone(), + dispatch_parent_contexts.as_ref(), + ) .await { Ok(messages) => messages, @@ -594,6 +777,16 @@ impl Source { }, }; + let mut remaining_dispatches = HashMap::with_capacity(messages.len()); + for message in &messages { + *remaining_dispatches.entry(message.offset.clone()).or_insert(0usize) += 1; + } + + // If a source input produced no downstream messages (for example, the transformer + // filtered it out), close its dispatch span now so it does not stay open for the + // rest of the batch's watermark/send work. + dispatch_spans.end_without_outputs(&remaining_dispatches); + if let Some(watermark_handle) = self.watermark_handle.as_mut() { watermark_handle .generate_and_publish_source_watermark(&messages) @@ -608,6 +801,8 @@ impl Source { // write the messages to downstream. for message in messages { + let offset = message.offset.clone(); + let bypassed = if let Some(ref bypass_router) = bypass_router { bypass_router .try_bypass(message.clone()) @@ -623,7 +818,21 @@ impl Source { .await .expect("send should not fail"); } + + let should_end_dispatch = remaining_dispatches + .get_mut(&offset) + .map(|remaining| { + *remaining -= 1; + *remaining == 0 + }) + .unwrap_or(false); + if should_end_dispatch { + remaining_dispatches.remove(&offset); + dispatch_spans.end(&offset); + } } + // dispatch_spans drops here — any remaining (shouldn't happen on success path) + // get closed by the RAII Drop impl. 
} info!(status=?result, "Source stopped, waiting for inflight messages to be acked/nacked"); // wait for all the ack tasks to be completed before stopping the source, since we give diff --git a/rust/numaflow-core/src/transformer.rs b/rust/numaflow-core/src/transformer.rs index a3af1efb38..2cb1075847 100644 --- a/rust/numaflow-core/src/transformer.rs +++ b/rust/numaflow-core/src/transformer.rs @@ -2,6 +2,7 @@ use bytes::Bytes; use futures::stream::{self, StreamExt}; use numaflow_monitor::runtime; use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient; +use std::collections::HashMap; use std::time::Duration; use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; @@ -13,11 +14,12 @@ use crate::Result; use crate::config::pipeline::VERTEX_TYPE_SOURCE; use crate::config::{get_vertex_name, is_mono_vertex}; use crate::error::Error; -use crate::message::Message; +use crate::message::{Message, Offset}; use crate::metrics::{ PIPELINE_PARTITION_NAME_LABEL, monovertex_metrics, mvtx_forward_metric_labels, pipeline_metric_labels, pipeline_metrics, }; +use crate::shared::otel; use crate::tracker::Tracker; use crate::transformer::user_defined::UserDefinedTransformer; @@ -69,6 +71,68 @@ impl TransformerActor { } } +/// RAII guard for the per-input `source.transform` span. +/// +/// The span is a child of the input message's `source.dispatch` span and measures the transformer +/// UDF round-trip for that specific source message, not the whole batch. It closes on success, +/// error, or cancellation. 
struct SourceTransformSpan(Option<opentelemetry::Context>);
#[derive(Clone)] pub(crate) struct Transformer { @@ -159,6 +223,7 @@ impl Transformer { &self, messages: Vec, cln_token: CancellationToken, + dispatch_parent_contexts: Option<&HashMap>, ) -> Result> { let batch_start_time = tokio::time::Instant::now(); let transform_handle = self.sender.clone(); @@ -204,11 +269,16 @@ impl Transformer { let transform_handle = transform_handle.clone(); let tracker = tracker.clone(); let hard_shutdown_token = hard_shutdown_token.clone(); + let source_transform_parent = dispatch_parent_contexts + .and_then(|parent_contexts| parent_contexts.get(&read_msg.offset).cloned()); async move { let offset = read_msg.offset.clone(); + let source_transform_span = + SourceTransformSpan::new(source_transform_parent, offset.to_string()); let transformed_messages = Transformer::transform(transform_handle, read_msg, hard_shutdown_token).await?; + source_transform_span.record_output_count(transformed_messages.len()); // update the tracker with the number of responses for each message tracker @@ -474,7 +544,7 @@ mod tests { } let transformed_messages = transformer - .transform_batch(messages, CancellationToken::new()) + .transform_batch(messages, CancellationToken::new(), None) .await?; for (i, transformed_message) in transformed_messages.iter().enumerate() { @@ -551,7 +621,7 @@ mod tests { }; let result = transformer - .transform_batch(vec![message], CancellationToken::new()) + .transform_batch(vec![message], CancellationToken::new(), None) .await; assert!(result.is_err(), "Expected an error due to panic"); assert!(result.unwrap_err().to_string().contains("panic")); diff --git a/rust/numaflow/Cargo.toml b/rust/numaflow/Cargo.toml index 8b7af39c18..360431fdd5 100644 --- a/rust/numaflow/Cargo.toml +++ b/rust/numaflow/Cargo.toml @@ -19,6 +19,10 @@ numaflow-daemon.workspace = true tokio.workspace = true tokio-util.workspace = true tracing.workspace = true +tracing-opentelemetry.workspace = true +opentelemetry = { workspace = true } +opentelemetry_sdk = { 
workspace = true, features = ["rt-tokio"] } +opentelemetry-otlp = { workspace = true } rustls.workspace = true tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt", "json"] } clap = "4.5.40" diff --git a/rust/numaflow/src/main.rs b/rust/numaflow/src/main.rs index e807c652cf..24e03a5e84 100644 --- a/rust/numaflow/src/main.rs +++ b/rust/numaflow/src/main.rs @@ -5,7 +5,7 @@ use std::error::Error; use tokio::task::JoinHandle; use tokio::{runtime, signal}; use tokio_util::sync::CancellationToken; -use tracing::{error, info, warn}; +use tracing::{error, info}; mod setup_tracing; @@ -16,7 +16,6 @@ const VERSION_INFO: &str = env!("NUMAFLOW_VERSION_INFO"); const ENV_MONO_VERTEX_NAME: &str = "NUMAFLOW_MONO_VERTEX_NAME"; fn main() { - setup_tracing::register(); // Setup the CryptoProvider (controls core cryptography used by rustls) for the process rustls::crypto::aws_lc_rs::default_provider() .install_default() @@ -36,15 +35,24 @@ fn main() { // section for the vertex. let cpu_core_count = env::var("NUMAFLOW_CPU_REQUEST").unwrap_or_else(|_| "1".into()); let worker_thread_count = cpu_core_count.parse::().inspect_err(|e| { - warn!(integer_conversion_error=?e, "The value of NUMAFLOW_CPU_REQUEST environment variable should be a valid unsigned integer. Worker thread count will be set to 1"); + // Use eprintln! because tracing subscriber is not yet initialized at this point + eprintln!("WARN: NUMAFLOW_CPU_REQUEST is not a valid unsigned integer ({e}). Worker thread count will be set to 1"); }).unwrap_or(1).max(1); + // Build the Tokio runtime before initializing tracing. The OTLP tonic/gRPC + // exporter needs a Tokio runtime for its background tasks. By entering the + // runtime context first, the tonic channel binds to this runtime. 
let rt = runtime::Builder::new_multi_thread() .enable_all() .worker_threads(worker_thread_count) .build() .unwrap(); + // Enter the runtime context so that tracing initialization (which may create + // a tonic gRPC channel for OTLP export) can use this runtime. + let _rt_guard = rt.enter(); + let tracer_provider = setup_tracing::register(); + info!( VERSION_INFO, tokio_worker_threads = worker_thread_count, @@ -53,12 +61,20 @@ fn main() { let cli = cmdline::root_cli(); - rt.block_on(async move { - if let Err(e) = run(cli).await { - error!("{e:?}"); - std::process::exit(1); - } - }); + let run_result = rt.block_on(async move { run(cli).await }); + + if let Err(ref e) = run_result { + error!("{e:?}"); + } + + // Flush buffered spans before process exit (this is done for both success and error paths). + if let Some(provider) = tracer_provider { + let _ = provider.shutdown(); + } + + if run_result.is_err() { + std::process::exit(1); + } info!("Exited."); } diff --git a/rust/numaflow/src/setup_tracing.rs b/rust/numaflow/src/setup_tracing.rs index afffb0c0b0..4c61b7bf26 100644 --- a/rust/numaflow/src/setup_tracing.rs +++ b/rust/numaflow/src/setup_tracing.rs @@ -53,10 +53,129 @@ fn report_panic(panic_info: &PanicHookInfo<'_>) { }; } -pub fn register() { - // Set up the tracing subscriber. RUST_LOG can be used to set the log level. - // The default log level is `info`. The `axum::rejection=trace` enables showing - // rejections from built-in extractors at `TRACE` level. +/// Build a sampler from standard OTel environment variables. +/// +/// - `OTEL_TRACES_SAMPLER`: sampler type (default: `parentbased_always_on`) +/// - `OTEL_TRACES_SAMPLER_ARG`: sampler argument (e.g., `0.1` for 10%) +/// +/// Supported samplers: `always_on`, `always_off`, `traceidratio`, +/// `parentbased_always_on`, `parentbased_always_off`, `parentbased_traceidratio`. 
+fn build_sampler() -> opentelemetry_sdk::trace::Sampler { + use opentelemetry_sdk::trace::Sampler; + + let sampler_name = + std::env::var("OTEL_TRACES_SAMPLER").unwrap_or_else(|_| "parentbased_always_on".into()); + let sampler_arg_raw = std::env::var("OTEL_TRACES_SAMPLER_ARG").ok(); + let sampler_arg = sampler_arg_raw + .as_deref() + .and_then(|v| v.parse::().ok()); + + let ratio_or_default = |sampler_kind: &str| { + sampler_arg.unwrap_or_else(|| { + if let Some(raw) = sampler_arg_raw.as_deref() { + eprintln!( + "[setup_tracing] Invalid OTEL_TRACES_SAMPLER_ARG='{raw}' for sampler '{sampler_kind}', defaulting ratio to 1.0" + ); + } + 1.0 + }) + }; + + let sampler = match sampler_name.as_str() { + "always_on" => Sampler::AlwaysOn, + "always_off" => Sampler::AlwaysOff, + "traceidratio" => Sampler::TraceIdRatioBased(ratio_or_default("traceidratio")), + "parentbased_always_on" => Sampler::ParentBased(Box::new(Sampler::AlwaysOn)), + "parentbased_always_off" => Sampler::ParentBased(Box::new(Sampler::AlwaysOff)), + "parentbased_traceidratio" => Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + ratio_or_default("parentbased_traceidratio"), + ))), + _ => { + eprintln!( + "[setup_tracing] Unknown sampler '{sampler_name}', defaulting to parentbased_always_on" + ); + Sampler::ParentBased(Box::new(Sampler::AlwaysOn)) + } + }; + + eprintln!( + "[setup_tracing] Sampler: {sampler_name}{}", + sampler_arg + .map(|r| format!(", ratio={r}")) + .unwrap_or_default() + ); + + sampler +} + +/// Initialize the OTLP tracing layer if `OTEL_EXPORTER_OTLP_ENDPOINT` is set. +/// Returns `None` if tracing is not configured (no env var), in which case +/// the subscriber runs with logging only. 
fn init_otlp_layer<S>(
+/// Returns the `SdkTracerProvider` handle if OTLP is enabled — the caller +/// must keep it alive and call `.shutdown()` before process exit to flush +/// buffered spans. +pub fn register() -> Option { let debug_mode = std::env::var("NUMAFLOW_DEBUG").is_ok_and(|v| v.to_lowercase() == "true"); let default_log_level = if debug_mode { "debug,h2::codec=info" // "h2::codec" is too noisy @@ -64,15 +183,16 @@ pub fn register() { "info" }; + // Build filtering from default directives and allow `RUST_LOG` environment variable to override. let filter = EnvFilter::builder() .with_default_directive(default_log_level.parse().unwrap_or(Level::INFO.into())) - .from_env_lossy(); // Read RUST_LOG environment variable + .from_env_lossy(); - let layer = if debug_mode { - // Text format + let fmt_layer = if debug_mode { + // Log in a human-readable format for local debugging/development. fmt::layer().boxed() } else { - // JSON format, flattened + // Log in a JSON format with flattened event fields. fmt::layer() .with_ansi(false) .json() @@ -80,10 +200,24 @@ pub fn register() { .boxed() }; + let service_name = std::env::var("OTEL_SERVICE_NAME").unwrap_or_else(|_| "platform".into()); + let (otel_layer, tracer_provider) = match init_otlp_layer(service_name) { + Some((layer, provider)) => (Some(layer), Some(provider)), + None => (None, None), + }; + + // Only export spans (info_span!, tracing::Span) to the OTel layer, not log + // events (info!, error!, warn!). Without this filter, every log statement in the + // codebase becomes an OTel event, overwhelming the batch exporter at high throughput. + let otel_filter = tracing_subscriber::filter::filter_fn(|metadata| metadata.is_span()); + tracing_subscriber::registry() .with(filter) - .with(layer) + .with(fmt_layer) + .with(otel_layer.with_filter(otel_filter)) .init(); std::panic::set_hook(Box::new(report_panic)); + + tracer_provider }