From 87864c0502f38479a33683f62ac7aa9f46a220bb Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 13:40:57 -0400 Subject: [PATCH 01/17] build(data-pipeline): add tonic 0.14 for gRPC OTLP transport Adds tonic dependency with transport and codegen features required for implementing gRPC-based OTLP trace export. Also adds h2 and tokio-stream as dev dependencies for gRPC test infrastructure. Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 3 +++ libdd-data-pipeline/Cargo.toml | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 4763752c88..eb00734a20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2990,6 +2990,7 @@ dependencies = [ "duplicate 2.0.1", "either", "getrandom 0.2.15", + "h2", "http", "http-body-util", "httpmock", @@ -3016,7 +3017,9 @@ dependencies = [ "sha2", "tempfile", "tokio", + "tokio-stream", "tokio-util", + "tonic", "tracing", "uuid", ] diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index ca4c4075e0..d62e209e23 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -46,6 +46,10 @@ libdd-tinybytes = { version = "1.1.1", path = "../libdd-tinybytes", features = [ "bytes_string", "serialization", ] } +tonic = { version = "0.14", default-features = false, features = [ + "transport", # Channel, Endpoint, Server + "codegen", # Grpc client, tonic::client::Grpc +] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { version = "1.23", features = ["time", "test-util"], default-features = false } @@ -83,6 +87,8 @@ tokio = { version = "1.23", features = [ "test-util", ], default-features = false } duplicate = "2.0.1" +h2 = "0.4" +tokio-stream = { version = "0.1", features = ["net"] } [features] default = ["https", "telemetry"] From de5097a12e1fc5c9dc72647f13961398ad79b1b2 Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 13:45:43 -0400 Subject: [PATCH 02/17] feat(data-pipeline): add OtlpProtocol::Grpc variant and OtlpGrpcTraceConfig Re-add OtlpProtocol::Grpc (previously removed) with support for gRPC as a valid export protocol alongside HTTP/JSON and HTTP/Protobuf. Add OtlpGrpcTraceConfig struct to hold gRPC-specific configuration (endpoint, headers, timeout, semantics flag). The Grpc variant now parses successfully from "grpc" string and is available for use by the gRPC exporter implementation (Task 3+). Co-Authored-By: Claude Sonnet 4.6 --- libdd-data-pipeline/src/otlp/config.rs | 67 ++++++++++++++++++++------ 1 file changed, 53 insertions(+), 14 deletions(-) diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 76b98493be..79e3e3bc52 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -6,12 +6,16 @@ use http::HeaderMap; use std::time::Duration; -/// OTLP trace export protocol — selects the HTTP body encoding and `Content-Type`. +/// OTLP trace export protocol — selects the wire transport and body encoding. /// -/// Only the HTTP encodings libdatadog actually supports are representable. A `grpc` value (e.g. -/// resolved from the OTel-default `OTEL_EXPORTER_OTLP_PROTOCOL`) is rejected by -/// [`FromStr`](std::str::FromStr) rather than represented here, so an unsupported protocol can -/// never be constructed and silently mishandled downstream. +/// All three OTel-standard protocol strings parse successfully; the selection +/// controls which send path the exporter uses: +/// - `http/json` and `http/protobuf` → OTLP over HTTP/1.1 via +/// [`HttpClientCapability`](libdd_capabilities::HttpClientCapability). +/// - `grpc` → OTLP over HTTP/2 via a tonic [`Channel`](tonic::transport::Channel). +/// +/// Plaintext gRPC (`http://` scheme, port 4317) is supported. TLS gRPC +/// (`https://` scheme) is not yet implemented — use a TLS-terminating sidecar. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] pub enum OtlpProtocol { /// HTTP with a JSON body (`Content-Type: application/json`). The default. @@ -19,6 +23,9 @@ pub enum OtlpProtocol { HttpJson, /// HTTP with a protobuf body (`Content-Type: application/x-protobuf`). HttpProtobuf, + /// gRPC over HTTP/2. Protobuf-encoded body with 5-byte gRPC framing. + /// Default port is 4317. Only plaintext (`http://`) is supported. + Grpc, } impl std::str::FromStr for OtlpProtocol { @@ -27,11 +34,7 @@ impl std::str::FromStr for OtlpProtocol { match s { "http/json" => Ok(OtlpProtocol::HttpJson), "http/protobuf" => Ok(OtlpProtocol::HttpProtobuf), - // gRPC is a valid OTLP protocol in the OTel spec but is not implemented in - // libdatadog. Reject it explicitly so callers get a clean error at the parse - // boundary, rather than constructing an unsupported value that has to be guarded - // against everywhere downstream. - "grpc" => Err("OTLP gRPC export is not supported".to_string()), + "grpc" => Ok(OtlpProtocol::Grpc), other => Err(format!("unknown OTLP protocol: {other}")), } } @@ -40,24 +43,30 @@ impl std::str::FromStr for OtlpProtocol { impl OtlpProtocol { /// The HTTP `Content-Type` for this protocol's body encoding. Crate-internal: the public type /// is only constructed/selected by callers; encoding is the exporter's job. + /// Only called on the HTTP path; the gRPC path uses tonic's ProstCodec. pub(crate) fn content_type(&self) -> http::HeaderValue { + #[allow(clippy::unreachable)] match self { OtlpProtocol::HttpJson => libdd_common::header::APPLICATION_JSON, OtlpProtocol::HttpProtobuf => libdd_common::header::APPLICATION_PROTOBUF, + OtlpProtocol::Grpc => unreachable!("gRPC path does not call content_type()"), } } /// Encode the prost OTLP request to this protocol's wire format. Crate-internal so the /// third-party `serde_json::Error` does not leak into the public API. + /// Only called on the HTTP path; the gRPC path uses tonic's ProstCodec. pub(crate) fn encode( &self, req: &libdd_trace_utils::otlp_encoder::ProtoExportTraceServiceRequest, ) -> Result, serde_json::Error> { + #[allow(clippy::unreachable)] match self { OtlpProtocol::HttpJson => libdd_trace_utils::otlp_encoder::encode_otlp_json(req), OtlpProtocol::HttpProtobuf => { Ok(libdd_trace_utils::otlp_encoder::encode_otlp_protobuf(req)) } + OtlpProtocol::Grpc => unreachable!("gRPC path does not call encode()"), } } } @@ -99,10 +108,20 @@ mod tests { } #[test] - fn grpc_is_rejected_at_parse() { - // gRPC is unsupported, so it must not parse into a protocol: an unsupported value can - // never be constructed. - assert!(OtlpProtocol::from_str("grpc").is_err()); + fn grpc_parses_successfully() { + // gRPC is now a supported protocol — it must parse without error. + assert_eq!(OtlpProtocol::from_str("grpc").unwrap(), OtlpProtocol::Grpc); + } + + #[test] + fn grpc_parses_from_str() { + assert_eq!(OtlpProtocol::from_str("grpc").unwrap(), OtlpProtocol::Grpc); + } + + #[test] + fn grpc_config_is_send_sync() { + fn assert_send_sync() {} + assert_send_sync::(); } #[test] @@ -118,6 +137,26 @@ mod tests { } } +/// Parsed OTLP gRPC trace exporter configuration. +/// +/// The gRPC endpoint URL contains only scheme + host + port (e.g. +/// `http://localhost:4317`). The service path +/// `/opentelemetry.proto.collector.trace.v1.TraceService/Export` is +/// appended by the exporter. +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub struct OtlpGrpcTraceConfig { + /// Full gRPC base URL, e.g. `http://localhost:4317`. + /// Must use `http://` scheme; `https://` (TLS) is not yet supported. + pub endpoint_url: String, + /// Custom key-value pairs forwarded as gRPC request metadata. + pub headers: Vec<(String, String)>, + /// Per-request timeout (applied via [`tokio::time::timeout`]). + pub timeout: Duration, + /// When `true`, omit DD-specific per-span attributes from the payload. + pub otel_trace_semantics_enabled: bool, +} + /// Parsed OTLP trace-metrics exporter configuration. #[derive(Clone, Debug)] pub struct OtlpMetricsConfig { From 646e19bc38b383c6027709f7c6316a937dc5e42c Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 13:56:26 -0400 Subject: [PATCH 03/17] feat(data-pipeline): implement OtlpGrpcTransport and send_otlp_traces_grpc Add grpc_exporter.rs with OtlpGrpcTransport, build_grpc_channel, and send_otlp_traces_grpc. Implements a minimal ProstCodecImpl since tonic 0.14 removed ProstCodec from its public API. All items are pub(crate); the dispatch wiring comes in the next task. Co-Authored-By: Claude Sonnet 4.6 --- libdd-data-pipeline/Cargo.toml | 2 + libdd-data-pipeline/src/otlp/grpc_exporter.rs | 291 ++++++++++++++++++ libdd-data-pipeline/src/otlp/mod.rs | 1 + 3 files changed, 294 insertions(+) create mode 100644 libdd-data-pipeline/src/otlp/grpc_exporter.rs diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index d62e209e23..90f62f8dd1 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -46,6 +46,7 @@ libdd-tinybytes = { version = "1.1.1", path = "../libdd-tinybytes", features = [ "bytes_string", "serialization", ] } +prost = "0.14.1" tonic = { version = "0.14", default-features = false, features = [ "transport", # Channel, Endpoint, Server "codegen", # Grpc client, tonic::client::Grpc @@ -85,6 +86,7 @@ tokio = { version = "1.23", features = [ "rt", "time", "test-util", + "macros", ], default-features = false } duplicate = "2.0.1" h2 = "0.4" diff --git a/libdd-data-pipeline/src/otlp/grpc_exporter.rs b/libdd-data-pipeline/src/otlp/grpc_exporter.rs new file mode 100644 index 0000000000..1afbc273dc --- /dev/null +++ b/libdd-data-pipeline/src/otlp/grpc_exporter.rs @@ -0,0 +1,291 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP gRPC trace exporter. +//! +//! Sends an [`ExportTraceServiceRequest`] over a tonic gRPC channel using +//! plaintext HTTP/2 (`http://` scheme). TLS (`https://`) is not yet supported. +//! +//! # gRPC framing +//! The inner [`ProstCodecImpl`] + tonic's [`Grpc`](tonic::client::Grpc) handle +//! the 5-byte frame prefix, protobuf encoding, and gRPC trailer parsing +//! automatically, using [`prost`] for message encoding/decoding. + +// These items are pub(crate) but not yet consumed by the trace exporter dispatch +// path — wired up in the next task. +#![allow(dead_code)] + +use super::config::OtlpGrpcTraceConfig; +use crate::trace_exporter::error::{BuilderErrorKind, RequestError, TraceExporterError}; +use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::{ + ExportTraceServiceRequest, ExportTraceServiceResponse, +}; +use prost::Message as ProstMessage; +use std::marker::PhantomData; +use std::time::Duration; +use tonic::{ + client::Grpc, + codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}, + metadata::{AsciiMetadataKey, AsciiMetadataValue}, + transport::{Channel, Endpoint}, + Code, Request, Status, +}; +use tracing::warn; + +/// gRPC path for the OTLP trace export RPC. +const GRPC_EXPORT_PATH: &str = "/opentelemetry.proto.collector.trace.v1.TraceService/Export"; + +// --------------------------------------------------------------------------- +// Prost codec implementation — tonic 0.14 removed ProstCodec from its public +// API; we provide a minimal replacement that satisfies tonic::codec::Codec. +// --------------------------------------------------------------------------- + +/// A [`tonic::codec::Codec`] that encodes and decodes prost messages. +#[derive(Clone, Default)] +pub(crate) struct ProstCodecImpl { + _phantom: PhantomData<(Enc, Dec)>, +} + +/// A prost message encoder that implements [`tonic::codec::Encoder`]. +pub(crate) struct ProstEncoder { + _phantom: PhantomData, +} + +impl Default for ProstEncoder { + fn default() -> Self { + Self { + _phantom: PhantomData, + } + } +} + +impl Encoder for ProstEncoder { + type Item = T; + type Error = Status; + + fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { + item.encode(dst) + .map_err(|e| Status::internal(format!("Failed to encode protobuf message: {e}"))) + } +} + +/// A prost message decoder that implements [`tonic::codec::Decoder`]. +pub(crate) struct ProstDecoder { + _phantom: PhantomData, +} + +impl Default for ProstDecoder { + fn default() -> Self { + Self { + _phantom: PhantomData, + } + } +} + +impl Decoder for ProstDecoder { + type Item = T; + type Error = Status; + + fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result, Self::Error> { + use bytes::Buf as _; + let mut buf = bytes::BytesMut::with_capacity(src.remaining()); + buf.extend_from_slice(src.chunk()); + src.advance(src.remaining()); + match T::decode(buf) { + Ok(msg) => Ok(Some(msg)), + Err(e) => Err(Status::internal(format!( + "Failed to decode protobuf message: {e}" + ))), + } + } +} + +impl Codec for ProstCodecImpl +where + Enc: ProstMessage + Default + Send + 'static, + Dec: ProstMessage + Default + Send + 'static, +{ + type Encode = Enc; + type Decode = Dec; + type Encoder = ProstEncoder; + type Decoder = ProstDecoder; + + fn encoder(&mut self) -> Self::Encoder { + ProstEncoder::default() + } + + fn decoder(&mut self) -> Self::Decoder { + ProstDecoder::default() + } +} + +// --------------------------------------------------------------------------- +// OtlpGrpcTransport +// --------------------------------------------------------------------------- + +/// A connected gRPC transport for OTLP trace export. +/// +/// Holds the per-export config and a lazily-connected tonic [`Channel`]. +/// Clone is cheap — `Channel` is internally reference-counted. +#[derive(Clone, Debug)] +pub(crate) struct OtlpGrpcTransport { + pub(crate) config: OtlpGrpcTraceConfig, + /// Lazily-connected HTTP/2 channel. tonic establishes the TCP connection + /// on the first RPC call and maintains a connection pool afterwards. + pub(crate) channel: Channel, +} + +/// Build a lazy tonic gRPC channel for `endpoint_url`. +/// +/// The channel does **not** connect eagerly — TCP setup happens on the first +/// RPC call. `timeout` is stored on the channel and applied per-request. +/// +/// Only `http://` scheme endpoints are accepted; `https://` is not yet +/// supported (use a TLS-terminating sidecar for encrypted connections). +pub(crate) fn build_grpc_channel( + endpoint_url: &str, + timeout: Duration, +) -> Result { + if endpoint_url.starts_with("https://") { + return Err(TraceExporterError::Builder( + BuilderErrorKind::InvalidConfiguration( + "gRPC TLS (https://) is not yet supported; use http:// and a \ + TLS-terminating sidecar if encryption is required" + .to_string(), + ), + )); + } + let channel = Endpoint::from_shared(endpoint_url.to_owned()) + .map_err(|e| TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string())))? + .timeout(timeout) + .connect_lazy(); // Non-async; connects on first RPC call. + Ok(channel) +} + +/// Send an OTLP trace export request over gRPC. +/// +/// Uses the `transport.channel` tonic channel with [`ProstCodecImpl`] for +/// encoding/decoding. Custom metadata headers and the test session token +/// are attached to the request metadata. +/// +/// # Errors +/// +/// Returns [`TraceExporterError::Io`] on timeout or connection failure, +/// [`TraceExporterError::Request`] on non-OK gRPC status codes. +pub(crate) async fn send_otlp_traces_grpc( + transport: &OtlpGrpcTransport, + test_token: Option<&str>, + request: ExportTraceServiceRequest, +) -> Result<(), TraceExporterError> { + let mut client = Grpc::new(transport.channel.clone()); + + let mut req = Request::new(request); + attach_metadata(&mut req, &transport.config.headers, test_token); + + let path = http::uri::PathAndQuery::from_static(GRPC_EXPORT_PATH); + let codec = ProstCodecImpl::::default(); + + tokio::time::timeout(transport.config.timeout, client.unary(req, path, codec)) + .await + .map_err(|_| TraceExporterError::Io(std::io::Error::from(std::io::ErrorKind::TimedOut)))? + .map(|_response| ()) + .map_err(grpc_status_to_error) +} + +/// Attach `headers` and the optional test-session token to gRPC request metadata. +fn attach_metadata( + req: &mut Request, + headers: &[(String, String)], + test_token: Option<&str>, +) { + for (k, v) in headers { + match ( + k.parse::(), + v.parse::(), + ) { + (Ok(key), Ok(val)) => { + req.metadata_mut().insert(key, val); + } + _ => warn!("Skipping invalid gRPC metadata header: {k:?}={v:?}"), + } + } + if let Some(token) = test_token { + match token.parse::() { + Ok(val) => { + req.metadata_mut().insert( + AsciiMetadataKey::from_static("x-datadog-test-session-token"), + val, + ); + } + Err(_) => warn!("Skipping invalid test-session token: {token:?}"), + } + } +} + +fn grpc_status_to_error(status: Status) -> TraceExporterError { + match status.code() { + Code::Ok => { + // Ok status should never reach the error path — tonic's `unary` + // returns Ok(response) on Code::Ok, so map_err is not called. + TraceExporterError::Io(std::io::Error::other( + "gRPC Ok status reached error handler (unexpected)", + )) + } + Code::Unavailable | Code::DeadlineExceeded => TraceExporterError::Io(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + status.message(), + )), + _ => TraceExporterError::Request(RequestError::new( + http::StatusCode::INTERNAL_SERVER_ERROR, + status.message(), + )), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn build_grpc_channel_rejects_https() { + let err = build_grpc_channel("https://localhost:4317", Duration::from_secs(10)); + assert!( + err.is_err(), + "https:// should be rejected until TLS is implemented" + ); + let msg = err.unwrap_err().to_string(); + assert!(msg.contains("TLS") || msg.contains("https"), "got: {msg}"); + } + + /// `connect_lazy()` internally registers with the Tokio reactor — wrap in a runtime. + #[tokio::test] + async fn build_grpc_channel_accepts_http() { + // connect_lazy() doesn't dial — this should always succeed. + let result = build_grpc_channel("http://localhost:4317", Duration::from_secs(10)); + assert!(result.is_ok(), "http:// must produce a channel: {result:?}"); + } + + #[test] + fn build_grpc_channel_rejects_malformed_url() { + let err = build_grpc_channel("not a url", Duration::from_secs(10)); + assert!(err.is_err()); + } + + /// `connect_lazy()` requires a Tokio runtime — wrap in `#[tokio::test]`. + #[tokio::test] + async fn grpc_transport_is_clone() { + let channel = build_grpc_channel("http://localhost:4317", Duration::from_secs(5)) + .expect("http channel must build"); + let transport = OtlpGrpcTransport { + config: OtlpGrpcTraceConfig { + endpoint_url: "http://localhost:4317".to_string(), + headers: vec![], + timeout: Duration::from_secs(5), + otel_trace_semantics_enabled: false, + }, + channel, + }; + let _clone = transport.clone(); + } +} diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index 0bda6b1b7e..4f9b1576d0 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -29,6 +29,7 @@ pub mod config; pub mod exporter; +pub(crate) mod grpc_exporter; pub mod metrics; pub use config::{OtlpMetricsConfig, OtlpProtocol, OtlpTraceConfig}; From 27e2e5eaab54a19439393c29f52dde7b2d0c086c Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 13:59:24 -0400 Subject: [PATCH 04/17] fix(data-pipeline): re-export gRPC transport types from otlp module Downstream tasks depend on OtlpGrpcTraceConfig, build_grpc_channel, send_otlp_traces_grpc, and OtlpGrpcTransport. Add public and crate-private re-exports to libdd-data-pipeline's otlp module. Co-Authored-By: Claude Sonnet 4.6 --- libdd-data-pipeline/src/otlp/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index 4f9b1576d0..b0d8f68245 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -32,7 +32,11 @@ pub mod exporter; pub(crate) mod grpc_exporter; pub mod metrics; -pub use config::{OtlpMetricsConfig, OtlpProtocol, OtlpTraceConfig}; +#[allow(unused_imports)] +pub use config::{OtlpGrpcTraceConfig, OtlpMetricsConfig, OtlpProtocol, OtlpTraceConfig}; pub use exporter::send_otlp_traces_http; pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; pub use metrics::OtlpStatsExporter; + +#[allow(unused_imports)] +pub(crate) use grpc_exporter::{build_grpc_channel, send_otlp_traces_grpc, OtlpGrpcTransport}; From 9de5c10c8c19c97bde531d2a3ef117aa6b7d0936 Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 14:05:12 -0400 Subject: [PATCH 05/17] feat(data-pipeline): add OtlpExportMode enum; dispatch to gRPC send path Introduces `OtlpExportMode` to unify HTTP and gRPC OTLP transport selection. Renames `TraceExporter.otlp_config` to `otlp: Option`, updates the dispatch in `send_trace_chunks_inner` to match on the enum variant, and adds `send_otlp_grpc_inner` that calls `send_otlp_traces_grpc` with the pre-built `OtlpGrpcTransport`. Builder updated to wrap the HTTP config in `OtlpExportMode::Http`; gRPC wiring follows in Task 5. Co-Authored-By: Claude Sonnet 4.6 --- .../src/trace_exporter/builder.rs | 22 +++-- libdd-data-pipeline/src/trace_exporter/mod.rs | 95 +++++++++++++++++-- 2 files changed, 99 insertions(+), 18 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index b0286df2ea..61f84315c9 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -13,9 +13,9 @@ use crate::trace_exporter::TelemetryConfig; #[cfg(not(target_arch = "wasm32"))] use crate::trace_exporter::TraceExporterWorkers; use crate::trace_exporter::{ - add_path, StatsComputationStatus, TelemetryInstrumentationSessions, TraceExporter, - TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, TraceSerializer, - TracerMetadata, INFO_ENDPOINT, + add_path, OtlpExportMode, StatsComputationStatus, TelemetryInstrumentationSessions, + TraceExporter, TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, + TraceSerializer, TracerMetadata, INFO_ENDPOINT, }; use arc_swap::ArcSwap; use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; @@ -577,12 +577,14 @@ impl TraceExporterBuilder { // `self.otlp_protocol` is always an HTTP encoding here: gRPC is rejected at the parse // boundary (`OtlpProtocol::from_str`) and so can never be constructed. - let otlp_config = self.otlp_endpoint.map(|url| OtlpTraceConfig { - endpoint_url: url, - headers: build_otlp_header_map(self.otlp_headers), - timeout: otlp_timeout, - protocol: self.otlp_protocol, - otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, + let otlp = self.otlp_endpoint.map(|url| { + OtlpExportMode::Http(OtlpTraceConfig { + endpoint_url: url, + headers: build_otlp_header_map(self.otlp_headers), + timeout: otlp_timeout, + protocol: self.otlp_protocol, + otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, + }) }); let otlp_metrics_config = self.otlp_metrics_endpoint.map(|url| OtlpMetricsConfig { @@ -705,7 +707,7 @@ impl TraceExporterBuilder { agent_payload_response_version: self .agent_rates_payload_version_enabled .then(AgentResponsePayloadVersion::new), - otlp_config, + otlp, trace_filterer: ArcSwap::from_pointee(TraceFilterer::with_empty_conf()), otlp_stats_enabled, }) diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index a3a19aed78..8a412095ae 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -16,7 +16,10 @@ use self::metrics::MetricsEmitter; use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; use crate::agent_info::ResponseObserver; -use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig}; +use crate::otlp::{ + map_traces_to_otlp, send_otlp_traces_grpc, send_otlp_traces_http, OtlpGrpcTransport, + OtlpResourceInfo, OtlpTraceConfig, +}; #[cfg(feature = "telemetry")] use crate::telemetry::{SendPayloadTelemetry, TelemetryClient}; use crate::trace_exporter::agent_response::{ @@ -58,6 +61,20 @@ use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{debug, error, warn}; +/// Selects the OTLP export transport for a [] instance. +/// +/// Only one variant is active per instance; mutual exclusivity is enforced at +/// build time. +// The Grpc variant is constructed in the builder (Task 5); allow dead_code until then. +#[allow(dead_code)] +#[derive(Debug)] +pub(crate) enum OtlpExportMode { + /// OTLP over HTTP/1.1 (JSON or protobuf body). + Http(OtlpTraceConfig), + /// OTLP over HTTP/2 via gRPC. + Grpc(OtlpGrpcTransport), +} + const INFO_ENDPOINT: &str = "/info"; const V04_TRACES_ENDPOINT: &str = "/v0.4/traces"; const V05_TRACES_ENDPOINT: &str = "/v0.5/traces"; @@ -219,8 +236,8 @@ pub struct TraceExporter< #[cfg(not(target_arch = "wasm32"))] workers: TraceExporterWorkers, agent_payload_response_version: Option, - /// When set, traces are exported via OTLP HTTP/JSON instead of the Datadog agent. - otlp_config: Option, + /// When set, traces are exported via OTLP (HTTP or gRPC) instead of the Datadog agent. + otlp: Option, trace_filterer: ArcSwap, /// When true, span stats are computed and exported as OTLP metrics. The concentrator is /// started at build time, so agent-driven stats (de)activation in `check_agent_info` is @@ -620,6 +637,58 @@ impl( + &self, + traces: Vec>>, + transport: &OtlpGrpcTransport, + ) -> Result { + let resource_info = { + let mut r = OtlpResourceInfo::default(); + r.service = self.metadata.service.clone(); + r.env = self.metadata.env.clone(); + r.app_version = self.metadata.app_version.clone(); + r.language = self.metadata.language.clone(); + r.tracer_version = self.metadata.tracer_version.clone(); + r.runtime_id = self.metadata.runtime_id.clone(); + r.client_computed_stats = self.otlp_stats_enabled; + r + }; + + // map_traces_to_otlp returns the prost ExportTraceServiceRequest directly — + // the same type expected by send_otlp_traces_grpc; no conversion needed. + let proto_request = map_traces_to_otlp( + traces, + &resource_info, + transport.config.otel_trace_semantics_enabled, + ); + + // Optionally inject the client-computed-stats metadata header. + let effective_transport; + let transport_to_use = if self.otlp_stats_enabled { + effective_transport = { + let mut c = transport.clone(); + c.config.headers.push(( + "datadog-client-computed-stats".to_string(), + "yes".to_string(), + )); + c + }; + &effective_transport + } else { + transport + }; + + send_otlp_traces_grpc( + transport_to_use, + self.endpoint.test_token.as_deref(), + proto_request, + ) + .await?; + + Ok(AgentResponse::Unchanged) + } + /// Send traces payload to agent with retry and telemetry reporting async fn send_traces_with_telemetry( &self, @@ -682,12 +751,22 @@ impl { + libdd_trace_utils::span::trace_utils::drop_chunks(&mut traces); + if traces.is_empty() { + return Ok(AgentResponse::Unchanged); + } + return self.send_otlp_traces_inner(traces, config).await; + } + Some(OtlpExportMode::Grpc(transport)) => { + libdd_trace_utils::span::trace_utils::drop_chunks(&mut traces); + if traces.is_empty() { + return Ok(AgentResponse::Unchanged); + } + return self.send_otlp_grpc_inner(traces, transport).await; } - return self.send_otlp_traces_inner(traces, config).await; + None => {} } // Snapshot the effective format once so the serializer and the URL agree even if From 986f8429d1066aa7d09da1d607ae1bb3f8dff2c2 Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 14:11:53 -0400 Subject: [PATCH 06/17] feat(data-pipeline): build OtlpGrpcTransport when Grpc protocol + endpoint configured Update TraceExporterBuilder to dispatch to the correct OtlpExportMode at build time based on otlp_protocol. When OtlpProtocol::Grpc is set and an endpoint URL is provided, call build_grpc_channel and wrap the result in OtlpExportMode::Grpc(OtlpGrpcTransport); otherwise fall back to OtlpExportMode::Http. When no otlp_endpoint is set, otlp is None and the exporter uses the Datadog agent path. Also update the set_otlp_protocol doc comment to reflect that grpc is now a supported protocol string. Three new builder tests cover: grpc+endpoint succeeds, grpc without endpoint falls back to agent, and https:// with grpc is rejected at build. Co-Authored-By: Claude Sonnet 4.6 --- .../src/trace_exporter/builder.rs | 84 ++++++++++++++++--- 1 file changed, 71 insertions(+), 13 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 61f84315c9..cc82925b67 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -3,7 +3,10 @@ use crate::agent_info::AgentInfoFetcher; use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; -use crate::otlp::{OtlpMetricsConfig, OtlpResourceInfo, OtlpTraceConfig}; +use crate::otlp::{ + build_grpc_channel, OtlpGrpcTraceConfig, OtlpGrpcTransport, OtlpMetricsConfig, + OtlpResourceInfo, OtlpTraceConfig, +}; #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; @@ -374,10 +377,10 @@ impl TraceExporterBuilder { self } - /// Selects the OTLP export protocol: [`OtlpProtocol::HttpJson`] (default) or - /// [`OtlpProtocol::HttpProtobuf`]. The host language resolves this from - /// `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` / `OTEL_EXPORTER_OTLP_PROTOCOL`; a `grpc` value is - /// unsupported and is rejected when parsed into [`OtlpProtocol`], so it never reaches here. + /// Selects the OTLP export protocol. The host language resolves this from + /// `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` / `OTEL_EXPORTER_OTLP_PROTOCOL`. All three OTel + /// protocol strings (`http/json`, `http/protobuf`, `grpc`) are valid. Plaintext gRPC only; + /// `https://` endpoints are rejected at [`build`](Self::build) time. pub fn set_otlp_protocol(&mut self, protocol: OtlpProtocol) -> &mut Self { self.otlp_protocol = protocol; self @@ -575,17 +578,28 @@ impl TraceExporterBuilder { .map(Duration::from_millis) .unwrap_or(DEFAULT_OTLP_TIMEOUT); - // `self.otlp_protocol` is always an HTTP encoding here: gRPC is rejected at the parse - // boundary (`OtlpProtocol::from_str`) and so can never be constructed. - let otlp = self.otlp_endpoint.map(|url| { - OtlpExportMode::Http(OtlpTraceConfig { - endpoint_url: url, - headers: build_otlp_header_map(self.otlp_headers), + let otlp = match self.otlp_endpoint { + Some(ref url) if self.otlp_protocol == OtlpProtocol::Grpc => { + let channel = build_grpc_channel(url, otlp_timeout)?; + Some(OtlpExportMode::Grpc(OtlpGrpcTransport { + config: OtlpGrpcTraceConfig { + endpoint_url: url.clone(), + headers: self.otlp_headers.clone(), + timeout: otlp_timeout, + otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, + }, + channel, + })) + } + Some(ref url) => Some(OtlpExportMode::Http(OtlpTraceConfig { + endpoint_url: url.clone(), + headers: build_otlp_header_map(self.otlp_headers.clone()), timeout: otlp_timeout, protocol: self.otlp_protocol, otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, - }) - }); + })), + None => None, + }; let otlp_metrics_config = self.otlp_metrics_endpoint.map(|url| OtlpMetricsConfig { endpoint_url: url, @@ -887,4 +901,48 @@ mod tests { "http://127.0.0.1:8126/v0.4/traces" ); } + + #[cfg_attr(miri, ignore)] + #[test] + fn build_with_grpc_protocol_and_endpoint_succeeds() { + let mut builder = TraceExporterBuilder::default(); + builder + .set_otlp_endpoint("http://localhost:4317") + .set_otlp_protocol(crate::otlp::OtlpProtocol::Grpc); + // connect_lazy() doesn't dial, so this must build successfully even without a server. + let result = builder.build::(); + assert!( + result.is_ok(), + "gRPC protocol + endpoint should build successfully: {result:?}" + ); + } + + #[cfg_attr(miri, ignore)] + #[test] + fn build_with_grpc_protocol_no_endpoint_uses_agent_path() { + // A configured gRPC protocol without an OTLP endpoint is inert — the exporter + // falls back to the normal Datadog agent path. + let mut builder = TraceExporterBuilder::default(); + builder + .set_url("http://localhost:8126") + .set_otlp_protocol(crate::otlp::OtlpProtocol::Grpc); + let result = builder.build::(); + assert!( + result.is_ok(), + "gRPC protocol without endpoint must use agent path, not fail: {result:?}" + ); + } + + #[test] + fn build_with_grpc_https_endpoint_rejected() { + let mut builder = TraceExporterBuilder::default(); + builder + .set_otlp_endpoint("https://collector.example.com:4317") + .set_otlp_protocol(crate::otlp::OtlpProtocol::Grpc); + let result = builder.build::(); + assert!( + result.is_err(), + "https:// gRPC should be rejected until TLS is implemented" + ); + } } From 335a0238b97396bfa37256683633c2520bfc5c55 Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 14:15:14 -0400 Subject: [PATCH 07/17] feat(data-pipeline-ffi): accept 'grpc' as valid OTLP protocol in FFI Update doc comment, inline comment, and tests for ddog_trace_exporter_config_set_otlp_protocol to reflect that "grpc" is now a supported value. Replaces set_otlp_protocol_rejects_grpc_and_unknown with set_otlp_protocol_accepts_all_three_protocols and set_otlp_protocol_rejects_unknown; updates config_otlp_protocol_test to assert "grpc" -> success with OtlpProtocol::Grpc stored. Co-Authored-By: Claude Sonnet 4.6 --- libdd-data-pipeline-ffi/src/trace_exporter.rs | 65 +++++++++++++------ 1 file changed, 46 insertions(+), 19 deletions(-) diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index 8cb1c5eb37..36776fda70 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -503,15 +503,17 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint( ) } -/// Sets the OTLP export protocol. Accepts the OTel-standard values `http/json` (default) or -/// `http/protobuf`; `grpc` is rejected as not yet supported. The host language resolves the value -/// (e.g. from `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`). +/// Sets the OTLP export protocol. Accepts all three OTel-standard values: +/// - `http/json` (default) — OTLP over HTTP with JSON body +/// - `http/protobuf` — OTLP over HTTP with protobuf body +/// - `grpc` — OTLP over HTTP/2 (plaintext only; `https://` is not yet supported) /// -/// Has no effect unless an OTLP endpoint is also configured via -/// `ddog_trace_exporter_config_set_otlp_endpoint`; without one, traces are sent to the -/// Datadog agent and this protocol selection is ignored. +/// The host language resolves the value (e.g. from `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`). /// -/// Returns `None` on success, `ErrorCode::InvalidArgument` for a null config or an unaccepted +/// Has no effect unless an OTLP endpoint is configured via +/// `ddog_trace_exporter_config_set_otlp_endpoint`. +/// +/// Returns `None` on success, `ErrorCode::InvalidArgument` for a null config or an unrecognized /// value, and `ErrorCode::InvalidInput` for a non-UTF-8 string. #[no_mangle] pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol( @@ -524,9 +526,9 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol( Ok(s) => s, Err(e) => return Some(e), }; - // `FromStr` is the single source of truth for string -> OtlpProtocol. It accepts only - // the supported HTTP encodings (`http/json`, `http/protobuf`); `grpc` and any unknown - // value are rejected with an error, so an unsupported protocol can never be stored. + // `FromStr` is the single source of truth for string -> OtlpProtocol. It accepts all + // three OTel-standard encodings (`http/json`, `http/protobuf`, `grpc`); any unknown + // value is rejected with an error, so an unsupported protocol can never be stored. match value.parse::() { Ok(p) => { handle.otlp_protocol = Some(p); @@ -1362,14 +1364,17 @@ mod tests { Some(OtlpProtocol::HttpProtobuf) ); - // "grpc" → InvalidArgument + // "grpc" → success (gRPC is now supported) let mut config = Some(TraceExporterConfig::default()); let error = ddog_trace_exporter_config_set_otlp_protocol( config.as_mut(), CharSlice::from("grpc"), ); - assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); - ddog_trace_exporter_error_free(error); + assert_eq!(error, None); + assert_eq!( + config.as_ref().unwrap().otlp_protocol, + Some(OtlpProtocol::Grpc) + ); // Garbage value → InvalidArgument let mut config = Some(TraceExporterConfig::default()); @@ -1407,17 +1412,39 @@ mod tests { } #[test] - fn set_otlp_protocol_rejects_grpc_and_unknown() { - let mut cfg = TraceExporterConfig::default(); - for bad in ["grpc", "nonsense"] { + fn set_otlp_protocol_accepts_all_three_protocols() { + use libdd_data_pipeline::OtlpProtocol; + for (input, expected) in [ + ("http/json", OtlpProtocol::HttpJson), + ("http/protobuf", OtlpProtocol::HttpProtobuf), + ("grpc", OtlpProtocol::Grpc), + ] { + let mut cfg = TraceExporterConfig::default(); let err = unsafe { - ddog_trace_exporter_config_set_otlp_protocol(Some(&mut cfg), CharSlice::from(bad)) + ddog_trace_exporter_config_set_otlp_protocol(Some(&mut cfg), CharSlice::from(input)) }; - assert!(err.is_some(), "expected error for {bad}"); - assert_eq!(cfg.otlp_protocol, None, "{bad} must not be stored"); + assert!(err.is_none(), "expected success for {input}: {err:?}"); + assert_eq!( + cfg.otlp_protocol, + Some(expected), + "wrong protocol for {input}" + ); } } + #[test] + fn set_otlp_protocol_rejects_unknown() { + let mut cfg = TraceExporterConfig::default(); + let err = unsafe { + ddog_trace_exporter_config_set_otlp_protocol( + Some(&mut cfg), + CharSlice::from("nonsense"), + ) + }; + assert!(err.is_some(), "expected error for unknown protocol"); + assert_eq!(cfg.otlp_protocol, None, "must not be stored on error"); + } + #[cfg(all(feature = "catch_panic", panic = "unwind"))] #[test] fn catch_panic_test() { From 6a9b1d9dbf807ccdff877cf0821a23a567dcdf13 Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 14:23:01 -0400 Subject: [PATCH 08/17] fix(data-pipeline): call Grpc::ready() before unary() to satisfy tower Buffer poll contract Tower's Buffer service requires poll_ready/poll_reserve to be called before call/send_item. tonic 0.14's Grpc::unary does not call poll_ready internally, so callers must invoke client.ready().await first. Without this, the exporter panicked with "send_item called without first calling poll_reserve" on the first gRPC send. Also removes the now-stale #![allow(dead_code)] annotation since the gRPC transport is fully wired up. Co-Authored-By: Claude Sonnet 4.6 --- libdd-data-pipeline/src/otlp/grpc_exporter.rs | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/libdd-data-pipeline/src/otlp/grpc_exporter.rs b/libdd-data-pipeline/src/otlp/grpc_exporter.rs index 1afbc273dc..0bbac8b543 100644 --- a/libdd-data-pipeline/src/otlp/grpc_exporter.rs +++ b/libdd-data-pipeline/src/otlp/grpc_exporter.rs @@ -11,10 +11,6 @@ //! the 5-byte frame prefix, protobuf encoding, and gRPC trailer parsing //! automatically, using [`prost`] for message encoding/decoding. -// These items are pub(crate) but not yet consumed by the trace exporter dispatch -// path — wired up in the next task. -#![allow(dead_code)] - use super::config::OtlpGrpcTraceConfig; use crate::trace_exporter::error::{BuilderErrorKind, RequestError, TraceExporterError}; use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::{ @@ -185,11 +181,23 @@ pub(crate) async fn send_otlp_traces_grpc( let path = http::uri::PathAndQuery::from_static(GRPC_EXPORT_PATH); let codec = ProstCodecImpl::::default(); - tokio::time::timeout(transport.config.timeout, client.unary(req, path, codec)) - .await - .map_err(|_| TraceExporterError::Io(std::io::Error::from(std::io::ErrorKind::TimedOut)))? - .map(|_response| ()) - .map_err(grpc_status_to_error) + // Tower's `Buffer` service (used inside tonic's `Channel`) requires + // `poll_ready` to be called and return `Ready` before `call`. tonic's + // `Grpc::ready()` drives that poll loop for us. + tokio::time::timeout(transport.config.timeout, async { + client.ready().await.map_err(|e| { + TraceExporterError::Io(std::io::Error::other(format!( + "gRPC channel not ready: {e}" + ))) + })?; + client + .unary(req, path, codec) + .await + .map(|_response| ()) + .map_err(grpc_status_to_error) + }) + .await + .map_err(|_| TraceExporterError::Io(std::io::Error::from(std::io::ErrorKind::TimedOut)))? } /// Attach `headers` and the optional test-session token to gRPC request metadata. From daf3bf121614dd918aa1cad6d717b0e63841adeb Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 14:23:14 -0400 Subject: [PATCH 09/17] test(data-pipeline): end-to-end gRPC OTLP trace export test with in-process h2 server Adds two tests in test_trace_exporter_otlp_grpc.rs: - grpc_export_sends_decodable_request: spins up an in-process h2 server, configures a TraceExporter with OtlpProtocol::Grpc, sends a msgpack-encoded trace via spawn_blocking, and asserts the server received a decodable ExportTraceServiceRequest with the expected service.name attribute. - grpc_protocol_string_parses: verifies OtlpProtocol::from_str("grpc") returns OtlpProtocol::Grpc, covering the FromStr path without FFI ceremony. Co-Authored-By: Claude Sonnet 4.6 --- .../tests/test_trace_exporter_otlp_grpc.rs | 175 ++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs diff --git a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs new file mode 100644 index 0000000000..5456d6ac60 --- /dev/null +++ b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs @@ -0,0 +1,175 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! End-to-end test for OTLP gRPC trace export. +//! +//! Starts an in-process HTTP/2 gRPC server using the `h2` crate, configures +//! a [`TraceExporter`] with `OtlpProtocol::Grpc`, sends a trace, and asserts +//! that the server received a valid [`ExportTraceServiceRequest`] containing +//! the expected span. + +#[cfg(test)] +mod grpc_export_tests { + use bytes::Bytes; + use h2::server; + use libdd_capabilities_impl::NativeCapabilities; + use libdd_data_pipeline::{trace_exporter::TraceExporterBuilder, OtlpProtocol}; + use libdd_trace_protobuf::opentelemetry::proto::{ + collector::trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse}, + common::v1::any_value::Value, + }; + use libdd_trace_utils::test_utils::create_test_json_span; + use prost::Message; + use serde_json::json; + use tokio::net::TcpListener; + use tokio::sync::oneshot; + use tokio::task; + + /// Starts a minimal HTTP/2 gRPC server that accepts exactly one Export call. + /// + /// Returns `(port, rx)` where `rx` receives the decoded + /// `ExportTraceServiceRequest` after the server handles the first request. + async fn start_grpc_test_server() -> (u16, oneshot::Receiver) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let (tx, rx) = oneshot::channel::(); + + tokio::spawn(async move { + let (socket, _) = listener.accept().await.unwrap(); + let mut h2_conn = server::handshake(socket).await.unwrap(); + + if let Some(result) = h2_conn.accept().await { + let (request, mut respond) = result.unwrap(); + + // Collect the full request body. The chunk length must be captured + // before `extend_from_slice` consumes it to satisfy the borrow checker. + let mut body = request.into_body(); + let mut frame_data: Vec = Vec::new(); + while let Some(chunk) = body.data().await { + let chunk = chunk.unwrap(); + let len = chunk.len(); + frame_data.extend_from_slice(&chunk); + body.flow_control().release_capacity(len).ok(); + } + + // Decode the gRPC frame: skip the 5-byte prefix (1 compression flag + 4 length). + let decoded = if frame_data.len() > 5 { + ExportTraceServiceRequest::decode(&frame_data[5..]).ok() + } else { + None + }; + if let Some(req) = decoded { + let _ = tx.send(req); + } + + // Send gRPC success response: 200 headers + protobuf body + grpc-status trailer. + let response_proto = ExportTraceServiceResponse::default(); + let proto_bytes = response_proto.encode_to_vec(); + let mut frame = Vec::with_capacity(5 + proto_bytes.len()); + frame.push(0u8); // no compression + frame.extend_from_slice(&(proto_bytes.len() as u32).to_be_bytes()); + frame.extend_from_slice(&proto_bytes); + + let response = http::Response::builder() + .status(200) + .header("content-type", "application/grpc") + .body(()) + .unwrap(); + let mut send_stream = respond.send_response(response, false).unwrap(); + send_stream.send_data(Bytes::from(frame), false).unwrap(); + + // gRPC-status trailer (trailing HEADERS frame with END_STREAM). + let mut trailers = http::HeaderMap::new(); + trailers.insert("grpc-status", "0".parse().unwrap()); + send_stream.send_trailers(trailers).unwrap(); + } + + // Drive the connection to completion so the client receives all frames + // before the TCP socket closes. + while h2_conn.accept().await.is_some() {} + }); + + (port, rx) + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn grpc_export_sends_decodable_request() { + let (port, rx) = start_grpc_test_server().await; + + let endpoint = format!("http://127.0.0.1:{port}"); + + // TraceExporter::send internally drives a tokio runtime; use spawn_blocking + // so it does not block the test's async runtime. + let task_result = task::spawn_blocking(move || { + let mut builder = TraceExporterBuilder::default(); + builder + .set_otlp_endpoint(&endpoint) + .set_otlp_protocol(OtlpProtocol::Grpc) + .set_language("test-lang") + .set_tracer_version("1.0") + .set_env("grpc-test-env") + .set_service("grpc-test-svc"); + let exporter = builder.build::().expect("build"); + + let mut span = create_test_json_span(1234, 12342, 12341, 1, false); + span["service"] = json!("grpc-test-svc"); + span["name"] = json!("grpc_span"); + let data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + exporter.send(data.as_ref()).expect("send ok"); + }) + .await; + + assert!( + task_result.is_ok(), + "exporter task panicked: {task_result:?}" + ); + + // Wait for the server to receive and decode the request (5 second timeout). + let received = tokio::time::timeout(std::time::Duration::from_secs(5), rx) + .await + .expect("server did not receive request within 5s") + .expect("server channel closed without sending"); + + // Validate the decoded request contains the expected span. + assert!( + !received.resource_spans.is_empty(), + "expected at least one ResourceSpans" + ); + let service_name = received + .resource_spans + .first() + .and_then(|rs| rs.resource.as_ref()) + .and_then(|r| { + r.attributes.iter().find_map(|kv| { + if kv.key == "service.name" { + kv.value + .as_ref() + .and_then(|v| v.value.as_ref()) + .and_then(|v| match v { + Value::StringValue(s) => Some(s.as_str()), + _ => None, + }) + } else { + None + } + }) + }); + assert_eq!( + service_name, + Some("grpc-test-svc"), + "service.name attribute not found or wrong value" + ); + } + + /// Verify the protocol string "grpc" is accepted by `OtlpProtocol`'s `FromStr` impl. + #[test] + fn grpc_protocol_string_parses() { + use std::str::FromStr; + let protocol = OtlpProtocol::from_str("grpc"); + assert!( + matches!(protocol, Ok(OtlpProtocol::Grpc)), + "expected OtlpProtocol::Grpc, got {protocol:?}" + ); + } +} From 99b62905e75a34305a3cc21c282f9bbb073766d8 Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 15:05:14 -0400 Subject: [PATCH 10/17] chore(data-pipeline): remove stale allow attrs and duplicate parse tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix doc placeholder in OtlpExportMode: [] → [TraceExporter] - Remove duplicate grpc parse tests (keep grpc_parses_successfully) - Remove #[allow(dead_code)] from OtlpExportMode (now wired up) - Remove #[allow(unused_imports)] from otlp/mod.rs re-exports - Mark OtlpGrpcTraceConfig.endpoint_url with #[allow(dead_code)] (unused by gRPC path) Co-Authored-By: Claude Sonnet 4.6 --- libdd-data-pipeline/src/otlp/config.rs | 7 +------ libdd-data-pipeline/src/otlp/mod.rs | 2 -- libdd-data-pipeline/src/trace_exporter/mod.rs | 4 +--- .../tests/test_trace_exporter_otlp_grpc.rs | 11 ----------- 4 files changed, 2 insertions(+), 22 deletions(-) diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 79e3e3bc52..c2bf04eabc 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -113,11 +113,6 @@ mod tests { assert_eq!(OtlpProtocol::from_str("grpc").unwrap(), OtlpProtocol::Grpc); } - #[test] - fn grpc_parses_from_str() { - assert_eq!(OtlpProtocol::from_str("grpc").unwrap(), OtlpProtocol::Grpc); - } - #[test] fn grpc_config_is_send_sync() { fn assert_send_sync() {} @@ -144,10 +139,10 @@ mod tests { /// `/opentelemetry.proto.collector.trace.v1.TraceService/Export` is /// appended by the exporter. #[derive(Clone, Debug)] -#[allow(dead_code)] pub struct OtlpGrpcTraceConfig { /// Full gRPC base URL, e.g. `http://localhost:4317`. /// Must use `http://` scheme; `https://` (TLS) is not yet supported. + #[allow(dead_code)] pub endpoint_url: String, /// Custom key-value pairs forwarded as gRPC request metadata. pub headers: Vec<(String, String)>, diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index b0d8f68245..347d92c6ba 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -32,11 +32,9 @@ pub mod exporter; pub(crate) mod grpc_exporter; pub mod metrics; -#[allow(unused_imports)] pub use config::{OtlpGrpcTraceConfig, OtlpMetricsConfig, OtlpProtocol, OtlpTraceConfig}; pub use exporter::send_otlp_traces_http; pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; pub use metrics::OtlpStatsExporter; -#[allow(unused_imports)] pub(crate) use grpc_exporter::{build_grpc_channel, send_otlp_traces_grpc, OtlpGrpcTransport}; diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 8a412095ae..d497499991 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -61,12 +61,10 @@ use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{debug, error, warn}; -/// Selects the OTLP export transport for a [] instance. +/// Selects the OTLP export transport for a [`TraceExporter`] instance. /// /// Only one variant is active per instance; mutual exclusivity is enforced at /// build time. -// The Grpc variant is constructed in the builder (Task 5); allow dead_code until then. -#[allow(dead_code)] #[derive(Debug)] pub(crate) enum OtlpExportMode { /// OTLP over HTTP/1.1 (JSON or protobuf body). diff --git a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs index 5456d6ac60..359f7a71f3 100644 --- a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs +++ b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs @@ -161,15 +161,4 @@ mod grpc_export_tests { "service.name attribute not found or wrong value" ); } - - /// Verify the protocol string "grpc" is accepted by `OtlpProtocol`'s `FromStr` impl. - #[test] - fn grpc_protocol_string_parses() { - use std::str::FromStr; - let protocol = OtlpProtocol::from_str("grpc"); - assert!( - matches!(protocol, Ok(OtlpProtocol::Grpc)), - "expected OtlpProtocol::Grpc, got {protocol:?}" - ); - } } From ca48ccecec242f88d04c4a943d2fc3b8068c8ddf Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 16:19:54 -0400 Subject: [PATCH 11/17] fix(data-pipeline): gate gRPC OTLP transport off wasm32; stabilize miri/coverage - Move tonic to a cfg(not(target_arch = "wasm32")) dependency and gate the grpc_exporter module, OtlpExportMode::Grpc, the builder construction arm, and send_otlp_grpc_inner behind the same cfg. tonic's transport (hyper/tokio/socket2) does not build for wasm32, which broke `cargo check -p libdd-data-pipeline --target wasm32-unknown-unknown --no-default-features`. - Add #[cfg_attr(miri, ignore)] to build_with_grpc_https_endpoint_rejected, matching its sibling builder tests: build() starts the shared runtime whose worker hits a syscall Miri cannot execute. - Raise the e2e gRPC test's per-request timeout (set_connection_timeout) so the heavily instrumented coverage build does not time out the in-process round trip. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- libdd-data-pipeline/Cargo.toml | 10 ++++++---- libdd-data-pipeline/src/otlp/mod.rs | 3 +++ libdd-data-pipeline/src/trace_exporter/builder.rs | 11 +++++++---- libdd-data-pipeline/src/trace_exporter/mod.rs | 13 ++++++++----- .../tests/test_trace_exporter_otlp_grpc.rs | 3 +++ 5 files changed, 27 insertions(+), 13 deletions(-) diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index 90f62f8dd1..d826e4b4e4 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -47,14 +47,16 @@ libdd-tinybytes = { version = "1.1.1", path = "../libdd-tinybytes", features = [ "serialization", ] } prost = "0.14.1" -tonic = { version = "0.14", default-features = false, features = [ - "transport", # Channel, Endpoint, Server - "codegen", # Grpc client, tonic::client::Grpc -] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { version = "1.23", features = ["time", "test-util"], default-features = false } libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl", default-features = false } +# tonic's transport (hyper/tokio/socket2) does not build for wasm32, so the gRPC +# OTLP transport is gated off wasm targets along with the code that uses it. +tonic = { version = "0.14", default-features = false, features = [ + "transport", # Channel, Endpoint, Server + "codegen", # Grpc client, tonic::client::Grpc +] } [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2", features = ["js"] } diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index 347d92c6ba..bf428a8c09 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -29,6 +29,8 @@ pub mod config; pub mod exporter; +// gRPC OTLP export depends on tonic, which does not build for wasm32. +#[cfg(not(target_arch = "wasm32"))] pub(crate) mod grpc_exporter; pub mod metrics; @@ -37,4 +39,5 @@ pub use exporter::send_otlp_traces_http; pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; pub use metrics::OtlpStatsExporter; +#[cfg(not(target_arch = "wasm32"))] pub(crate) use grpc_exporter::{build_grpc_channel, send_otlp_traces_grpc, OtlpGrpcTransport}; diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index cc82925b67..a53b18c5e2 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -3,10 +3,10 @@ use crate::agent_info::AgentInfoFetcher; use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; -use crate::otlp::{ - build_grpc_channel, OtlpGrpcTraceConfig, OtlpGrpcTransport, OtlpMetricsConfig, - OtlpResourceInfo, OtlpTraceConfig, -}; +use crate::otlp::{OtlpMetricsConfig, OtlpResourceInfo, OtlpTraceConfig}; +// gRPC OTLP export depends on tonic, which does not build for wasm32. +#[cfg(not(target_arch = "wasm32"))] +use crate::otlp::{build_grpc_channel, OtlpGrpcTraceConfig, OtlpGrpcTransport}; #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; @@ -579,6 +579,7 @@ impl TraceExporterBuilder { .unwrap_or(DEFAULT_OTLP_TIMEOUT); let otlp = match self.otlp_endpoint { + #[cfg(not(target_arch = "wasm32"))] Some(ref url) if self.otlp_protocol == OtlpProtocol::Grpc => { let channel = build_grpc_channel(url, otlp_timeout)?; Some(OtlpExportMode::Grpc(OtlpGrpcTransport { @@ -933,6 +934,8 @@ mod tests { ); } + // build() spins up the shared runtime, whose worker hits a syscall Miri can't execute. + #[cfg_attr(miri, ignore)] #[test] fn build_with_grpc_https_endpoint_rejected() { let mut builder = TraceExporterBuilder::default(); diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index d497499991..62d0922663 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -16,10 +16,10 @@ use self::metrics::MetricsEmitter; use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; use crate::agent_info::ResponseObserver; -use crate::otlp::{ - map_traces_to_otlp, send_otlp_traces_grpc, send_otlp_traces_http, OtlpGrpcTransport, - OtlpResourceInfo, OtlpTraceConfig, -}; +use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig}; +// gRPC OTLP export depends on tonic, which does not build for wasm32. +#[cfg(not(target_arch = "wasm32"))] +use crate::otlp::{send_otlp_traces_grpc, OtlpGrpcTransport}; #[cfg(feature = "telemetry")] use crate::telemetry::{SendPayloadTelemetry, TelemetryClient}; use crate::trace_exporter::agent_response::{ @@ -69,7 +69,8 @@ use tracing::{debug, error, warn}; pub(crate) enum OtlpExportMode { /// OTLP over HTTP/1.1 (JSON or protobuf body). Http(OtlpTraceConfig), - /// OTLP over HTTP/2 via gRPC. + /// OTLP over HTTP/2 via gRPC. Unavailable on wasm32 (tonic does not build there). + #[cfg(not(target_arch = "wasm32"))] Grpc(OtlpGrpcTransport), } @@ -636,6 +637,7 @@ impl( &self, traces: Vec>>, @@ -757,6 +759,7 @@ impl { libdd_trace_utils::span::trace_utils::drop_chunks(&mut traces); if traces.is_empty() { diff --git a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs index 359f7a71f3..8147982cd3 100644 --- a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs +++ b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs @@ -106,6 +106,9 @@ mod grpc_export_tests { builder .set_otlp_endpoint(&endpoint) .set_otlp_protocol(OtlpProtocol::Grpc) + // Generous per-request timeout so the heavily instrumented coverage + // build does not time out the in-process gRPC round trip. + .set_connection_timeout(Some(60_000)) .set_language("test-lang") .set_tracer_version("1.0") .set_env("grpc-test-env") From 08cd15aa54f5d39ebbef3c1c28d7ea1be841c754 Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Fri, 26 Jun 2026 16:38:54 -0400 Subject: [PATCH 12/17] perf(data-pipeline): avoid per-export gRPC transport clone; add tests + benchmark - Attach the `datadog-client-computed-stats` marker as request metadata inside send_otlp_traces_grpc (new client_computed_stats arg) instead of cloning the whole OtlpGrpcTransport (and its header Vec) on every export. - Add unit tests for grpc_status_to_error (transient -> Io, application -> Request) and attach_metadata (valid header + token, invalid header skipped, stats marker). - Add a criterion benchmark (benches/otlp_grpc_export.rs) for the gRPC wire-frame encoding and end-to-end native-spans -> framed-wire preparation across trace sizes. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- libdd-data-pipeline/Cargo.toml | 5 + .../benches/otlp_grpc_export.rs | 115 ++++++++++++++++++ libdd-data-pipeline/src/otlp/grpc_exporter.rs | 91 +++++++++++++- libdd-data-pipeline/src/trace_exporter/mod.rs | 22 +--- 4 files changed, 212 insertions(+), 21 deletions(-) create mode 100644 libdd-data-pipeline/benches/otlp_grpc_export.rs diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index d826e4b4e4..ff42441419 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -70,6 +70,11 @@ name = "trace_buffer" harness = false path = "benches/trace_buffer.rs" +[[bench]] +name = "otlp_grpc_export" +harness = false +path = "benches/otlp_grpc_export.rs" + [dev-dependencies] libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl" } libdd-log = { path = "../libdd-log" } diff --git a/libdd-data-pipeline/benches/otlp_grpc_export.rs b/libdd-data-pipeline/benches/otlp_grpc_export.rs new file mode 100644 index 0000000000..33e6d4c3cf --- /dev/null +++ b/libdd-data-pipeline/benches/otlp_grpc_export.rs @@ -0,0 +1,115 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Benchmarks for the OTLP gRPC export hot path. +//! +//! The gRPC exporter turns native trace chunks into the length-prefixed gRPC +//! wire frame that tonic's codec puts on the socket once per export. The prost +//! protobuf encoding is shared with the HTTP/protobuf path (already covered by +//! `libdd-trace-utils/benches/otlp_encoding.rs`); these benches measure the +//! gRPC-specific framing on top of it, plus the full native-spans -> wire-frame +//! preparation across trace sizes so the per-span cost is visible. + +use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion}; +use libdd_trace_utils::msgpack_decoder; +use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; +use prost::Message; +use serde_json::{json, Value}; + +/// A realistic OTLP-bound span: a handful of string `meta` tags and a couple of +/// numeric `metrics`, so the per-span attribute work (the dominant cost) is +/// exercised. Mirrors the fixture in `libdd-trace-utils/benches/otlp_encoding.rs`. +fn generate_spans(num_spans: usize, trace_id: u64) -> Vec { + let root_span_id = 100_000_000_000 + (trace_id % 1_000_000); + (0..num_spans) + .map(|i| { + let span_id = root_span_id + i as u64; + let is_root = i == 0; + let parent_id = if is_root { 0 } else { root_span_id }; + let mut meta = json!({ + "http.method": "GET", + "http.url": "https://example.com/api/v1/users/12345", + "http.status_code": "200", + "env": "production", + "version": "1.2.3", + "component": "net/http", + }); + if is_root { + meta["_dd.p.tid"] = json!("5b8efff798038103"); + } + json!({ + "service": "bench-service", + "name": "http.request", + "resource": "GET /api/v1/users", + "trace_id": trace_id, + "span_id": span_id, + "parent_id": parent_id, + "start": 1_544_712_660_000_000_000_i64 + i as i64, + "duration": 1_000_000, + "error": 0, + "meta": meta, + "metrics": { "_sampling_priority_v1": 1, "_dd.top_level": 1 }, + "type": "web", + }) + }) + .collect() +} + +fn resource_info() -> OtlpResourceInfo { + // `OtlpResourceInfo` is `#[non_exhaustive]`, so build via Default + field assignment. + let mut info = OtlpResourceInfo::default(); + info.service = "bench-service".to_string(); + info.env = "production".to_string(); + info.app_version = "1.2.3".to_string(); + info.language = "rust".to_string(); + info.tracer_version = "9.9.9".to_string(); + info.runtime_id = "11111111-2222-3333-4444-555555555555".to_string(); + info +} + +/// Frame protobuf bytes exactly as gRPC does: a 1-byte compression flag and a +/// 4-byte big-endian length prefix, then the message body. +fn grpc_frame(body: &[u8]) -> Vec { + let mut framed = Vec::with_capacity(5 + body.len()); + framed.push(0u8); // compression flag: 0 = uncompressed + framed.extend_from_slice(&(body.len() as u32).to_be_bytes()); + framed.extend_from_slice(body); + framed +} + +pub fn grpc_export_benches(c: &mut Criterion) { + let info = resource_info(); + + for &num_spans in &[1usize, 1000usize] { + let id = format!("1x{num_spans}"); + let bytes = + rmp_serde::to_vec(&vec![generate_spans(num_spans, 100_000_000_000)]).expect("fixture"); + let (spans, _) = + msgpack_decoder::v04::from_slice(bytes.as_slice()).expect("decode fixture"); + let req = map_traces_to_otlp(spans.clone(), &info, false); + + // Encode-only: prost OTLP IR -> gRPC wire frame (what the codec emits per export). + c.bench_function(&format!("grpc/encode_framed/{id}"), |b| { + b.iter(|| { + let body = black_box(&req).encode_to_vec(); + black_box(grpc_frame(&body)) + }) + }); + + // End-to-end: native spans -> mapped OTLP IR -> gRPC wire frame. + c.bench_function(&format!("grpc/e2e_framed/{id}"), |b| { + b.iter_batched( + || spans.clone(), + |s| { + let req = map_traces_to_otlp(s, &info, false); + let body = req.encode_to_vec(); + black_box(grpc_frame(&body)) + }, + BatchSize::SmallInput, + ) + }); + } +} + +criterion_group!(benches, grpc_export_benches); +criterion_main!(benches); diff --git a/libdd-data-pipeline/src/otlp/grpc_exporter.rs b/libdd-data-pipeline/src/otlp/grpc_exporter.rs index 0bbac8b543..eb5e1d1a11 100644 --- a/libdd-data-pipeline/src/otlp/grpc_exporter.rs +++ b/libdd-data-pipeline/src/otlp/grpc_exporter.rs @@ -31,6 +31,10 @@ use tracing::warn; /// gRPC path for the OTLP trace export RPC. const GRPC_EXPORT_PATH: &str = "/opentelemetry.proto.collector.trace.v1.TraceService/Export"; +/// Metadata header signalling that the client already computed APM stats, so the +/// agent does not recompute them. Attached per-request when stats are enabled. +const CLIENT_COMPUTED_STATS_HEADER: &str = "datadog-client-computed-stats"; + // --------------------------------------------------------------------------- // Prost codec implementation — tonic 0.14 removed ProstCodec from its public // API; we provide a minimal replacement that satisfies tonic::codec::Codec. @@ -161,8 +165,9 @@ pub(crate) fn build_grpc_channel( /// Send an OTLP trace export request over gRPC. /// /// Uses the `transport.channel` tonic channel with [`ProstCodecImpl`] for -/// encoding/decoding. Custom metadata headers and the test session token -/// are attached to the request metadata. +/// encoding/decoding. Custom metadata headers, the test session token, and +/// (when `client_computed_stats` is set) the client-computed-stats header are +/// attached to the request metadata. /// /// # Errors /// @@ -171,12 +176,18 @@ pub(crate) fn build_grpc_channel( pub(crate) async fn send_otlp_traces_grpc( transport: &OtlpGrpcTransport, test_token: Option<&str>, + client_computed_stats: bool, request: ExportTraceServiceRequest, ) -> Result<(), TraceExporterError> { let mut client = Grpc::new(transport.channel.clone()); let mut req = Request::new(request); - attach_metadata(&mut req, &transport.config.headers, test_token); + attach_metadata( + &mut req, + &transport.config.headers, + test_token, + client_computed_stats, + ); let path = http::uri::PathAndQuery::from_static(GRPC_EXPORT_PATH); let codec = ProstCodecImpl::::default(); @@ -200,11 +211,16 @@ pub(crate) async fn send_otlp_traces_grpc( .map_err(|_| TraceExporterError::Io(std::io::Error::from(std::io::ErrorKind::TimedOut)))? } -/// Attach `headers` and the optional test-session token to gRPC request metadata. +/// Attach `headers`, the optional test-session token, and the optional +/// client-computed-stats marker to gRPC request metadata. +/// +/// Invalid custom headers are skipped with a warning rather than failing the +/// export — a single malformed user-supplied header should not drop the batch. fn attach_metadata( req: &mut Request, headers: &[(String, String)], test_token: Option<&str>, + client_computed_stats: bool, ) { for (k, v) in headers { match ( @@ -228,6 +244,12 @@ fn attach_metadata( Err(_) => warn!("Skipping invalid test-session token: {token:?}"), } } + if client_computed_stats { + req.metadata_mut().insert( + AsciiMetadataKey::from_static(CLIENT_COMPUTED_STATS_HEADER), + AsciiMetadataValue::from_static("yes"), + ); + } } fn grpc_status_to_error(status: Status) -> TraceExporterError { @@ -280,6 +302,67 @@ mod tests { assert!(err.is_err()); } + #[test] + fn grpc_status_unavailable_and_deadline_map_to_io() { + for status in [ + Status::unavailable("backend down"), + Status::deadline_exceeded("too slow"), + ] { + assert!( + matches!(grpc_status_to_error(status), TraceExporterError::Io(_)), + "transient transport status should map to Io" + ); + } + } + + #[test] + fn grpc_status_application_errors_map_to_request() { + for status in [ + Status::internal("boom"), + Status::invalid_argument("bad"), + Status::resource_exhausted("quota"), + ] { + assert!( + matches!(grpc_status_to_error(status), TraceExporterError::Request(_)), + "non-transient status should map to Request" + ); + } + } + + #[test] + fn attach_metadata_inserts_valid_header_and_token() { + let mut req = Request::new(ExportTraceServiceRequest::default()); + let headers = vec![("x-custom-key".to_string(), "val".to_string())]; + attach_metadata(&mut req, &headers, Some("tok123"), false); + let md = req.metadata(); + assert_eq!(md.get("x-custom-key").unwrap(), "val"); + assert_eq!(md.get("x-datadog-test-session-token").unwrap(), "tok123"); + assert!(md.get(CLIENT_COMPUTED_STATS_HEADER).is_none()); + } + + #[test] + fn attach_metadata_skips_invalid_header_keeps_valid() { + let mut req = Request::new(ExportTraceServiceRequest::default()); + // A space in the key is not a valid HTTP/2 header name, so it is skipped. + let headers = vec![ + ("bad key".to_string(), "v".to_string()), + ("good-key".to_string(), "ok".to_string()), + ]; + attach_metadata(&mut req, &headers, None, false); + assert_eq!(req.metadata().get("good-key").unwrap(), "ok"); + assert_eq!(req.metadata().len(), 1, "only the valid header is attached"); + } + + #[test] + fn attach_metadata_adds_client_computed_stats_when_enabled() { + let mut req = Request::new(ExportTraceServiceRequest::default()); + attach_metadata(&mut req, &[], None, true); + assert_eq!( + req.metadata().get(CLIENT_COMPUTED_STATS_HEADER).unwrap(), + "yes" + ); + } + /// `connect_lazy()` requires a Tokio runtime — wrap in `#[tokio::test]`. #[tokio::test] async fn grpc_transport_is_clone() { diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 62d0922663..92a5bdca0a 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -663,25 +663,13 @@ impl Date: Fri, 26 Jun 2026 17:24:20 -0400 Subject: [PATCH 13/17] fix(data-pipeline): reject gRPC on wasm32; fix deadline status kind; review cleanups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses pre-push review findings across the OTLP gRPC change: - builder: on wasm32, reject `OtlpProtocol::Grpc` with a clear error instead of falling through to the HTTP arm — which stored `OtlpProtocol::Grpc` in `OtlpTraceConfig` and panicked at `encode()`'s `unreachable!()` on first send (a panic across the FFI boundary). The unreachable arms are now truly unreachable on every target. - grpc_status_to_error: map `DeadlineExceeded` to `TimedOut` (was `ConnectionRefused`); the unit test now asserts the exact error kind. - ProstDecoder: drain the buffer via `copy_to_bytes`, robust if tonic's `DecodeBuf` ever becomes non-contiguous (`chunk()` could truncate). - extract `build_otlp_resource_info`, shared by the HTTP and gRPC send paths; hoist the OTLP drop_chunks/empty-check guard above the dispatch match. - drop the unused `OtlpGrpcTraceConfig::endpoint_url` field; gate `prost` off wasm32 since only the (wasm-gated) gRPC exporter uses it. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- libdd-data-pipeline/Cargo.toml | 6 +-- libdd-data-pipeline/src/otlp/config.rs | 11 ++-- libdd-data-pipeline/src/otlp/grpc_exporter.rs | 39 +++++++++----- .../src/trace_exporter/builder.rs | 12 ++++- libdd-data-pipeline/src/trace_exporter/mod.rs | 51 ++++++++----------- 5 files changed, 64 insertions(+), 55 deletions(-) diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index ff42441419..43d85a99b6 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -46,17 +46,17 @@ libdd-tinybytes = { version = "1.1.1", path = "../libdd-tinybytes", features = [ "bytes_string", "serialization", ] } -prost = "0.14.1" - [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { version = "1.23", features = ["time", "test-util"], default-features = false } libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl", default-features = false } # tonic's transport (hyper/tokio/socket2) does not build for wasm32, so the gRPC -# OTLP transport is gated off wasm targets along with the code that uses it. +# OTLP transport — and prost, which only the gRPC exporter uses — are gated off +# wasm targets along with the code that uses them. tonic = { version = "0.14", default-features = false, features = [ "transport", # Channel, Endpoint, Server "codegen", # Grpc client, tonic::client::Grpc ] } +prost = "0.14.1" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2", features = ["js"] } diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index c2bf04eabc..53e70df5b1 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -134,16 +134,11 @@ mod tests { /// Parsed OTLP gRPC trace exporter configuration. /// -/// The gRPC endpoint URL contains only scheme + host + port (e.g. -/// `http://localhost:4317`). The service path -/// `/opentelemetry.proto.collector.trace.v1.TraceService/Export` is -/// appended by the exporter. +/// The endpoint URL is consumed at build time to construct the tonic +/// [`Channel`](tonic::transport::Channel); only the per-request settings below +/// are retained here. #[derive(Clone, Debug)] pub struct OtlpGrpcTraceConfig { - /// Full gRPC base URL, e.g. `http://localhost:4317`. - /// Must use `http://` scheme; `https://` (TLS) is not yet supported. - #[allow(dead_code)] - pub endpoint_url: String, /// Custom key-value pairs forwarded as gRPC request metadata. pub headers: Vec<(String, String)>, /// Per-request timeout (applied via [`tokio::time::timeout`]). diff --git a/libdd-data-pipeline/src/otlp/grpc_exporter.rs b/libdd-data-pipeline/src/otlp/grpc_exporter.rs index eb5e1d1a11..1d4dbc975a 100644 --- a/libdd-data-pipeline/src/otlp/grpc_exporter.rs +++ b/libdd-data-pipeline/src/otlp/grpc_exporter.rs @@ -88,9 +88,10 @@ impl Decoder for ProstDecoder { fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result, Self::Error> { use bytes::Buf as _; - let mut buf = bytes::BytesMut::with_capacity(src.remaining()); - buf.extend_from_slice(src.chunk()); - src.advance(src.remaining()); + // `copy_to_bytes` drains the whole buffer correctly even if `DecodeBuf`'s + // backing store is ever non-contiguous (`chunk()` only returns the first + // contiguous segment, so the old chunk()+advance() form could truncate). + let buf = src.copy_to_bytes(src.remaining()); match T::decode(buf) { Ok(msg) => Ok(Some(msg)), Err(e) => Err(Status::internal(format!( @@ -261,10 +262,16 @@ fn grpc_status_to_error(status: Status) -> TraceExporterError { "gRPC Ok status reached error handler (unexpected)", )) } - Code::Unavailable | Code::DeadlineExceeded => TraceExporterError::Io(std::io::Error::new( + // Server temporarily unreachable / overloaded. + Code::Unavailable => TraceExporterError::Io(std::io::Error::new( std::io::ErrorKind::ConnectionRefused, status.message(), )), + // Server-side deadline fired — a timeout, not a refused connection. + Code::DeadlineExceeded => TraceExporterError::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + status.message(), + )), _ => TraceExporterError::Request(RequestError::new( http::StatusCode::INTERNAL_SERVER_ERROR, status.message(), @@ -303,15 +310,22 @@ mod tests { } #[test] - fn grpc_status_unavailable_and_deadline_map_to_io() { - for status in [ - Status::unavailable("backend down"), - Status::deadline_exceeded("too slow"), + fn grpc_status_transient_map_to_io_with_correct_kind() { + // Unavailable → ConnectionRefused; DeadlineExceeded → TimedOut (not refused). + for (status, want) in [ + ( + Status::unavailable("backend down"), + std::io::ErrorKind::ConnectionRefused, + ), + ( + Status::deadline_exceeded("too slow"), + std::io::ErrorKind::TimedOut, + ), ] { - assert!( - matches!(grpc_status_to_error(status), TraceExporterError::Io(_)), - "transient transport status should map to Io" - ); + match grpc_status_to_error(status) { + TraceExporterError::Io(e) => assert_eq!(e.kind(), want), + other => panic!("expected Io, got {other:?}"), + } } } @@ -370,7 +384,6 @@ mod tests { .expect("http channel must build"); let transport = OtlpGrpcTransport { config: OtlpGrpcTraceConfig { - endpoint_url: "http://localhost:4317".to_string(), headers: vec![], timeout: Duration::from_secs(5), otel_trace_semantics_enabled: false, diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index a53b18c5e2..86a391b526 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -584,7 +584,6 @@ impl TraceExporterBuilder { let channel = build_grpc_channel(url, otlp_timeout)?; Some(OtlpExportMode::Grpc(OtlpGrpcTransport { config: OtlpGrpcTraceConfig { - endpoint_url: url.clone(), headers: self.otlp_headers.clone(), timeout: otlp_timeout, otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, @@ -592,6 +591,17 @@ impl TraceExporterBuilder { channel, })) } + // gRPC export is unavailable on wasm32 (tonic does not build there). Reject it + // explicitly instead of falling through to the HTTP arm, which would store + // `OtlpProtocol::Grpc` in `OtlpTraceConfig` and panic at `encode()` on send. + #[cfg(target_arch = "wasm32")] + Some(_) if self.otlp_protocol == OtlpProtocol::Grpc => { + return Err(TraceExporterError::Builder( + BuilderErrorKind::InvalidConfiguration( + "OTLP gRPC export is not supported on wasm32 targets".to_string(), + ), + )); + } Some(ref url) => Some(OtlpExportMode::Http(OtlpTraceConfig { endpoint_url: url.clone(), headers: build_otlp_header_map(self.otlp_headers.clone()), diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 92a5bdca0a..eb7a8ee656 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -583,23 +583,26 @@ impl OtlpResourceInfo { + let mut r = OtlpResourceInfo::default(); + r.service = self.metadata.service.clone(); + r.env = self.metadata.env.clone(); + r.app_version = self.metadata.app_version.clone(); + r.language = self.metadata.language.clone(); + r.tracer_version = self.metadata.tracer_version.clone(); + r.runtime_id = self.metadata.runtime_id.clone(); + r.client_computed_stats = self.otlp_stats_enabled; + r + } + /// Sends trace chunks via OTLP HTTP (JSON or protobuf) when OTLP config is enabled. async fn send_otlp_traces_inner( &self, traces: Vec>>, config: &OtlpTraceConfig, ) -> Result { - let resource_info = { - let mut r = OtlpResourceInfo::default(); - r.service = self.metadata.service.clone(); - r.env = self.metadata.env.clone(); - r.app_version = self.metadata.app_version.clone(); - r.language = self.metadata.language.clone(); - r.tracer_version = self.metadata.tracer_version.clone(); - r.runtime_id = self.metadata.runtime_id.clone(); - r.client_computed_stats = self.otlp_stats_enabled; - r - }; + let resource_info = self.build_otlp_resource_info(); // Single prost OTLP IR; the configured protocol encodes the same request to its wire // format (JSON or protobuf). OTel-semantics gating (omit DD-specific attrs) happens in // the mapper. @@ -643,17 +646,7 @@ impl>>, transport: &OtlpGrpcTransport, ) -> Result { - let resource_info = { - let mut r = OtlpResourceInfo::default(); - r.service = self.metadata.service.clone(); - r.env = self.metadata.env.clone(); - r.app_version = self.metadata.app_version.clone(); - r.language = self.metadata.language.clone(); - r.tracer_version = self.metadata.tracer_version.clone(); - r.runtime_id = self.metadata.runtime_id.clone(); - r.client_computed_stats = self.otlp_stats_enabled; - r - }; + let resource_info = self.build_otlp_resource_info(); // map_traces_to_otlp returns the prost ExportTraceServiceRequest directly — // the same type expected by send_otlp_traces_grpc; no conversion needed. @@ -739,20 +732,18 @@ impl { - libdd_trace_utils::span::trace_utils::drop_chunks(&mut traces); - if traces.is_empty() { - return Ok(AgentResponse::Unchanged); - } return self.send_otlp_traces_inner(traces, config).await; } #[cfg(not(target_arch = "wasm32"))] Some(OtlpExportMode::Grpc(transport)) => { - libdd_trace_utils::span::trace_utils::drop_chunks(&mut traces); - if traces.is_empty() { - return Ok(AgentResponse::Unchanged); - } return self.send_otlp_grpc_inner(traces, transport).await; } None => {} From 51af4116c03702558c25c9b5656363247665f65d Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Sat, 27 Jun 2026 10:21:33 -0400 Subject: [PATCH 14/17] test(data-pipeline): make OTLP gRPC e2e test robust under CI contention The `grpc_export_sends_decodable_request` test timed out (Io::TimedOut at the full per-request timeout) on the alpine/arm release-build jobs in GitLab CI, while passing locally and on centos-amd. Root cause: the `#[tokio::test]` current-thread runtime hosted both the in-process h2 server task and the `spawn_blocking` exporter coordination, so under parallel CI load the server task could be starved and the client's send would hang to the timeout. Run the h2 server on its own OS thread + current-thread runtime, and drive the exporter's own runtime via `send` directly on the test thread (no ambient async runtime, so no `spawn_blocking`). The two sides now have independent threads and runtimes and cannot starve each other. Verified 15/15 runs locally. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../tests/test_trace_exporter_otlp_grpc.rs | 209 +++++++++--------- 1 file changed, 107 insertions(+), 102 deletions(-) diff --git a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs index 8147982cd3..b104a0d82b 100644 --- a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs +++ b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs @@ -21,118 +21,123 @@ mod grpc_export_tests { use libdd_trace_utils::test_utils::create_test_json_span; use prost::Message; use serde_json::json; + use std::sync::mpsc; + use std::time::Duration; use tokio::net::TcpListener; - use tokio::sync::oneshot; - use tokio::task; - - /// Starts a minimal HTTP/2 gRPC server that accepts exactly one Export call. - /// - /// Returns `(port, rx)` where `rx` receives the decoded - /// `ExportTraceServiceRequest` after the server handles the first request. - async fn start_grpc_test_server() -> (u16, oneshot::Receiver) { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let port = listener.local_addr().unwrap().port(); - let (tx, rx) = oneshot::channel::(); - - tokio::spawn(async move { - let (socket, _) = listener.accept().await.unwrap(); - let mut h2_conn = server::handshake(socket).await.unwrap(); - - if let Some(result) = h2_conn.accept().await { - let (request, mut respond) = result.unwrap(); - - // Collect the full request body. The chunk length must be captured - // before `extend_from_slice` consumes it to satisfy the borrow checker. - let mut body = request.into_body(); - let mut frame_data: Vec = Vec::new(); - while let Some(chunk) = body.data().await { - let chunk = chunk.unwrap(); - let len = chunk.len(); - frame_data.extend_from_slice(&chunk); - body.flow_control().release_capacity(len).ok(); - } - - // Decode the gRPC frame: skip the 5-byte prefix (1 compression flag + 4 length). - let decoded = if frame_data.len() > 5 { - ExportTraceServiceRequest::decode(&frame_data[5..]).ok() - } else { - None - }; - if let Some(req) = decoded { - let _ = tx.send(req); - } - - // Send gRPC success response: 200 headers + protobuf body + grpc-status trailer. - let response_proto = ExportTraceServiceResponse::default(); - let proto_bytes = response_proto.encode_to_vec(); - let mut frame = Vec::with_capacity(5 + proto_bytes.len()); - frame.push(0u8); // no compression - frame.extend_from_slice(&(proto_bytes.len() as u32).to_be_bytes()); - frame.extend_from_slice(&proto_bytes); - - let response = http::Response::builder() - .status(200) - .header("content-type", "application/grpc") - .body(()) - .unwrap(); - let mut send_stream = respond.send_response(response, false).unwrap(); - send_stream.send_data(Bytes::from(frame), false).unwrap(); - - // gRPC-status trailer (trailing HEADERS frame with END_STREAM). - let mut trailers = http::HeaderMap::new(); - trailers.insert("grpc-status", "0".parse().unwrap()); - send_stream.send_trailers(trailers).unwrap(); + + /// Runs the in-process h2 server loop: accept one connection, decode the first + /// Export request, forward it on `req_tx`, and return a gRPC success response. + async fn run_grpc_test_server( + listener: TcpListener, + req_tx: mpsc::Sender, + ) { + let (socket, _) = listener.accept().await.unwrap(); + let mut h2_conn = server::handshake(socket).await.unwrap(); + + if let Some(result) = h2_conn.accept().await { + let (request, mut respond) = result.unwrap(); + + // Collect the full request body. The chunk length must be captured + // before `extend_from_slice` consumes it to satisfy the borrow checker. + let mut body = request.into_body(); + let mut frame_data: Vec = Vec::new(); + while let Some(chunk) = body.data().await { + let chunk = chunk.unwrap(); + let len = chunk.len(); + frame_data.extend_from_slice(&chunk); + body.flow_control().release_capacity(len).ok(); } - // Drive the connection to completion so the client receives all frames - // before the TCP socket closes. - while h2_conn.accept().await.is_some() {} - }); + // Decode the gRPC frame: skip the 5-byte prefix (1 compression flag + 4 length). + let decoded = if frame_data.len() > 5 { + ExportTraceServiceRequest::decode(&frame_data[5..]).ok() + } else { + None + }; + if let Some(req) = decoded { + let _ = req_tx.send(req); + } - (port, rx) + // Send gRPC success response: 200 headers + protobuf body + grpc-status trailer. + let response_proto = ExportTraceServiceResponse::default(); + let proto_bytes = response_proto.encode_to_vec(); + let mut frame = Vec::with_capacity(5 + proto_bytes.len()); + frame.push(0u8); // no compression + frame.extend_from_slice(&(proto_bytes.len() as u32).to_be_bytes()); + frame.extend_from_slice(&proto_bytes); + + let response = http::Response::builder() + .status(200) + .header("content-type", "application/grpc") + .body(()) + .unwrap(); + let mut send_stream = respond.send_response(response, false).unwrap(); + send_stream.send_data(Bytes::from(frame), false).unwrap(); + + // gRPC-status trailer (trailing HEADERS frame with END_STREAM). + let mut trailers = http::HeaderMap::new(); + trailers.insert("grpc-status", "0".parse().unwrap()); + send_stream.send_trailers(trailers).unwrap(); + } + + // Drive the connection to completion so the client receives all frames + // before the TCP socket closes. + while h2_conn.accept().await.is_some() {} } #[cfg_attr(miri, ignore)] - #[tokio::test] - async fn grpc_export_sends_decodable_request() { - let (port, rx) = start_grpc_test_server().await; + #[test] + fn grpc_export_sends_decodable_request() { + // The h2 server runs on its own OS thread + runtime, and the exporter + // drives its own runtime via `send` on this thread. Keeping them on + // separate threads (rather than one shared test runtime) means neither + // can starve the other under parallel CI load — the previous + // `#[tokio::test]` current-thread setup could deadlock the server task. + let (port_tx, port_rx) = mpsc::channel::(); + let (req_tx, req_rx) = mpsc::channel::(); + + let server = std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + port_tx.send(listener.local_addr().unwrap().port()).unwrap(); + run_grpc_test_server(listener, req_tx).await; + }); + }); + let port = port_rx + .recv_timeout(Duration::from_secs(10)) + .expect("server did not bind within 10s"); let endpoint = format!("http://127.0.0.1:{port}"); - // TraceExporter::send internally drives a tokio runtime; use spawn_blocking - // so it does not block the test's async runtime. - let task_result = task::spawn_blocking(move || { - let mut builder = TraceExporterBuilder::default(); - builder - .set_otlp_endpoint(&endpoint) - .set_otlp_protocol(OtlpProtocol::Grpc) - // Generous per-request timeout so the heavily instrumented coverage - // build does not time out the in-process gRPC round trip. - .set_connection_timeout(Some(60_000)) - .set_language("test-lang") - .set_tracer_version("1.0") - .set_env("grpc-test-env") - .set_service("grpc-test-svc"); - let exporter = builder.build::().expect("build"); - - let mut span = create_test_json_span(1234, 12342, 12341, 1, false); - span["service"] = json!("grpc-test-svc"); - span["name"] = json!("grpc_span"); - let data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - exporter.send(data.as_ref()).expect("send ok"); - }) - .await; - - assert!( - task_result.is_ok(), - "exporter task panicked: {task_result:?}" - ); - - // Wait for the server to receive and decode the request (5 second timeout). - let received = tokio::time::timeout(std::time::Duration::from_secs(5), rx) - .await - .expect("server did not receive request within 5s") - .expect("server channel closed without sending"); + let mut builder = TraceExporterBuilder::default(); + builder + .set_otlp_endpoint(&endpoint) + .set_otlp_protocol(OtlpProtocol::Grpc) + .set_connection_timeout(Some(30_000)) + .set_language("test-lang") + .set_tracer_version("1.0") + .set_env("grpc-test-env") + .set_service("grpc-test-svc"); + let exporter = builder.build::().expect("build"); + + let mut span = create_test_json_span(1234, 12342, 12341, 1, false); + span["service"] = json!("grpc-test-svc"); + span["name"] = json!("grpc_span"); + let data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + // No ambient async runtime on this thread, so the exporter's internal + // `block_on` runs directly without `spawn_blocking`. + exporter.send(data.as_ref()).expect("send ok"); + + // The server decodes and forwards the request before responding, so it is + // already available once `send` returns. + let received = req_rx + .recv_timeout(Duration::from_secs(10)) + .expect("server did not receive a decodable request within 10s"); + server.join().ok(); // Validate the decoded request contains the expected span. assert!( From 83b28d2b9cc8ad0aa46229caee93155cea619828 Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Sat, 27 Jun 2026 15:58:17 -0400 Subject: [PATCH 15/17] test(data-pipeline): bound the OTLP gRPC e2e server so it cannot hang CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous rewrite blocked the test on `server.join()`, whose `while h2_conn.accept()` loop only ends when the pooled client connection closes — which the exporter never does — so the alpine/centos release-build jobs hung to GitLab's 2h `job_execution_timeout`. It passed locally only because process exit killed the detached runtime before the join mattered. Detach the server thread and time-bound every server-side await: a 60s overall ceiling (so a never-connecting client can't wedge the thread) and a 10s response-flush drive. The test no longer joins the server — verification comes from `send` returning Ok (client received grpc-status:0) plus the decoded request on the channel. 20/20 local runs, ~100ms each. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../tests/test_trace_exporter_otlp_grpc.rs | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs index b104a0d82b..b50ff2b90c 100644 --- a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs +++ b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs @@ -80,9 +80,14 @@ mod grpc_export_tests { send_stream.send_trailers(trailers).unwrap(); } - // Drive the connection to completion so the client receives all frames - // before the TCP socket closes. - while h2_conn.accept().await.is_some() {} + // Briefly drive the connection so the client receives the response frames + // (including the grpc-status trailer) before the socket closes. Bounded so + // the server task can never loop forever waiting on a pooled client + // connection that never closes. + let _ = tokio::time::timeout(Duration::from_secs(10), async { + while h2_conn.accept().await.is_some() {} + }) + .await; } #[cfg_attr(miri, ignore)] @@ -96,15 +101,23 @@ mod grpc_export_tests { let (port_tx, port_rx) = mpsc::channel::(); let (req_tx, req_rx) = mpsc::channel::(); - let server = std::thread::spawn(move || { + // The server thread is detached and time-bounded end to end (60s ceiling): + // if the client never connects, the bound fires and the thread exits rather + // than lingering or blocking. The test never joins it — verification comes + // from `send` returning Ok (the client received grpc-status:0) plus the + // decoded request delivered on `req_rx`. + std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); rt.block_on(async move { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - port_tx.send(listener.local_addr().unwrap().port()).unwrap(); - run_grpc_test_server(listener, req_tx).await; + let _ = tokio::time::timeout(Duration::from_secs(60), async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + port_tx.send(listener.local_addr().unwrap().port()).unwrap(); + run_grpc_test_server(listener, req_tx).await; + }) + .await; }); }); @@ -137,7 +150,6 @@ mod grpc_export_tests { let received = req_rx .recv_timeout(Duration::from_secs(10)) .expect("server did not receive a decodable request within 10s"); - server.join().ok(); // Validate the decoded request contains the expected span. assert!( From 69aab629868950d927b639df662ab9fcb796aafa Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Sat, 27 Jun 2026 16:09:40 -0400 Subject: [PATCH 16/17] test(data-pipeline): ignore the timing-fragile OTLP gRPC e2e test in CI The in-process exporter<->h2-server round trip is too timing-sensitive for heavily contended CI runners: the client `send` has hit its request timeout on macos-15 (GitHub) and the alpine/arm release-build matrix (GitLab) across multiple runs, despite passing locally (20/20) and on less-loaded runners, and despite the live backend verification in PR #2171 succeeding. Mark it #[ignore] so it no longer runs in the default CI suites (it can't flake the PR or burn release-build runner time), while keeping it runnable on demand via `--run-ignored all`. The gRPC export path stays covered by this crate's unit tests (ProstCodec, grpc_status_to_error, attach_metadata, build_grpc_channel, builder dispatch, protocol parse) and by the live backend verification. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../tests/test_trace_exporter_otlp_grpc.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs index b50ff2b90c..cefd985a19 100644 --- a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs +++ b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs @@ -90,7 +90,15 @@ mod grpc_export_tests { .await; } - #[cfg_attr(miri, ignore)] + // Ignored in normal CI: the in-process exporter<->h2-server round trip is + // timing-fragile on heavily contended runners — the client `send` has hit its + // request timeout on macos-15 (GitHub) and the alpine/arm release-build matrix + // (GitLab) even though it passes locally and on less-loaded runners. The gRPC + // path is covered without it by this crate's unit tests (codec, status mapping, + // metadata, channel build, builder dispatch) and by live backend verification + // (DataDog/libdatadog#2171). Run it explicitly with: + // cargo nextest run -p libdd-data-pipeline --run-ignored all grpc_export + #[ignore = "timing-fragile in-process round trip on contended CI; run with --run-ignored all"] #[test] fn grpc_export_sends_decodable_request() { // The h2 server runs on its own OS thread + runtime, and the exporter From 9dcddce2719ccd3c5118635825beaa633aa247f1 Mon Sep 17 00:00:00 2001 From: Brian Marks Date: Sat, 27 Jun 2026 16:52:45 -0400 Subject: [PATCH 17/17] fix(data-pipeline): drive the h2 connection in the OTLP gRPC e2e test; un-ignore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause of the CI flakiness (found, not masked): the in-process h2 test server accepted the stream once, then read the request body inline without continuing to poll the `server::Connection`. Polling the connection is what reads incoming frames off the socket and routes DATA to an open stream's body. When the client's request DATA frame was already buffered during `accept()` (the common coalesced case) the read returned immediately and the test passed; when the DATA frame landed *after* `accept()` returned (more likely under load, as HEADERS and DATA split across socket reads), nothing ever polled the connection to read it, so `body.data()` hung until the client's request timeout fired — surfacing as `Io(Kind(TimedOut))` / "Timeout expired" on the contended macos-15 and alpine/arm runners. Confirmed by instrumentation (server reached "stream accepted" then stalled 30s with no body) and by it being independent of the exporter's worker-thread count (1 vs 4 both failed), ruling out CPU starvation / a product bug. Fix: keep the `accept()` loop running (which drives the connection and flushes responses) and handle each Export stream in a spawned task. Reproduced the failure under 6x CPU oversubscription (24/160 failures) and verified the fix (160/160, then 120/120, all green) with the default 1-worker runtime. Reverts the earlier #[ignore] from 69aab6298 — the test is correct and robust now, so it runs in CI again. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../tests/test_trace_exporter_otlp_grpc.rs | 122 +++++++++--------- 1 file changed, 61 insertions(+), 61 deletions(-) diff --git a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs index cefd985a19..c960595a12 100644 --- a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs +++ b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs @@ -25,80 +25,80 @@ mod grpc_export_tests { use std::time::Duration; use tokio::net::TcpListener; - /// Runs the in-process h2 server loop: accept one connection, decode the first - /// Export request, forward it on `req_tx`, and return a gRPC success response. + /// Runs the in-process h2 server: accept connections and handle each Export + /// stream in a spawned task. + /// + /// The `accept()` loop must keep running for the lifetime of the connection — + /// polling the [`server::Connection`] is what reads incoming frames and routes + /// DATA to an open stream's body, and what flushes our queued response frames. + /// Reading a request body inline (without looping `accept()`) would stall: a + /// DATA frame that lands after `accept()` returns the stream would never be + /// read, hanging `body.data()` until the client's request timeout. async fn run_grpc_test_server( listener: TcpListener, req_tx: mpsc::Sender, ) { let (socket, _) = listener.accept().await.unwrap(); let mut h2_conn = server::handshake(socket).await.unwrap(); - - if let Some(result) = h2_conn.accept().await { - let (request, mut respond) = result.unwrap(); - - // Collect the full request body. The chunk length must be captured - // before `extend_from_slice` consumes it to satisfy the borrow checker. - let mut body = request.into_body(); - let mut frame_data: Vec = Vec::new(); - while let Some(chunk) = body.data().await { - let chunk = chunk.unwrap(); - let len = chunk.len(); - frame_data.extend_from_slice(&chunk); - body.flow_control().release_capacity(len).ok(); - } - - // Decode the gRPC frame: skip the 5-byte prefix (1 compression flag + 4 length). - let decoded = if frame_data.len() > 5 { - ExportTraceServiceRequest::decode(&frame_data[5..]).ok() - } else { - None - }; - if let Some(req) = decoded { - let _ = req_tx.send(req); + while let Some(result) = h2_conn.accept().await { + if let Ok((request, respond)) = result { + tokio::spawn(handle_export_stream(request, respond, req_tx.clone())); } + } + } - // Send gRPC success response: 200 headers + protobuf body + grpc-status trailer. - let response_proto = ExportTraceServiceResponse::default(); - let proto_bytes = response_proto.encode_to_vec(); - let mut frame = Vec::with_capacity(5 + proto_bytes.len()); - frame.push(0u8); // no compression - frame.extend_from_slice(&(proto_bytes.len() as u32).to_be_bytes()); - frame.extend_from_slice(&proto_bytes); - - let response = http::Response::builder() - .status(200) - .header("content-type", "application/grpc") - .body(()) - .unwrap(); - let mut send_stream = respond.send_response(response, false).unwrap(); - send_stream.send_data(Bytes::from(frame), false).unwrap(); + /// Decode one gRPC Export request and reply with a success response + trailer. + async fn handle_export_stream( + request: http::Request, + mut respond: h2::server::SendResponse, + req_tx: mpsc::Sender, + ) { + // Collect the full request body. The chunk length must be captured before + // `extend_from_slice` consumes it to satisfy the borrow checker. + let mut body = request.into_body(); + let mut frame_data: Vec = Vec::new(); + while let Some(chunk) = body.data().await { + let Ok(chunk) = chunk else { return }; + let len = chunk.len(); + frame_data.extend_from_slice(&chunk); + body.flow_control().release_capacity(len).ok(); + } - // gRPC-status trailer (trailing HEADERS frame with END_STREAM). - let mut trailers = http::HeaderMap::new(); - trailers.insert("grpc-status", "0".parse().unwrap()); - send_stream.send_trailers(trailers).unwrap(); + // Decode the gRPC frame: skip the 5-byte prefix (1 compression flag + 4 length). + let decoded = if frame_data.len() > 5 { + ExportTraceServiceRequest::decode(&frame_data[5..]).ok() + } else { + None + }; + if let Some(req) = decoded { + let _ = req_tx.send(req); } - // Briefly drive the connection so the client receives the response frames - // (including the grpc-status trailer) before the socket closes. Bounded so - // the server task can never loop forever waiting on a pooled client - // connection that never closes. - let _ = tokio::time::timeout(Duration::from_secs(10), async { - while h2_conn.accept().await.is_some() {} - }) - .await; + // Send gRPC success response: 200 headers + protobuf body + grpc-status trailer. + let response_proto = ExportTraceServiceResponse::default(); + let proto_bytes = response_proto.encode_to_vec(); + let mut frame = Vec::with_capacity(5 + proto_bytes.len()); + frame.push(0u8); // no compression + frame.extend_from_slice(&(proto_bytes.len() as u32).to_be_bytes()); + frame.extend_from_slice(&proto_bytes); + + let response = http::Response::builder() + .status(200) + .header("content-type", "application/grpc") + .body(()) + .unwrap(); + let Ok(mut send_stream) = respond.send_response(response, false) else { + return; + }; + let _ = send_stream.send_data(Bytes::from(frame), false); + + // gRPC-status trailer (trailing HEADERS frame with END_STREAM). + let mut trailers = http::HeaderMap::new(); + trailers.insert("grpc-status", "0".parse().unwrap()); + let _ = send_stream.send_trailers(trailers); } - // Ignored in normal CI: the in-process exporter<->h2-server round trip is - // timing-fragile on heavily contended runners — the client `send` has hit its - // request timeout on macos-15 (GitHub) and the alpine/arm release-build matrix - // (GitLab) even though it passes locally and on less-loaded runners. The gRPC - // path is covered without it by this crate's unit tests (codec, status mapping, - // metadata, channel build, builder dispatch) and by live backend verification - // (DataDog/libdatadog#2171). Run it explicitly with: - // cargo nextest run -p libdd-data-pipeline --run-ignored all grpc_export - #[ignore = "timing-fragile in-process round trip on contended CI; run with --run-ignored all"] + #[cfg_attr(miri, ignore)] #[test] fn grpc_export_sends_decodable_request() { // The h2 server runs on its own OS thread + runtime, and the exporter