
Commit c2751ef

bengl and ekump authored
feat(data-pipeline)!: add stdout log trace exporter (#2074)
# What does this PR do?

Adds a stdout "log exporter" transport to libdatadog's trace pipeline. When enabled, `TraceExporter` writes traces as newline-delimited JSON (`{"traces":[[...]]}`) to stdout in the format consumed by the Datadog Forwarder, instead of sending them to an agent over HTTP.

New pieces:

- **`libdd-trace-utils::json_log_encoder`**: Forwarder JSON encoder with lowercase hex IDs (incl. 128-bit), integer `error`, ns `start`/`duration`, empty-field omission, greedy 256 KiB line batching, and oversize-span drop.
- **`TraceExporterBuilder::set_output_to_log(max_line_size)`**: selects a stdout destination that bypasses agent-info polling, client-side stats, V1 negotiation, and telemetry. Exposed over FFI as `ddog_trace_exporter_config_set_output_to_log`.
- **`libdd_capabilities::LogWriterCapability`**: the write goes through a capability so the transport also works on wasm (where the host, e.g. JavaScript, supplies the sink). The native capability writes to stdout.

# Motivation

Serverless runtimes (primarily AWS Lambda) often have no reachable Datadog agent or Lambda extension. The established fallback is to write traces to stdout, where the Datadog Forwarder tails the logs and submits them to the trace intake. Today each tracer (dd-trace-js/py/go/java) reimplements this independently; this lands a single reusable implementation in libdatadog that native-backed tracers can adopt.

# Additional Notes

- **Breaking (Rust API):** `TraceExporter<C>` (and `TraceExporterBuilder::build`/`build_async`, `trace_buffer::DefaultExport<C>`) now require `C: LogWriterCapability`. Callers using a custom capability bundle must implement `libdd_capabilities::LogWriterCapability`; `NativeCapabilities` already implements it, so callers using `NativeCapabilities` are unaffected. PR title carries the `!` marker accordingly.
- Selection policy stays with the caller: there is no env-reading helper in libdatadog (an earlier `recommended_log_output()` was removed per review; reading env from libdatadog has caused production crashes). The SDK detects Lambda/agent/extension itself and calls `set_output_to_log`.
- `meta_struct` is intentionally omitted (raw msgpack the log intake can't interpret), matching the reference exporters.
- Writes are **synchronous/blocking**: intended for single-threaded serverless runtimes where there is no shared async reactor to stall. Log output takes precedence over an OTLP endpoint if both are configured.
- No new third-party dependencies (`serde_json` already in tree) → no `Cargo.lock` / `LICENSE-3rdparty.csv` churn.

# How to test the change?

```bash
cargo nextest run -p libdd-trace-utils -p libdd-data-pipeline -p libdd-data-pipeline-ffi -E '!test(tracing_integration_tests::)'
cargo +stable clippy -p libdd-capabilities -p libdd-capabilities-impl -p libdd-trace-utils -p libdd-data-pipeline -p libdd-data-pipeline-ffi --all-targets --all-features -- -D warnings
cargo ffi-test  # FFI crate touched
```

Key tests:

- `json_log_encoder`: golden bytes, Forwarder `is_trace` contract, hex (incl. 128-bit), size-cap batching, oversize-drop, non-finite-metric → null, multi-trace flatten, span_links/span_events present-case.
- `trace_exporter::tests::test_log_mode_send_writes_forwarder_json`: full `send(msgpack)` → decode → log branch → capability bytes (via a capturing test capability).
- `trace_exporter::tests::test_log_mode_makes_no_agent_requests`: zero agent calls + `info_fetcher` not spawned in log mode.
- `log_writer` helper tests; FFI `config_output_to_log_test`.

Co-authored-by: ekump <edmund.kump@datadoghq.com>
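The greedy line batching and oversize-span drop mentioned above can be sketched roughly as follows. This is not the `json_log_encoder` implementation: `batch_spans`, `flush_line`, and the constants are hypothetical names, spans are assumed to arrive as already-encoded JSON objects, and the cap is assumed to count the trailing newline.

```rust
/// Default per-line cap named in the PR text (256 KiB).
const DEFAULT_MAX_LINE_SIZE: usize = 256 * 1024;

/// Envelope written around each batch of spans (assumed from `{"traces":[[...]]}`).
const PREFIX: &str = "{\"traces\":[[";
const SUFFIX: &str = "]]}\n";

/// Greedily packs pre-encoded span objects into newline-terminated lines.
/// Returns the output buffer and the number of spans dropped as oversize.
fn batch_spans(spans: &[&str], max_line_size: usize) -> (String, usize) {
    let overhead = PREFIX.len() + SUFFIX.len();
    let mut out = String::new();
    let mut line = String::new(); // comma-joined spans of the line being built
    let mut dropped = 0;

    for span in spans {
        // A span that cannot fit on a line by itself is dropped.
        if overhead + span.len() > max_line_size {
            dropped += 1;
            continue;
        }
        // Start a new line when appending this span (plus a comma) would exceed the cap.
        if !line.is_empty() && overhead + line.len() + 1 + span.len() > max_line_size {
            flush_line(&mut out, &mut line);
        }
        if !line.is_empty() {
            line.push(',');
        }
        line.push_str(span);
    }
    flush_line(&mut out, &mut line);
    (out, dropped)
}

/// Wraps the pending spans in the envelope and appends them to `out`.
fn flush_line(out: &mut String, line: &mut String) {
    if line.is_empty() {
        return;
    }
    out.push_str(PREFIX);
    out.push_str(line);
    out.push_str(SUFFIX);
    line.clear();
}
```

A cap of `0` would drop every span in this sketch, which matches the reason the setters in this commit treat `0` as "use the default".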
1 parent 3ff0006 commit c2751ef

11 files changed

Lines changed: 1173 additions & 30 deletions


libdd-capabilities-impl/src/lib.rs

Lines changed: 13 additions & 1 deletion
@@ -15,7 +15,7 @@ use std::time::Duration;
 
 pub use http::NativeHttpClient;
 use libdd_capabilities::{http::HttpError, MaybeSend};
-pub use libdd_capabilities::{HttpClientCapability, SleepCapability};
+pub use libdd_capabilities::{HttpClientCapability, LogWriterCapability, SleepCapability};
 pub use sleep::NativeSleepCapability;
 
 /// Bundle struct for native platform capabilities.
@@ -64,6 +64,18 @@ impl HttpClientCapability for NativeCapabilities {
     }
 }
 
+impl LogWriterCapability for NativeCapabilities {
+    fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()> {
+        use std::io::Write;
+        // `Stdout` is internally synchronized; lock once so the whole buffer
+        // (one or more newline-terminated JSON lines) is written without
+        // interleaving, then flush.
+        let mut out = std::io::stdout().lock();
+        out.write_all(bytes)?;
+        out.flush()
+    }
+}
+
 impl SleepCapability for NativeCapabilities {
     fn new() -> Self {
         Self {

libdd-capabilities/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -4,11 +4,13 @@
 //! Portable capability traits for cross-platform libdatadog.
 
 pub mod http;
+pub mod log_output;
 pub mod maybe_send;
 pub mod sleep;
 pub mod spawn;
 
 pub use self::http::{HttpClientCapability, HttpError};
+pub use self::log_output::LogWriterCapability;
 pub use self::sleep::SleepCapability;
 pub use self::spawn::SpawnError;
 pub use ::http::{Request, Response};
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
+// SPDX-License-Identifier: Apache-2.0
+
+//! Log-output capability trait.
+//!
+//! Lets the trace pipeline emit already-encoded log-exporter output to the
+//! platform's log sink. On native targets this writes to process stdout; a wasm
+//! consumer implements it by handing the bytes to the host (e.g. JavaScript),
+//! since wasm cannot write to stdout directly.
+
+/// Capability for writing encoded log-exporter output to the platform log sink.
+pub trait LogWriterCapability {
+    /// Write a buffer of newline-delimited log output.
+    ///
+    /// `bytes` may contain one or more `\n`-terminated JSON lines. Implementations
+    /// should write the whole buffer (so individual lines are not interleaved with
+    /// other writers) and flush before returning.
+    fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()>;
+}
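The PR description mentions a capturing test capability. A minimal sketch of what such a sink might look like is below; the trait is redeclared locally with the same signature as in the diff so the block compiles on its own, and `CapturingLogWriter` and `take` are hypothetical names rather than anything from libdatadog.

```rust
use std::io;
use std::sync::Mutex;

/// Local stand-in with the same shape as the trait added in this commit,
/// so the sketch builds without the libdatadog crates.
trait LogWriterCapability {
    fn write_log_output(&self, bytes: &[u8]) -> io::Result<()>;
}

/// Hypothetical test double: records every buffer instead of writing to stdout.
#[derive(Default)]
struct CapturingLogWriter {
    captured: Mutex<Vec<u8>>,
}

impl LogWriterCapability for CapturingLogWriter {
    fn write_log_output(&self, bytes: &[u8]) -> io::Result<()> {
        // The trait takes `&self`, so recording needs interior mutability.
        let mut guard = self
            .captured
            .lock()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "capture buffer poisoned"))?;
        // Append the whole buffer in one step, mirroring the "write the whole
        // buffer" guidance in the trait docs.
        guard.extend_from_slice(bytes);
        Ok(())
    }
}

impl CapturingLogWriter {
    /// Returns everything captured so far and resets the buffer.
    fn take(&self) -> Vec<u8> {
        std::mem::take(&mut *self.captured.lock().unwrap())
    }
}
```

A custom capability bundle affected by the new `C: LogWriterCapability` bound would need an `impl` of the same shape, forwarding to whatever sink the host provides.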

libdd-data-pipeline-ffi/src/trace_exporter.rs

Lines changed: 63 additions & 0 deletions
@@ -88,6 +88,8 @@ pub struct TraceExporterConfig {
     shared_runtime: Option<Arc<ForkSafeRuntime>>,
     otlp_endpoint: Option<String>,
     otlp_protocol: Option<OtlpProtocol>,
+    output_to_log: bool,
+    log_max_line_size: Option<usize>,
 }
 
 #[no_mangle]
@@ -541,6 +543,37 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol(
     )
 }
 
+/// Configure the exporter to write traces as newline-delimited JSON to stdout (the Datadog
+/// Forwarder "log exporter" path) instead of sending them to a Datadog agent. Used in serverless
+/// environments (e.g. AWS Lambda) when no agent is reachable.
+///
+/// `max_line_size` overrides the per-line byte cap; pass `0` to use the default (256 KiB, the AWS
+/// CloudWatch Logs limit). When enabled, agent-specific behavior (agent-info polling, client-side
+/// stats, V1 negotiation) is bypassed.
+///
+/// In this mode each span's `meta` is serialized to process stdout (and thus captured by CloudWatch
+/// Logs in Lambda); `meta_struct` is excluded because it holds raw msgpack the log intake cannot
+/// interpret.
+///
+/// Writes are synchronous/blocking on stdout, so this mode targets single-threaded / current-thread
+/// serverless runtimes (e.g. AWS Lambda) where a blocking write won't stall a shared async reactor.
+#[no_mangle]
+pub unsafe extern "C" fn ddog_trace_exporter_config_set_output_to_log(
+    config: Option<&mut TraceExporterConfig>,
+    max_line_size: usize,
+) -> Option<Box<ExporterError>> {
+    catch_panic!(
+        if let Some(handle) = config {
+            handle.output_to_log = true;
+            handle.log_max_line_size = (max_line_size != 0).then_some(max_line_size);
+            None
+        } else {
+            gen_error!(ErrorCode::InvalidArgument)
+        },
+        gen_error!(ErrorCode::Panic)
+    )
+}
+
 /// Create a new TraceExporter instance.
 ///
 /// When an OTLP endpoint is configured via `TraceExporterConfig`, the exporter sends traces to
@@ -613,6 +646,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
                 }
             }
 
+            if config.output_to_log {
+                builder.set_output_to_log(config.log_max_line_size);
+            }
+
             match builder.build() {
                 Ok(exporter) => {
                     out_handle.as_ptr().write(Box::new(exporter));
@@ -708,11 +745,37 @@ mod tests {
             assert!(cfg.process_tags.is_none());
             assert!(cfg.test_session_token.is_none());
             assert!(cfg.connection_timeout.is_none());
+            assert!(!cfg.output_to_log);
+            assert_eq!(cfg.log_max_line_size, None);
 
             ddog_trace_exporter_config_free(cfg);
         }
     }
 
+    #[test]
+    fn config_output_to_log_test() {
+        unsafe {
+            // Null config handle -> InvalidArgument.
+            let error = ddog_trace_exporter_config_set_output_to_log(None, 0);
+            assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
+            ddog_trace_exporter_error_free(error);
+
+            // 0 is a sentinel for "use the default cap" -> None.
+            let mut config = Some(TraceExporterConfig::default());
+            let error = ddog_trace_exporter_config_set_output_to_log(config.as_mut(), 0);
+            assert_eq!(error, None);
+            let cfg = config.unwrap();
+            assert!(cfg.output_to_log);
+            assert_eq!(cfg.log_max_line_size, None);
+
+            // Non-zero cap is stored as-is.
+            let mut config = Some(TraceExporterConfig::default());
+            let error = ddog_trace_exporter_config_set_output_to_log(config.as_mut(), 4096);
+            assert_eq!(error, None);
+            assert_eq!(config.unwrap().log_max_line_size, Some(4096));
+        }
+    }
+
     #[test]
     fn config_url_test() {
         unsafe {
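The path a line-size cap takes from the FFI argument to the value the exporter ends up with can be traced in isolation. The sketch below copies the three expressions from this commit into free functions; the function names are hypothetical, and the constant's value is assumed from the "256 KiB" wording in the doc comments, since the diff shown here does not include the definition of `DEFAULT_LOG_MAX_LINE_SIZE`.

```rust
/// Assumed value; the doc comments describe the default as 256 KiB.
const DEFAULT_LOG_MAX_LINE_SIZE: usize = 256 * 1024;

/// FFI setter: `0` is a sentinel meaning "no override".
fn cap_from_ffi(max_line_size: usize) -> Option<usize> {
    (max_line_size != 0).then_some(max_line_size)
}

/// Builder setter: `Some(0)` is coerced to `None` for parity with the FFI sentinel.
fn normalize_cap(max_line_size: Option<usize>) -> Option<usize> {
    max_line_size.filter(|&n| n != 0)
}

/// Build step: the effective cap exists only when log output is enabled,
/// and falls back to the default when no override was stored.
fn effective_cap(output_to_log: bool, stored: Option<usize>) -> Option<usize> {
    output_to_log.then(|| stored.unwrap_or(DEFAULT_LOG_MAX_LINE_SIZE))
}
```

Under these assumptions, passing `0` over FFI and passing `None` or `Some(0)` in Rust all resolve to the same default cap, while any non-zero value is carried through unchanged.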

libdd-data-pipeline/src/trace_buffer/mod.rs

Lines changed: 4 additions & 4 deletions
@@ -13,7 +13,7 @@ use std::{
     time::{Duration, Instant},
 };
 
-use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability};
+use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability};
 use libdd_shared_runtime::{SharedRuntime, Worker};
 
 use crate::trace_exporter::{
@@ -645,15 +645,15 @@ pub trait Export<T>: Send + Debug {
 #[derive(Debug)]
 pub struct DefaultExport<C, R>
 where
-    C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static,
+    C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static,
     R: SharedRuntime + std::fmt::Debug + Send + Sync + 'static,
 {
     trace_exporter: TraceExporter<C, R>,
 }
 
 impl<C, R> DefaultExport<C, R>
 where
-    C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static,
+    C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static,
     R: SharedRuntime + std::fmt::Debug + Send + Sync + 'static,
 {
     pub fn new(trace_exporter: TraceExporter<C, R>) -> Self {
@@ -663,7 +663,7 @@ where
 
 impl<C, R> Export<libdd_trace_utils::span::v04::SpanBytes> for DefaultExport<C, R>
 where
-    C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static,
+    C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static,
     R: SharedRuntime + std::fmt::Debug + Send + Sync + 'static,
 {
     fn export_trace_chunks(

libdd-data-pipeline/src/trace_exporter/builder.rs

Lines changed: 103 additions & 16 deletions
@@ -8,6 +8,7 @@ use crate::otlp::{OtlpMetricsConfig, OtlpResourceInfo, OtlpTraceConfig};
 use crate::telemetry::TelemetryClientBuilder;
 use crate::trace_exporter::agent_response::AgentResponsePayloadVersion;
 use crate::trace_exporter::error::BuilderErrorKind;
+use crate::trace_exporter::log_writer::DEFAULT_LOG_MAX_LINE_SIZE;
 #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))]
 use crate::trace_exporter::TelemetryConfig;
 #[cfg(not(target_arch = "wasm32"))]
@@ -18,7 +19,7 @@ use crate::trace_exporter::{
     TracerMetadata, INFO_ENDPOINT,
 };
 use arc_swap::ArcSwap;
-use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability};
+use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability};
 use libdd_common::{parse_uri, tag, Endpoint};
 use libdd_dogstatsd_client::new;
 use libdd_shared_runtime::SharedRuntime;
@@ -90,6 +91,11 @@ pub struct TraceExporterBuilder<R: SharedRuntime> {
     otlp_metrics_headers: Vec<(String, String)>,
     otel_trace_semantics_enabled: bool,
     runtime_id: Option<String>,
+    /// When true, traces are written as newline-delimited JSON to stdout (the
+    /// Datadog Forwarder "log exporter" path) instead of being sent to an agent.
+    output_to_log: bool,
+    /// Optional override for the maximum size of a single emitted log line.
+    log_max_line_size: Option<usize>,
 }
 
 /// Default is impl'd for `R = ForkSafeRuntime` only so that bare
@@ -149,6 +155,8 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
             otlp_metrics_headers: Vec::new(),
             otel_trace_semantics_enabled: false,
             runtime_id: None,
+            output_to_log: false,
+            log_max_line_size: None,
         }
     }
 }
@@ -425,14 +433,41 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
         self.runtime_id = Some(id.to_owned());
         self
     }
+    /// Configure the exporter to write traces as newline-delimited JSON to stdout
+    /// (the Datadog Forwarder "log exporter" path) instead of sending them to a
+    /// Datadog agent. This is the transport used in serverless environments (e.g.
+    /// AWS Lambda) when no agent is reachable.
+    ///
+    /// `max_line_size` overrides the per-line byte cap; `None` (or `Some(0)`,
+    /// which is coerced to the default) uses the default of 256 KiB, the AWS
+    /// CloudWatch Logs per-event limit. When this is set, agent-specific behavior
+    /// (agent-info polling, client-side stats, V1 negotiation) is bypassed.
+    ///
+    /// In this mode each span's `meta` is serialized to process stdout (and thus
+    /// captured by CloudWatch Logs in Lambda); `meta_struct` is excluded because
+    /// it holds raw msgpack the log intake cannot interpret. Writes are
+    /// synchronous/blocking, so this mode targets single-threaded serverless
+    /// runtimes where blocking stdout writes do not stall a shared async reactor.
+    ///
+    /// Takes precedence over an OTLP endpoint: if both this and `set_otlp_endpoint`
+    /// are configured, traces are written to the log output and not sent via OTLP.
+    pub fn set_output_to_log(&mut self, max_line_size: Option<usize>) -> &mut Self {
+        self.output_to_log = true;
+        // Treat `Some(0)` as "use the default" (a 0 cap would drop every span);
+        // keeps parity with the FFI setter's 0-sentinel.
+        self.log_max_line_size = max_line_size.filter(|&n| n != 0);
+        self
+    }
 
     /// Build the [`TraceExporter`] synchronously.
     ///
     /// Sync facade over [`Self::build_async`]; panics inside an existing tokio context.
     /// Requires `R: BlockingRuntime` so the builder can drive setup on the runtime. Not
     /// available on wasm — use [`Self::build_async`] there.
     #[cfg(not(target_arch = "wasm32"))]
-    pub fn build<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static>(
+    pub fn build<
+        C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static,
+    >(
         mut self,
     ) -> Result<TraceExporter<C, R>, TraceExporterError>
     where
@@ -459,7 +494,7 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
     /// context. If [`set_shared_runtime`](Self::set_shared_runtime) was not called, a new
     /// runtime is constructed via [`SharedRuntime::new`].
     pub async fn build_async<
-        C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static,
+        C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static,
     >(
         self,
     ) -> Result<TraceExporter<C, R>, TraceExporterError> {
@@ -506,19 +541,28 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
         let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT));
         let (info_fetcher, info_response_observer) =
             AgentInfoFetcher::<C>::new(info_endpoint, Duration::from_secs(5 * 60));
-        let info_fetcher_handle =
-            shared_runtime
-                .spawn_worker(info_fetcher, false)
-                .map_err(|e| {
-                    TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
-                        e.to_string(),
-                    ))
-                })?;
-        // The handle is currently only tracked for shutdown on native; on wasm
-        // it is dropped here (the worker keeps running on the JS event loop
-        // until the page/module is torn down).
+        // TODO(APMSP-3609): consolidate per-mode worker gating (info-fetcher, telemetry,
+        // stats concentrator) off the selected export destination in one place.
+        // In log-export mode there is no agent to poll; skip spawning the worker
+        // entirely so we don't make repeated failing `/info` calls (e.g. in Lambda).
+        let info_fetcher_handle = if self.output_to_log {
+            None
+        } else {
+            Some(
+                shared_runtime
+                    .spawn_worker(info_fetcher, false)
+                    .map_err(|e| {
+                        TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
+                            e.to_string(),
+                        ))
+                    })?,
+            )
+        };
+        // The handle is only tracked for shutdown on native; on wasm the `workers`
+        // field is cfg'd out, so drop it here (the worker keeps running on the JS
+        // event loop until the page/module is torn down).
         #[cfg(target_arch = "wasm32")]
-        let _ = info_fetcher_handle;
+        drop(info_fetcher_handle);
 
         #[allow(unused_mut)]
         let mut stats = StatsComputationStatus::Disabled;
@@ -530,7 +574,13 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
         #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))]
         let (telemetry_client, telemetry_handle) = {
             let sessions = self.telemetry_instrumentation_sessions;
-            let telemetry = self.telemetry.map(|telemetry_config| {
+            // Telemetry talks to the agent; disable it in log-export mode.
+            let telemetry = if self.output_to_log {
+                None
+            } else {
+                self.telemetry
+            }
+            .map(|telemetry_config| {
                 let mut builder = TelemetryClientBuilder::default()
                     .set_language(&self.language)
                     .set_language_version(&self.language_version)
@@ -647,6 +697,10 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
             otlp_stats_enabled = true;
         }
 
+        let log_output = self
+            .output_to_log
+            .then(|| self.log_max_line_size.unwrap_or(DEFAULT_LOG_MAX_LINE_SIZE));
+
         Ok(TraceExporter {
             endpoint: Endpoint {
                 url: agent_url,
@@ -708,6 +762,7 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
             otlp_config,
             trace_filterer: ArcSwap::from_pointee(TraceFilterer::with_empty_conf()),
             otlp_stats_enabled,
+            log_output,
         })
     }
 
@@ -734,6 +789,38 @@ mod tests {
     use libdd_capabilities_impl::NativeCapabilities;
     use libdd_shared_runtime::ForkSafeRuntime;
 
+    #[cfg_attr(miri, ignore)]
+    #[test]
+    fn test_log_output_mode() {
+        let mut builder = TraceExporterBuilder::default();
+        builder
+            .set_service("test")
+            .set_input_format(TraceExporterInputFormat::V04)
+            .set_output_to_log(None);
+        let exporter = builder.build::<NativeCapabilities>().unwrap();
+        // Log-output mode is enabled and the agent-info worker is not spawned.
+        // (End-to-end send -> bytes is covered by the capability-injecting test in
+        // `mod.rs` and the `log_writer` unit tests.)
+        assert!(
+            exporter.log_output.is_some(),
+            "log_output should be set in log-output mode"
+        );
+        assert!(
+            exporter.workers.info_fetcher.is_none(),
+            "no agent-info worker should be spawned in log mode"
+        );
+    }
+
+    #[test]
+    fn set_output_to_log_some_zero_uses_default() {
+        // `Some(0)` is coerced to "use the default cap" (a 0 cap would drop every
+        // span), which is represented as `None` on the builder field.
+        let mut builder = TraceExporterBuilder::default();
+        builder.set_output_to_log(Some(0));
+        assert!(builder.output_to_log);
+        assert_eq!(builder.log_max_line_size, None);
+    }
+
     #[cfg_attr(miri, ignore)]
     #[test]
     fn test_new() {
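The precedence rule in the `set_output_to_log` docs (log output wins over an OTLP endpoint when both are configured) can be illustrated with a small selection function. This is only a sketch: the `Destination` enum and `select_destination` are hypothetical and do not appear in the diff, and the OTLP-before-agent ordering is an assumption, since the commit shown here only states the log-over-OTLP rule.

```rust
/// Hypothetical summary of where traces end up; not a type from libdatadog.
#[derive(Debug, PartialEq)]
enum Destination {
    /// Newline-delimited JSON on the log sink, with the effective per-line cap.
    Log { max_line_size: usize },
    /// OTLP export to the configured endpoint.
    Otlp { endpoint: String },
    /// Default path: send to the Datadog agent.
    Agent,
}

/// Picks a destination from the resolved log cap (`Some` only in log mode)
/// and an optional OTLP endpoint.
fn select_destination(log_output: Option<usize>, otlp_endpoint: Option<&str>) -> Destination {
    match (log_output, otlp_endpoint) {
        // Log output takes precedence, whether or not OTLP is also configured.
        (Some(max_line_size), _) => Destination::Log { max_line_size },
        // Assumption: OTLP is preferred over the agent when it is configured.
        (None, Some(endpoint)) => Destination::Otlp {
            endpoint: endpoint.to_owned(),
        },
        (None, None) => Destination::Agent,
    }
}
```

Keeping the decision in one function is in the spirit of the `TODO(APMSP-3609)` note about consolidating per-mode gating off the selected export destination.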
