diff --git a/Cargo.lock b/Cargo.lock index 4763752c88..eb00734a20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2990,6 +2990,7 @@ dependencies = [ "duplicate 2.0.1", "either", "getrandom 0.2.15", + "h2", "http", "http-body-util", "httpmock", @@ -3016,7 +3017,9 @@ dependencies = [ "sha2", "tempfile", "tokio", + "tokio-stream", "tokio-util", + "tonic", "tracing", "uuid", ] diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index 8cb1c5eb37..36776fda70 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -503,15 +503,17 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint( ) } -/// Sets the OTLP export protocol. Accepts the OTel-standard values `http/json` (default) or -/// `http/protobuf`; `grpc` is rejected as not yet supported. The host language resolves the value -/// (e.g. from `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`). +/// Sets the OTLP export protocol. Accepts all three OTel-standard values: +/// - `http/json` (default) — OTLP over HTTP with JSON body +/// - `http/protobuf` — OTLP over HTTP with protobuf body +/// - `grpc` — OTLP over HTTP/2 (plaintext only; `https://` is not yet supported) /// -/// Has no effect unless an OTLP endpoint is also configured via -/// `ddog_trace_exporter_config_set_otlp_endpoint`; without one, traces are sent to the -/// Datadog agent and this protocol selection is ignored. +/// The host language resolves the value (e.g. from `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`). /// -/// Returns `None` on success, `ErrorCode::InvalidArgument` for a null config or an unaccepted +/// Has no effect unless an OTLP endpoint is configured via +/// `ddog_trace_exporter_config_set_otlp_endpoint`. +/// +/// Returns `None` on success, `ErrorCode::InvalidArgument` for a null config or an unrecognized /// value, and `ErrorCode::InvalidInput` for a non-UTF-8 string. #[no_mangle] pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol( @@ -524,9 +526,9 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol( Ok(s) => s, Err(e) => return Some(e), }; - // `FromStr` is the single source of truth for string -> OtlpProtocol. It accepts only - // the supported HTTP encodings (`http/json`, `http/protobuf`); `grpc` and any unknown - // value are rejected with an error, so an unsupported protocol can never be stored. + // `FromStr` is the single source of truth for string -> OtlpProtocol. It accepts all + // three OTel-standard encodings (`http/json`, `http/protobuf`, `grpc`); any unknown + // value is rejected with an error, so an unsupported protocol can never be stored. match value.parse::() { Ok(p) => { handle.otlp_protocol = Some(p); @@ -1362,14 +1364,17 @@ mod tests { Some(OtlpProtocol::HttpProtobuf) ); - // "grpc" → InvalidArgument + // "grpc" → success (gRPC is now supported) let mut config = Some(TraceExporterConfig::default()); let error = ddog_trace_exporter_config_set_otlp_protocol( config.as_mut(), CharSlice::from("grpc"), ); - assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); - ddog_trace_exporter_error_free(error); + assert_eq!(error, None); + assert_eq!( + config.as_ref().unwrap().otlp_protocol, + Some(OtlpProtocol::Grpc) + ); // Garbage value → InvalidArgument let mut config = Some(TraceExporterConfig::default()); @@ -1407,17 +1412,39 @@ mod tests { } #[test] - fn set_otlp_protocol_rejects_grpc_and_unknown() { - let mut cfg = TraceExporterConfig::default(); - for bad in ["grpc", "nonsense"] { + fn set_otlp_protocol_accepts_all_three_protocols() { + use libdd_data_pipeline::OtlpProtocol; + for (input, expected) in [ + ("http/json", OtlpProtocol::HttpJson), + ("http/protobuf", OtlpProtocol::HttpProtobuf), + ("grpc", OtlpProtocol::Grpc), + ] { + let mut cfg = TraceExporterConfig::default(); let err = unsafe { - ddog_trace_exporter_config_set_otlp_protocol(Some(&mut cfg), CharSlice::from(bad)) + ddog_trace_exporter_config_set_otlp_protocol(Some(&mut cfg), CharSlice::from(input)) }; - assert!(err.is_some(), "expected error for {bad}"); - assert_eq!(cfg.otlp_protocol, None, "{bad} must not be stored"); + assert!(err.is_none(), "expected success for {input}: {err:?}"); + assert_eq!( + cfg.otlp_protocol, + Some(expected), + "wrong protocol for {input}" + ); } } + #[test] + fn set_otlp_protocol_rejects_unknown() { + let mut cfg = TraceExporterConfig::default(); + let err = unsafe { + ddog_trace_exporter_config_set_otlp_protocol( + Some(&mut cfg), + CharSlice::from("nonsense"), + ) + }; + assert!(err.is_some(), "expected error for unknown protocol"); + assert_eq!(cfg.otlp_protocol, None, "must not be stored on error"); + } + #[cfg(all(feature = "catch_panic", panic = "unwind"))] #[test] fn catch_panic_test() { diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index ca4c4075e0..43d85a99b6 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -46,10 +46,17 @@ libdd-tinybytes = { version = "1.1.1", path = "../libdd-tinybytes", features = [ "bytes_string", "serialization", ] } - [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { version = "1.23", features = ["time", "test-util"], default-features = false } libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl", default-features = false } +# tonic's transport (hyper/tokio/socket2) does not build for wasm32, so the gRPC +# OTLP transport — and prost, which only the gRPC exporter uses — are gated off +# wasm targets along with the code that uses them. +tonic = { version = "0.14", default-features = false, features = [ + "transport", # Channel, Endpoint, Server + "codegen", # Grpc client, tonic::client::Grpc +] } +prost = "0.14.1" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2", features = ["js"] } @@ -63,6 +70,11 @@ name = "trace_buffer" harness = false path = "benches/trace_buffer.rs" +[[bench]] +name = "otlp_grpc_export" +harness = false +path = "benches/otlp_grpc_export.rs" + [dev-dependencies] libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl" } libdd-log = { path = "../libdd-log" } @@ -81,8 +93,11 @@ tokio = { version = "1.23", features = [ "rt", "time", "test-util", + "macros", ], default-features = false } duplicate = "2.0.1" +h2 = "0.4" +tokio-stream = { version = "0.1", features = ["net"] } [features] default = ["https", "telemetry"] diff --git a/libdd-data-pipeline/benches/otlp_grpc_export.rs b/libdd-data-pipeline/benches/otlp_grpc_export.rs new file mode 100644 index 0000000000..33e6d4c3cf --- /dev/null +++ b/libdd-data-pipeline/benches/otlp_grpc_export.rs @@ -0,0 +1,115 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Benchmarks for the OTLP gRPC export hot path. +//! +//! The gRPC exporter turns native trace chunks into the length-prefixed gRPC +//! wire frame that tonic's codec puts on the socket once per export. The prost +//! protobuf encoding is shared with the HTTP/protobuf path (already covered by +//! `libdd-trace-utils/benches/otlp_encoding.rs`); these benches measure the +//! gRPC-specific framing on top of it, plus the full native-spans -> wire-frame +//! preparation across trace sizes so the per-span cost is visible. + +use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion}; +use libdd_trace_utils::msgpack_decoder; +use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; +use prost::Message; +use serde_json::{json, Value}; + +/// A realistic OTLP-bound span: a handful of string `meta` tags and a couple of +/// numeric `metrics`, so the per-span attribute work (the dominant cost) is +/// exercised. Mirrors the fixture in `libdd-trace-utils/benches/otlp_encoding.rs`. +fn generate_spans(num_spans: usize, trace_id: u64) -> Vec { + let root_span_id = 100_000_000_000 + (trace_id % 1_000_000); + (0..num_spans) + .map(|i| { + let span_id = root_span_id + i as u64; + let is_root = i == 0; + let parent_id = if is_root { 0 } else { root_span_id }; + let mut meta = json!({ + "http.method": "GET", + "http.url": "https://example.com/api/v1/users/12345", + "http.status_code": "200", + "env": "production", + "version": "1.2.3", + "component": "net/http", + }); + if is_root { + meta["_dd.p.tid"] = json!("5b8efff798038103"); + } + json!({ + "service": "bench-service", + "name": "http.request", + "resource": "GET /api/v1/users", + "trace_id": trace_id, + "span_id": span_id, + "parent_id": parent_id, + "start": 1_544_712_660_000_000_000_i64 + i as i64, + "duration": 1_000_000, + "error": 0, + "meta": meta, + "metrics": { "_sampling_priority_v1": 1, "_dd.top_level": 1 }, + "type": "web", + }) + }) + .collect() +} + +fn resource_info() -> OtlpResourceInfo { + // `OtlpResourceInfo` is `#[non_exhaustive]`, so build via Default + field assignment. + let mut info = OtlpResourceInfo::default(); + info.service = "bench-service".to_string(); + info.env = "production".to_string(); + info.app_version = "1.2.3".to_string(); + info.language = "rust".to_string(); + info.tracer_version = "9.9.9".to_string(); + info.runtime_id = "11111111-2222-3333-4444-555555555555".to_string(); + info +} + +/// Frame protobuf bytes exactly as gRPC does: a 1-byte compression flag and a +/// 4-byte big-endian length prefix, then the message body. +fn grpc_frame(body: &[u8]) -> Vec { + let mut framed = Vec::with_capacity(5 + body.len()); + framed.push(0u8); // compression flag: 0 = uncompressed + framed.extend_from_slice(&(body.len() as u32).to_be_bytes()); + framed.extend_from_slice(body); + framed +} + +pub fn grpc_export_benches(c: &mut Criterion) { + let info = resource_info(); + + for &num_spans in &[1usize, 1000usize] { + let id = format!("1x{num_spans}"); + let bytes = + rmp_serde::to_vec(&vec![generate_spans(num_spans, 100_000_000_000)]).expect("fixture"); + let (spans, _) = + msgpack_decoder::v04::from_slice(bytes.as_slice()).expect("decode fixture"); + let req = map_traces_to_otlp(spans.clone(), &info, false); + + // Encode-only: prost OTLP IR -> gRPC wire frame (what the codec emits per export). + c.bench_function(&format!("grpc/encode_framed/{id}"), |b| { + b.iter(|| { + let body = black_box(&req).encode_to_vec(); + black_box(grpc_frame(&body)) + }) + }); + + // End-to-end: native spans -> mapped OTLP IR -> gRPC wire frame. + c.bench_function(&format!("grpc/e2e_framed/{id}"), |b| { + b.iter_batched( + || spans.clone(), + |s| { + let req = map_traces_to_otlp(s, &info, false); + let body = req.encode_to_vec(); + black_box(grpc_frame(&body)) + }, + BatchSize::SmallInput, + ) + }); + } +} + +criterion_group!(benches, grpc_export_benches); +criterion_main!(benches); diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 76b98493be..53e70df5b1 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -6,12 +6,16 @@ use http::HeaderMap; use std::time::Duration; -/// OTLP trace export protocol — selects the HTTP body encoding and `Content-Type`. +/// OTLP trace export protocol — selects the wire transport and body encoding. /// -/// Only the HTTP encodings libdatadog actually supports are representable. A `grpc` value (e.g. -/// resolved from the OTel-default `OTEL_EXPORTER_OTLP_PROTOCOL`) is rejected by -/// [`FromStr`](std::str::FromStr) rather than represented here, so an unsupported protocol can -/// never be constructed and silently mishandled downstream. +/// All three OTel-standard protocol strings parse successfully; the selection +/// controls which send path the exporter uses: +/// - `http/json` and `http/protobuf` → OTLP over HTTP/1.1 via +/// [`HttpClientCapability`](libdd_capabilities::HttpClientCapability). +/// - `grpc` → OTLP over HTTP/2 via a tonic [`Channel`](tonic::transport::Channel). +/// +/// Plaintext gRPC (`http://` scheme, port 4317) is supported. TLS gRPC +/// (`https://` scheme) is not yet implemented — use a TLS-terminating sidecar. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] pub enum OtlpProtocol { /// HTTP with a JSON body (`Content-Type: application/json`). The default. @@ -19,6 +23,9 @@ pub enum OtlpProtocol { HttpJson, /// HTTP with a protobuf body (`Content-Type: application/x-protobuf`). HttpProtobuf, + /// gRPC over HTTP/2. Protobuf-encoded body with 5-byte gRPC framing. + /// Default port is 4317. Only plaintext (`http://`) is supported. + Grpc, } impl std::str::FromStr for OtlpProtocol { @@ -27,11 +34,7 @@ impl std::str::FromStr for OtlpProtocol { match s { "http/json" => Ok(OtlpProtocol::HttpJson), "http/protobuf" => Ok(OtlpProtocol::HttpProtobuf), - // gRPC is a valid OTLP protocol in the OTel spec but is not implemented in - // libdatadog. Reject it explicitly so callers get a clean error at the parse - // boundary, rather than constructing an unsupported value that has to be guarded - // against everywhere downstream. - "grpc" => Err("OTLP gRPC export is not supported".to_string()), + "grpc" => Ok(OtlpProtocol::Grpc), other => Err(format!("unknown OTLP protocol: {other}")), } } @@ -40,24 +43,30 @@ impl std::str::FromStr for OtlpProtocol { impl OtlpProtocol { /// The HTTP `Content-Type` for this protocol's body encoding. Crate-internal: the public type /// is only constructed/selected by callers; encoding is the exporter's job. + /// Only called on the HTTP path; the gRPC path uses tonic's ProstCodec. pub(crate) fn content_type(&self) -> http::HeaderValue { + #[allow(clippy::unreachable)] match self { OtlpProtocol::HttpJson => libdd_common::header::APPLICATION_JSON, OtlpProtocol::HttpProtobuf => libdd_common::header::APPLICATION_PROTOBUF, + OtlpProtocol::Grpc => unreachable!("gRPC path does not call content_type()"), } } /// Encode the prost OTLP request to this protocol's wire format. Crate-internal so the /// third-party `serde_json::Error` does not leak into the public API. + /// Only called on the HTTP path; the gRPC path uses tonic's ProstCodec. pub(crate) fn encode( &self, req: &libdd_trace_utils::otlp_encoder::ProtoExportTraceServiceRequest, ) -> Result, serde_json::Error> { + #[allow(clippy::unreachable)] match self { OtlpProtocol::HttpJson => libdd_trace_utils::otlp_encoder::encode_otlp_json(req), OtlpProtocol::HttpProtobuf => { Ok(libdd_trace_utils::otlp_encoder::encode_otlp_protobuf(req)) } + OtlpProtocol::Grpc => unreachable!("gRPC path does not call encode()"), } } } @@ -99,10 +108,15 @@ mod tests { } #[test] - fn grpc_is_rejected_at_parse() { - // gRPC is unsupported, so it must not parse into a protocol: an unsupported value can - // never be constructed. - assert!(OtlpProtocol::from_str("grpc").is_err()); + fn grpc_parses_successfully() { + // gRPC is now a supported protocol — it must parse without error. + assert_eq!(OtlpProtocol::from_str("grpc").unwrap(), OtlpProtocol::Grpc); + } + + #[test] + fn grpc_config_is_send_sync() { + fn assert_send_sync() {} + assert_send_sync::(); } #[test] @@ -118,6 +132,21 @@ mod tests { } } +/// Parsed OTLP gRPC trace exporter configuration. +/// +/// The endpoint URL is consumed at build time to construct the tonic +/// [`Channel`](tonic::transport::Channel); only the per-request settings below +/// are retained here. +#[derive(Clone, Debug)] +pub struct OtlpGrpcTraceConfig { + /// Custom key-value pairs forwarded as gRPC request metadata. + pub headers: Vec<(String, String)>, + /// Per-request timeout (applied via [`tokio::time::timeout`]). + pub timeout: Duration, + /// When `true`, omit DD-specific per-span attributes from the payload. + pub otel_trace_semantics_enabled: bool, +} + /// Parsed OTLP trace-metrics exporter configuration. #[derive(Clone, Debug)] pub struct OtlpMetricsConfig { diff --git a/libdd-data-pipeline/src/otlp/grpc_exporter.rs b/libdd-data-pipeline/src/otlp/grpc_exporter.rs new file mode 100644 index 0000000000..1d4dbc975a --- /dev/null +++ b/libdd-data-pipeline/src/otlp/grpc_exporter.rs @@ -0,0 +1,395 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP gRPC trace exporter. +//! +//! Sends an [`ExportTraceServiceRequest`] over a tonic gRPC channel using +//! plaintext HTTP/2 (`http://` scheme). TLS (`https://`) is not yet supported. +//! +//! # gRPC framing +//! The inner [`ProstCodecImpl`] + tonic's [`Grpc`](tonic::client::Grpc) handle +//! the 5-byte frame prefix, protobuf encoding, and gRPC trailer parsing +//! automatically, using [`prost`] for message encoding/decoding. + +use super::config::OtlpGrpcTraceConfig; +use crate::trace_exporter::error::{BuilderErrorKind, RequestError, TraceExporterError}; +use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::{ + ExportTraceServiceRequest, ExportTraceServiceResponse, +}; +use prost::Message as ProstMessage; +use std::marker::PhantomData; +use std::time::Duration; +use tonic::{ + client::Grpc, + codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}, + metadata::{AsciiMetadataKey, AsciiMetadataValue}, + transport::{Channel, Endpoint}, + Code, Request, Status, +}; +use tracing::warn; + +/// gRPC path for the OTLP trace export RPC. +const GRPC_EXPORT_PATH: &str = "/opentelemetry.proto.collector.trace.v1.TraceService/Export"; + +/// Metadata header signalling that the client already computed APM stats, so the +/// agent does not recompute them. Attached per-request when stats are enabled. +const CLIENT_COMPUTED_STATS_HEADER: &str = "datadog-client-computed-stats"; + +// --------------------------------------------------------------------------- +// Prost codec implementation — tonic 0.14 removed ProstCodec from its public +// API; we provide a minimal replacement that satisfies tonic::codec::Codec. +// --------------------------------------------------------------------------- + +/// A [`tonic::codec::Codec`] that encodes and decodes prost messages. +#[derive(Clone, Default)] +pub(crate) struct ProstCodecImpl { + _phantom: PhantomData<(Enc, Dec)>, +} + +/// A prost message encoder that implements [`tonic::codec::Encoder`]. +pub(crate) struct ProstEncoder { + _phantom: PhantomData, +} + +impl Default for ProstEncoder { + fn default() -> Self { + Self { + _phantom: PhantomData, + } + } +} + +impl Encoder for ProstEncoder { + type Item = T; + type Error = Status; + + fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { + item.encode(dst) + .map_err(|e| Status::internal(format!("Failed to encode protobuf message: {e}"))) + } +} + +/// A prost message decoder that implements [`tonic::codec::Decoder`]. +pub(crate) struct ProstDecoder { + _phantom: PhantomData, +} + +impl Default for ProstDecoder { + fn default() -> Self { + Self { + _phantom: PhantomData, + } + } +} + +impl Decoder for ProstDecoder { + type Item = T; + type Error = Status; + + fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result, Self::Error> { + use bytes::Buf as _; + // `copy_to_bytes` drains the whole buffer correctly even if `DecodeBuf`'s + // backing store is ever non-contiguous (`chunk()` only returns the first + // contiguous segment, so the old chunk()+advance() form could truncate). + let buf = src.copy_to_bytes(src.remaining()); + match T::decode(buf) { + Ok(msg) => Ok(Some(msg)), + Err(e) => Err(Status::internal(format!( + "Failed to decode protobuf message: {e}" + ))), + } + } +} + +impl Codec for ProstCodecImpl +where + Enc: ProstMessage + Default + Send + 'static, + Dec: ProstMessage + Default + Send + 'static, +{ + type Encode = Enc; + type Decode = Dec; + type Encoder = ProstEncoder; + type Decoder = ProstDecoder; + + fn encoder(&mut self) -> Self::Encoder { + ProstEncoder::default() + } + + fn decoder(&mut self) -> Self::Decoder { + ProstDecoder::default() + } +} + +// --------------------------------------------------------------------------- +// OtlpGrpcTransport +// --------------------------------------------------------------------------- + +/// A connected gRPC transport for OTLP trace export. +/// +/// Holds the per-export config and a lazily-connected tonic [`Channel`]. +/// Clone is cheap — `Channel` is internally reference-counted. +#[derive(Clone, Debug)] +pub(crate) struct OtlpGrpcTransport { + pub(crate) config: OtlpGrpcTraceConfig, + /// Lazily-connected HTTP/2 channel. tonic establishes the TCP connection + /// on the first RPC call and maintains a connection pool afterwards. + pub(crate) channel: Channel, +} + +/// Build a lazy tonic gRPC channel for `endpoint_url`. +/// +/// The channel does **not** connect eagerly — TCP setup happens on the first +/// RPC call. `timeout` is stored on the channel and applied per-request. +/// +/// Only `http://` scheme endpoints are accepted; `https://` is not yet +/// supported (use a TLS-terminating sidecar for encrypted connections). +pub(crate) fn build_grpc_channel( + endpoint_url: &str, + timeout: Duration, +) -> Result { + if endpoint_url.starts_with("https://") { + return Err(TraceExporterError::Builder( + BuilderErrorKind::InvalidConfiguration( + "gRPC TLS (https://) is not yet supported; use http:// and a \ + TLS-terminating sidecar if encryption is required" + .to_string(), + ), + )); + } + let channel = Endpoint::from_shared(endpoint_url.to_owned()) + .map_err(|e| TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string())))? + .timeout(timeout) + .connect_lazy(); // Non-async; connects on first RPC call. + Ok(channel) +} + +/// Send an OTLP trace export request over gRPC. +/// +/// Uses the `transport.channel` tonic channel with [`ProstCodecImpl`] for +/// encoding/decoding. Custom metadata headers, the test session token, and +/// (when `client_computed_stats` is set) the client-computed-stats header are +/// attached to the request metadata. +/// +/// # Errors +/// +/// Returns [`TraceExporterError::Io`] on timeout or connection failure, +/// [`TraceExporterError::Request`] on non-OK gRPC status codes. +pub(crate) async fn send_otlp_traces_grpc( + transport: &OtlpGrpcTransport, + test_token: Option<&str>, + client_computed_stats: bool, + request: ExportTraceServiceRequest, +) -> Result<(), TraceExporterError> { + let mut client = Grpc::new(transport.channel.clone()); + + let mut req = Request::new(request); + attach_metadata( + &mut req, + &transport.config.headers, + test_token, + client_computed_stats, + ); + + let path = http::uri::PathAndQuery::from_static(GRPC_EXPORT_PATH); + let codec = ProstCodecImpl::::default(); + + // Tower's `Buffer` service (used inside tonic's `Channel`) requires + // `poll_ready` to be called and return `Ready` before `call`. tonic's + // `Grpc::ready()` drives that poll loop for us. + tokio::time::timeout(transport.config.timeout, async { + client.ready().await.map_err(|e| { + TraceExporterError::Io(std::io::Error::other(format!( + "gRPC channel not ready: {e}" + ))) + })?; + client + .unary(req, path, codec) + .await + .map(|_response| ()) + .map_err(grpc_status_to_error) + }) + .await + .map_err(|_| TraceExporterError::Io(std::io::Error::from(std::io::ErrorKind::TimedOut)))? +} + +/// Attach `headers`, the optional test-session token, and the optional +/// client-computed-stats marker to gRPC request metadata. +/// +/// Invalid custom headers are skipped with a warning rather than failing the +/// export — a single malformed user-supplied header should not drop the batch. +fn attach_metadata( + req: &mut Request, + headers: &[(String, String)], + test_token: Option<&str>, + client_computed_stats: bool, +) { + for (k, v) in headers { + match ( + k.parse::(), + v.parse::(), + ) { + (Ok(key), Ok(val)) => { + req.metadata_mut().insert(key, val); + } + _ => warn!("Skipping invalid gRPC metadata header: {k:?}={v:?}"), + } + } + if let Some(token) = test_token { + match token.parse::() { + Ok(val) => { + req.metadata_mut().insert( + AsciiMetadataKey::from_static("x-datadog-test-session-token"), + val, + ); + } + Err(_) => warn!("Skipping invalid test-session token: {token:?}"), + } + } + if client_computed_stats { + req.metadata_mut().insert( + AsciiMetadataKey::from_static(CLIENT_COMPUTED_STATS_HEADER), + AsciiMetadataValue::from_static("yes"), + ); + } +} + +fn grpc_status_to_error(status: Status) -> TraceExporterError { + match status.code() { + Code::Ok => { + // Ok status should never reach the error path — tonic's `unary` + // returns Ok(response) on Code::Ok, so map_err is not called. + TraceExporterError::Io(std::io::Error::other( + "gRPC Ok status reached error handler (unexpected)", + )) + } + // Server temporarily unreachable / overloaded. + Code::Unavailable => TraceExporterError::Io(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + status.message(), + )), + // Server-side deadline fired — a timeout, not a refused connection. + Code::DeadlineExceeded => TraceExporterError::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + status.message(), + )), + _ => TraceExporterError::Request(RequestError::new( + http::StatusCode::INTERNAL_SERVER_ERROR, + status.message(), + )), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn build_grpc_channel_rejects_https() { + let err = build_grpc_channel("https://localhost:4317", Duration::from_secs(10)); + assert!( + err.is_err(), + "https:// should be rejected until TLS is implemented" + ); + let msg = err.unwrap_err().to_string(); + assert!(msg.contains("TLS") || msg.contains("https"), "got: {msg}"); + } + + /// `connect_lazy()` internally registers with the Tokio reactor — wrap in a runtime. + #[tokio::test] + async fn build_grpc_channel_accepts_http() { + // connect_lazy() doesn't dial — this should always succeed. + let result = build_grpc_channel("http://localhost:4317", Duration::from_secs(10)); + assert!(result.is_ok(), "http:// must produce a channel: {result:?}"); + } + + #[test] + fn build_grpc_channel_rejects_malformed_url() { + let err = build_grpc_channel("not a url", Duration::from_secs(10)); + assert!(err.is_err()); + } + + #[test] + fn grpc_status_transient_map_to_io_with_correct_kind() { + // Unavailable → ConnectionRefused; DeadlineExceeded → TimedOut (not refused). + for (status, want) in [ + ( + Status::unavailable("backend down"), + std::io::ErrorKind::ConnectionRefused, + ), + ( + Status::deadline_exceeded("too slow"), + std::io::ErrorKind::TimedOut, + ), + ] { + match grpc_status_to_error(status) { + TraceExporterError::Io(e) => assert_eq!(e.kind(), want), + other => panic!("expected Io, got {other:?}"), + } + } + } + + #[test] + fn grpc_status_application_errors_map_to_request() { + for status in [ + Status::internal("boom"), + Status::invalid_argument("bad"), + Status::resource_exhausted("quota"), + ] { + assert!( + matches!(grpc_status_to_error(status), TraceExporterError::Request(_)), + "non-transient status should map to Request" + ); + } + } + + #[test] + fn attach_metadata_inserts_valid_header_and_token() { + let mut req = Request::new(ExportTraceServiceRequest::default()); + let headers = vec![("x-custom-key".to_string(), "val".to_string())]; + attach_metadata(&mut req, &headers, Some("tok123"), false); + let md = req.metadata(); + assert_eq!(md.get("x-custom-key").unwrap(), "val"); + assert_eq!(md.get("x-datadog-test-session-token").unwrap(), "tok123"); + assert!(md.get(CLIENT_COMPUTED_STATS_HEADER).is_none()); + } + + #[test] + fn attach_metadata_skips_invalid_header_keeps_valid() { + let mut req = Request::new(ExportTraceServiceRequest::default()); + // A space in the key is not a valid HTTP/2 header name, so it is skipped. + let headers = vec![ + ("bad key".to_string(), "v".to_string()), + ("good-key".to_string(), "ok".to_string()), + ]; + attach_metadata(&mut req, &headers, None, false); + assert_eq!(req.metadata().get("good-key").unwrap(), "ok"); + assert_eq!(req.metadata().len(), 1, "only the valid header is attached"); + } + + #[test] + fn attach_metadata_adds_client_computed_stats_when_enabled() { + let mut req = Request::new(ExportTraceServiceRequest::default()); + attach_metadata(&mut req, &[], None, true); + assert_eq!( + req.metadata().get(CLIENT_COMPUTED_STATS_HEADER).unwrap(), + "yes" + ); + } + + /// `connect_lazy()` requires a Tokio runtime — wrap in `#[tokio::test]`. + #[tokio::test] + async fn grpc_transport_is_clone() { + let channel = build_grpc_channel("http://localhost:4317", Duration::from_secs(5)) + .expect("http channel must build"); + let transport = OtlpGrpcTransport { + config: OtlpGrpcTraceConfig { + headers: vec![], + timeout: Duration::from_secs(5), + otel_trace_semantics_enabled: false, + }, + channel, + }; + let _clone = transport.clone(); + } +} diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index 0bda6b1b7e..bf428a8c09 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -29,9 +29,15 @@ pub mod config; pub mod exporter; +// gRPC OTLP export depends on tonic, which does not build for wasm32. +#[cfg(not(target_arch = "wasm32"))] +pub(crate) mod grpc_exporter; pub mod metrics; -pub use config::{OtlpMetricsConfig, OtlpProtocol, OtlpTraceConfig}; +pub use config::{OtlpGrpcTraceConfig, OtlpMetricsConfig, OtlpProtocol, OtlpTraceConfig}; pub use exporter::send_otlp_traces_http; pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; pub use metrics::OtlpStatsExporter; + +#[cfg(not(target_arch = "wasm32"))] +pub(crate) use grpc_exporter::{build_grpc_channel, send_otlp_traces_grpc, OtlpGrpcTransport}; diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index b0286df2ea..86a391b526 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -4,6 +4,9 @@ use crate::agent_info::AgentInfoFetcher; use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; use crate::otlp::{OtlpMetricsConfig, OtlpResourceInfo, OtlpTraceConfig}; +// gRPC OTLP export depends on tonic, which does not build for wasm32. +#[cfg(not(target_arch = "wasm32"))] +use crate::otlp::{build_grpc_channel, OtlpGrpcTraceConfig, OtlpGrpcTransport}; #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; @@ -13,9 +16,9 @@ use crate::trace_exporter::TelemetryConfig; #[cfg(not(target_arch = "wasm32"))] use crate::trace_exporter::TraceExporterWorkers; use crate::trace_exporter::{ - add_path, StatsComputationStatus, TelemetryInstrumentationSessions, TraceExporter, - TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, TraceSerializer, - TracerMetadata, INFO_ENDPOINT, + add_path, OtlpExportMode, StatsComputationStatus, TelemetryInstrumentationSessions, + TraceExporter, TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, + TraceSerializer, TracerMetadata, INFO_ENDPOINT, }; use arc_swap::ArcSwap; use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; @@ -374,10 +377,10 @@ impl TraceExporterBuilder { self } - /// Selects the OTLP export protocol: [`OtlpProtocol::HttpJson`] (default) or - /// [`OtlpProtocol::HttpProtobuf`]. The host language resolves this from - /// `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` / `OTEL_EXPORTER_OTLP_PROTOCOL`; a `grpc` value is - /// unsupported and is rejected when parsed into [`OtlpProtocol`], so it never reaches here. + /// Selects the OTLP export protocol. The host language resolves this from + /// `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` / `OTEL_EXPORTER_OTLP_PROTOCOL`. All three OTel + /// protocol strings (`http/json`, `http/protobuf`, `grpc`) are valid. Plaintext gRPC only; + /// `https://` endpoints are rejected at [`build`](Self::build) time. pub fn set_otlp_protocol(&mut self, protocol: OtlpProtocol) -> &mut Self { self.otlp_protocol = protocol; self @@ -575,15 +578,39 @@ impl TraceExporterBuilder { .map(Duration::from_millis) .unwrap_or(DEFAULT_OTLP_TIMEOUT); - // `self.otlp_protocol` is always an HTTP encoding here: gRPC is rejected at the parse - // boundary (`OtlpProtocol::from_str`) and so can never be constructed. - let otlp_config = self.otlp_endpoint.map(|url| OtlpTraceConfig { - endpoint_url: url, - headers: build_otlp_header_map(self.otlp_headers), - timeout: otlp_timeout, - protocol: self.otlp_protocol, - otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, - }); + let otlp = match self.otlp_endpoint { + #[cfg(not(target_arch = "wasm32"))] + Some(ref url) if self.otlp_protocol == OtlpProtocol::Grpc => { + let channel = build_grpc_channel(url, otlp_timeout)?; + Some(OtlpExportMode::Grpc(OtlpGrpcTransport { + config: OtlpGrpcTraceConfig { + headers: self.otlp_headers.clone(), + timeout: otlp_timeout, + otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, + }, + channel, + })) + } + // gRPC export is unavailable on wasm32 (tonic does not build there). Reject it + // explicitly instead of falling through to the HTTP arm, which would store + // `OtlpProtocol::Grpc` in `OtlpTraceConfig` and panic at `encode()` on send. + #[cfg(target_arch = "wasm32")] + Some(_) if self.otlp_protocol == OtlpProtocol::Grpc => { + return Err(TraceExporterError::Builder( + BuilderErrorKind::InvalidConfiguration( + "OTLP gRPC export is not supported on wasm32 targets".to_string(), + ), + )); + } + Some(ref url) => Some(OtlpExportMode::Http(OtlpTraceConfig { + endpoint_url: url.clone(), + headers: build_otlp_header_map(self.otlp_headers.clone()), + timeout: otlp_timeout, + protocol: self.otlp_protocol, + otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, + })), + None => None, + }; let otlp_metrics_config = self.otlp_metrics_endpoint.map(|url| OtlpMetricsConfig { endpoint_url: url, @@ -705,7 +732,7 @@ impl TraceExporterBuilder { agent_payload_response_version: self .agent_rates_payload_version_enabled .then(AgentResponsePayloadVersion::new), - otlp_config, + otlp, trace_filterer: ArcSwap::from_pointee(TraceFilterer::with_empty_conf()), otlp_stats_enabled, }) @@ -885,4 +912,50 @@ mod tests { "http://127.0.0.1:8126/v0.4/traces" ); } + + #[cfg_attr(miri, ignore)] + #[test] + fn build_with_grpc_protocol_and_endpoint_succeeds() { + let mut builder = TraceExporterBuilder::default(); + builder + .set_otlp_endpoint("http://localhost:4317") + .set_otlp_protocol(crate::otlp::OtlpProtocol::Grpc); + // connect_lazy() doesn't dial, so this must build successfully even without a server. + let result = builder.build::(); + assert!( + result.is_ok(), + "gRPC protocol + endpoint should build successfully: {result:?}" + ); + } + + #[cfg_attr(miri, ignore)] + #[test] + fn build_with_grpc_protocol_no_endpoint_uses_agent_path() { + // A configured gRPC protocol without an OTLP endpoint is inert — the exporter + // falls back to the normal Datadog agent path. + let mut builder = TraceExporterBuilder::default(); + builder + .set_url("http://localhost:8126") + .set_otlp_protocol(crate::otlp::OtlpProtocol::Grpc); + let result = builder.build::(); + assert!( + result.is_ok(), + "gRPC protocol without endpoint must use agent path, not fail: {result:?}" + ); + } + + // build() spins up the shared runtime, whose worker hits a syscall Miri can't execute. + #[cfg_attr(miri, ignore)] + #[test] + fn build_with_grpc_https_endpoint_rejected() { + let mut builder = TraceExporterBuilder::default(); + builder + .set_otlp_endpoint("https://collector.example.com:4317") + .set_otlp_protocol(crate::otlp::OtlpProtocol::Grpc); + let result = builder.build::(); + assert!( + result.is_err(), + "https:// gRPC should be rejected until TLS is implemented" + ); + } } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index a3a19aed78..eb7a8ee656 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -17,6 +17,9 @@ use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; use crate::agent_info::ResponseObserver; use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig}; +// gRPC OTLP export depends on tonic, which does not build for wasm32. +#[cfg(not(target_arch = "wasm32"))] +use crate::otlp::{send_otlp_traces_grpc, OtlpGrpcTransport}; #[cfg(feature = "telemetry")] use crate::telemetry::{SendPayloadTelemetry, TelemetryClient}; use crate::trace_exporter::agent_response::{ @@ -58,6 +61,19 @@ use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{debug, error, warn}; +/// Selects the OTLP export transport for a [`TraceExporter`] instance. +/// +/// Only one variant is active per instance; mutual exclusivity is enforced at +/// build time. +#[derive(Debug)] +pub(crate) enum OtlpExportMode { + /// OTLP over HTTP/1.1 (JSON or protobuf body). + Http(OtlpTraceConfig), + /// OTLP over HTTP/2 via gRPC. Unavailable on wasm32 (tonic does not build there). + #[cfg(not(target_arch = "wasm32"))] + Grpc(OtlpGrpcTransport), +} + const INFO_ENDPOINT: &str = "/info"; const V04_TRACES_ENDPOINT: &str = "/v0.4/traces"; const V05_TRACES_ENDPOINT: &str = "/v0.5/traces"; @@ -219,8 +235,8 @@ pub struct TraceExporter< #[cfg(not(target_arch = "wasm32"))] workers: TraceExporterWorkers, agent_payload_response_version: Option, - /// When set, traces are exported via OTLP HTTP/JSON instead of the Datadog agent. - otlp_config: Option, + /// When set, traces are exported via OTLP (HTTP or gRPC) instead of the Datadog agent. + otlp: Option, trace_filterer: ArcSwap, /// When true, span stats are computed and exported as OTLP metrics. The concentrator is /// started at build time, so agent-driven stats (de)activation in `check_agent_info` is @@ -567,23 +583,26 @@ impl OtlpResourceInfo { + let mut r = OtlpResourceInfo::default(); + r.service = self.metadata.service.clone(); + r.env = self.metadata.env.clone(); + r.app_version = self.metadata.app_version.clone(); + r.language = self.metadata.language.clone(); + r.tracer_version = self.metadata.tracer_version.clone(); + r.runtime_id = self.metadata.runtime_id.clone(); + r.client_computed_stats = self.otlp_stats_enabled; + r + } + /// Sends trace chunks via OTLP HTTP (JSON or protobuf) when OTLP config is enabled. async fn send_otlp_traces_inner( &self, traces: Vec>>, config: &OtlpTraceConfig, ) -> Result { - let resource_info = { - let mut r = OtlpResourceInfo::default(); - r.service = self.metadata.service.clone(); - r.env = self.metadata.env.clone(); - r.app_version = self.metadata.app_version.clone(); - r.language = self.metadata.language.clone(); - r.tracer_version = self.metadata.tracer_version.clone(); - r.runtime_id = self.metadata.runtime_id.clone(); - r.client_computed_stats = self.otlp_stats_enabled; - r - }; + let resource_info = self.build_otlp_resource_info(); // Single prost OTLP IR; the configured protocol encodes the same request to its wire // format (JSON or protobuf). OTel-semantics gating (omit DD-specific attrs) happens in // the mapper. @@ -620,6 +639,37 @@ impl( + &self, + traces: Vec>>, + transport: &OtlpGrpcTransport, + ) -> Result { + let resource_info = self.build_otlp_resource_info(); + + // map_traces_to_otlp returns the prost ExportTraceServiceRequest directly — + // the same type expected by send_otlp_traces_grpc; no conversion needed. + let proto_request = map_traces_to_otlp( + traces, + &resource_info, + transport.config.otel_trace_semantics_enabled, + ); + + // The client-computed-stats marker is attached as request metadata + // inside `send_otlp_traces_grpc`, avoiding a per-export clone of the + // transport (and its header `Vec`) on the hot path. + send_otlp_traces_grpc( + transport, + self.endpoint.test_token.as_deref(), + self.otlp_stats_enabled, + proto_request, + ) + .await?; + + Ok(AgentResponse::Unchanged) + } + /// Send traces payload to agent with retry and telemetry reporting async fn send_traces_with_telemetry( &self, @@ -682,12 +732,21 @@ impl { + return self.send_otlp_traces_inner(traces, config).await; + } + #[cfg(not(target_arch = "wasm32"))] + Some(OtlpExportMode::Grpc(transport)) => { + return self.send_otlp_grpc_inner(traces, transport).await; + } + None => {} } // Snapshot the effective format once so the serializer and the URL agree even if diff --git a/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs new file mode 100644 index 0000000000..c960595a12 --- /dev/null +++ b/libdd-data-pipeline/tests/test_trace_exporter_otlp_grpc.rs @@ -0,0 +1,192 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! End-to-end test for OTLP gRPC trace export. +//! +//! Starts an in-process HTTP/2 gRPC server using the `h2` crate, configures +//! a [`TraceExporter`] with `OtlpProtocol::Grpc`, sends a trace, and asserts +//! that the server received a valid [`ExportTraceServiceRequest`] containing +//! the expected span. + +#[cfg(test)] +mod grpc_export_tests { + use bytes::Bytes; + use h2::server; + use libdd_capabilities_impl::NativeCapabilities; + use libdd_data_pipeline::{trace_exporter::TraceExporterBuilder, OtlpProtocol}; + use libdd_trace_protobuf::opentelemetry::proto::{ + collector::trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse}, + common::v1::any_value::Value, + }; + use libdd_trace_utils::test_utils::create_test_json_span; + use prost::Message; + use serde_json::json; + use std::sync::mpsc; + use std::time::Duration; + use tokio::net::TcpListener; + + /// Runs the in-process h2 server: accept connections and handle each Export + /// stream in a spawned task. + /// + /// The `accept()` loop must keep running for the lifetime of the connection — + /// polling the [`server::Connection`] is what reads incoming frames and routes + /// DATA to an open stream's body, and what flushes our queued response frames. + /// Reading a request body inline (without looping `accept()`) would stall: a + /// DATA frame that lands after `accept()` returns the stream would never be + /// read, hanging `body.data()` until the client's request timeout. + async fn run_grpc_test_server( + listener: TcpListener, + req_tx: mpsc::Sender, + ) { + let (socket, _) = listener.accept().await.unwrap(); + let mut h2_conn = server::handshake(socket).await.unwrap(); + while let Some(result) = h2_conn.accept().await { + if let Ok((request, respond)) = result { + tokio::spawn(handle_export_stream(request, respond, req_tx.clone())); + } + } + } + + /// Decode one gRPC Export request and reply with a success response + trailer. + async fn handle_export_stream( + request: http::Request, + mut respond: h2::server::SendResponse, + req_tx: mpsc::Sender, + ) { + // Collect the full request body. The chunk length must be captured before + // `extend_from_slice` consumes it to satisfy the borrow checker. + let mut body = request.into_body(); + let mut frame_data: Vec = Vec::new(); + while let Some(chunk) = body.data().await { + let Ok(chunk) = chunk else { return }; + let len = chunk.len(); + frame_data.extend_from_slice(&chunk); + body.flow_control().release_capacity(len).ok(); + } + + // Decode the gRPC frame: skip the 5-byte prefix (1 compression flag + 4 length). + let decoded = if frame_data.len() > 5 { + ExportTraceServiceRequest::decode(&frame_data[5..]).ok() + } else { + None + }; + if let Some(req) = decoded { + let _ = req_tx.send(req); + } + + // Send gRPC success response: 200 headers + protobuf body + grpc-status trailer. + let response_proto = ExportTraceServiceResponse::default(); + let proto_bytes = response_proto.encode_to_vec(); + let mut frame = Vec::with_capacity(5 + proto_bytes.len()); + frame.push(0u8); // no compression + frame.extend_from_slice(&(proto_bytes.len() as u32).to_be_bytes()); + frame.extend_from_slice(&proto_bytes); + + let response = http::Response::builder() + .status(200) + .header("content-type", "application/grpc") + .body(()) + .unwrap(); + let Ok(mut send_stream) = respond.send_response(response, false) else { + return; + }; + let _ = send_stream.send_data(Bytes::from(frame), false); + + // gRPC-status trailer (trailing HEADERS frame with END_STREAM). + let mut trailers = http::HeaderMap::new(); + trailers.insert("grpc-status", "0".parse().unwrap()); + let _ = send_stream.send_trailers(trailers); + } + + #[cfg_attr(miri, ignore)] + #[test] + fn grpc_export_sends_decodable_request() { + // The h2 server runs on its own OS thread + runtime, and the exporter + // drives its own runtime via `send` on this thread. Keeping them on + // separate threads (rather than one shared test runtime) means neither + // can starve the other under parallel CI load — the previous + // `#[tokio::test]` current-thread setup could deadlock the server task. + let (port_tx, port_rx) = mpsc::channel::(); + let (req_tx, req_rx) = mpsc::channel::(); + + // The server thread is detached and time-bounded end to end (60s ceiling): + // if the client never connects, the bound fires and the thread exits rather + // than lingering or blocking. The test never joins it — verification comes + // from `send` returning Ok (the client received grpc-status:0) plus the + // decoded request delivered on `req_rx`. + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + let _ = tokio::time::timeout(Duration::from_secs(60), async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + port_tx.send(listener.local_addr().unwrap().port()).unwrap(); + run_grpc_test_server(listener, req_tx).await; + }) + .await; + }); + }); + + let port = port_rx + .recv_timeout(Duration::from_secs(10)) + .expect("server did not bind within 10s"); + let endpoint = format!("http://127.0.0.1:{port}"); + + let mut builder = TraceExporterBuilder::default(); + builder + .set_otlp_endpoint(&endpoint) + .set_otlp_protocol(OtlpProtocol::Grpc) + .set_connection_timeout(Some(30_000)) + .set_language("test-lang") + .set_tracer_version("1.0") + .set_env("grpc-test-env") + .set_service("grpc-test-svc"); + let exporter = builder.build::().expect("build"); + + let mut span = create_test_json_span(1234, 12342, 12341, 1, false); + span["service"] = json!("grpc-test-svc"); + span["name"] = json!("grpc_span"); + let data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + // No ambient async runtime on this thread, so the exporter's internal + // `block_on` runs directly without `spawn_blocking`. + exporter.send(data.as_ref()).expect("send ok"); + + // The server decodes and forwards the request before responding, so it is + // already available once `send` returns. + let received = req_rx + .recv_timeout(Duration::from_secs(10)) + .expect("server did not receive a decodable request within 10s"); + + // Validate the decoded request contains the expected span. + assert!( + !received.resource_spans.is_empty(), + "expected at least one ResourceSpans" + ); + let service_name = received + .resource_spans + .first() + .and_then(|rs| rs.resource.as_ref()) + .and_then(|r| { + r.attributes.iter().find_map(|kv| { + if kv.key == "service.name" { + kv.value + .as_ref() + .and_then(|v| v.value.as_ref()) + .and_then(|v| match v { + Value::StringValue(s) => Some(s.as_str()), + _ => None, + }) + } else { + None + } + }) + }); + assert_eq!( + service_name, + Some("grpc-test-svc"), + "service.name attribute not found or wrong value" + ); + } +}