Skip to content

Commit 2a6c295

Browse files
lloeki, dd-octo-sts[bot], VianneyRuhlmann
authored
feat(data-pipeline)!: add fork safety hooks and cancellation token for trace exporter FFI (#2051)
# What does this PR do?

Add fork safety and cooperative cancellation support to the trace exporter FFI:

1. **Fork hooks**: Expose `ddog_trace_exporter_before_fork`, `_after_fork_in_parent`, and `_after_fork_in_child` that delegate to the underlying `SharedRuntime`. These allow C callers to coordinate the tokio runtime lifecycle around process forks.
2. **Cancellation token**: Introduce `ddog_trace_exporter_cancel_token_new`, `_cancel`, and `_drop` for managing cancellation tokens. Thread the token into `ddog_trace_exporter_send_trace_chunks` via `tokio::select!` so callers can cooperatively abort in-flight HTTP requests.

# Motivation

Follow-up to #1952.

Ruby application servers (Puma, Unicorn, Passenger) fork worker processes. The tokio runtime does not survive `fork()` — its threads are not carried over. Without the fork hooks, the trace exporter is dead in child processes.

The existing `RUBY_UBF_IO` unblock function on the dd-trace-rb side sends a signal that cannot actually cancel an in-flight Rust HTTP request. The cancellation token enables cooperative abort: when Ruby interrupts a thread (shutdown, `Thread#kill`), the UBF cancels the token, which causes `tokio::select!` to abort the send and return promptly.

# Additional Notes

- Added a `pub fn shared_runtime()` accessor to `TraceExporter` since the field was private.
- The cancellation token uses `Handle<CancellationToken>` from `libdd-common-ffi`, same pattern as profiling-ffi, with a `ddog_TraceExporterCancelToken` cbindgen rename to avoid symbol conflicts.
- The `cancel` parameter on `send_trace_chunks` is nullable; passing `NULL` preserves existing behavior.

AI was used to accelerate implementation; all code was reviewed and understood.

# How to test the change?

```
cargo test -p libdd-data-pipeline-ffi --lib
```

46 tests pass (40 existing + 6 new for fork hooks and cancellation token).
Co-authored-by: dd-octo-sts[bot] <200755185+dd-octo-sts[bot]@users.noreply.github.com> Co-authored-by: VianneyRuhlmann <vianney.ruhlmann@datadoghq.com>
1 parent a97e1d4 commit 2a6c295

11 files changed

Lines changed: 139 additions & 8 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

builder/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ default = [
1919
"log",
2020
"ddsketch",
2121
"ffe",
22+
"shared-runtime",
2223
]
2324
crashtracker = []
2425
profiling = []
@@ -29,6 +30,7 @@ library-config = []
2930
log = []
3031
ddsketch = []
3132
ffe = []
33+
shared-runtime = []
3234
regex-lite = ["libdd-common/regex-lite"]
3335

3436
[lib]

builder/src/bin/release.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ pub fn main() {
7474
f.push("ddsketch-ffi".to_string());
7575
#[cfg(feature = "ffe")]
7676
f.push("datadog-ffe-ffi".to_string());
77+
#[cfg(feature = "shared-runtime")]
78+
f.push("shared-runtime".to_string());
7779
f
7880
};
7981

builder/src/profiling.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ impl Profiling {
5959
headers.push("ddsketch.h");
6060
#[cfg(feature = "ffe")]
6161
headers.push("ffe.h");
62+
#[cfg(feature = "shared-runtime")]
63+
headers.push("shared-runtime.h");
6264

6365
let mut origin_path: PathBuf = [&self.source_include, "dummy.h"].iter().collect();
6466
let mut target_path: PathBuf = [&self.target_include, "dummy.h"].iter().collect();

libdd-data-pipeline-ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,5 @@ libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime" }
3636
libdd-common-ffi = { path = "../libdd-common-ffi", default-features = false }
3737
libdd-tinybytes = { path = "../libdd-tinybytes" }
3838
libdd-trace-utils = { path = "../libdd-trace-utils" }
39+
tokio-util = "0.7.11"
3940
tracing = { version = "0.1", default-features = false }

libdd-data-pipeline-ffi/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,7 @@ C FFI bindings for the libdd-data-pipeline library.
55
## Overview
66

77
`libdd-data-pipeline-ffi` provides C-compatible FFI bindings for `libdd-data-pipeline`, enabling high-performance trace processing from C, C++, PHP, Ruby, Python, and other languages.
8+
9+
## Dependencies
10+
11+
This crate depends on `tokio-util` for its `CancellationToken` type. The cancellation token created by `ddog_trace_exporter_cancel_token_new` and passed to `ddog_trace_exporter_send_trace_chunks` is a `tokio_util::sync::CancellationToken`, which the data pipeline uses to cooperatively abort an in-flight send. The token is exposed opaquely to C, so callers never need to depend on `tokio-util` themselves.

libdd-data-pipeline-ffi/cbindgen.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@ after_includes = """
1313
typedef struct ddog_TraceExporter ddog_TraceExporter;
1414
typedef struct ddog_TracerSpan ddog_TracerSpan;
1515
typedef struct ddog_TracerTraceChunks ddog_TracerTraceChunks;
16+
typedef struct ddog_TraceExporterCancelToken ddog_TraceExporterCancelToken;
1617
"""
1718

1819
[export]
1920
prefix = "ddog_"
2021
renaming_overrides_prefixing = true
21-
exclude = ["TraceExporter", "TracerSpan", "TracerTraceChunks"]
22+
exclude = ["TraceExporter", "TracerSpan", "TracerTraceChunks", "TokioCancellationToken"]
2223

2324
[export.rename]
2425
"ByteSlice" = "ddog_ByteSlice"
@@ -32,6 +33,7 @@ exclude = ["TraceExporter", "TracerSpan", "TracerTraceChunks"]
3233
"TracerSpan" = "ddog_TracerSpan"
3334
"TracerSpanFields" = "ddog_TracerSpanFields"
3435
"TracerTraceChunks" = "ddog_TracerTraceChunks"
36+
"TokioCancellationToken" = "ddog_TraceExporterCancelToken"
3537

3638
[export.mangle]
3739
rename_types = "PascalCase"

libdd-data-pipeline-ffi/src/tracer.rs

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use libdd_tinybytes::BytesString;
1919
use libdd_trace_utils::span::v04::SpanBytes;
2020
use std::ptr::NonNull;
2121

22+
type TokioCancellationToken = tokio_util::sync::CancellationToken;
23+
2224
// ---------------------------------------------------------------------------
2325
// Helper
2426
// ---------------------------------------------------------------------------
@@ -295,6 +297,49 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span(
295297
)
296298
}
297299

300+
// ---------------------------------------------------------------------------
301+
// Cancellation token
302+
// ---------------------------------------------------------------------------
303+
304+
/// Create a new cancellation token.
305+
///
306+
/// The returned token must be freed with
307+
/// [`ddog_trace_exporter_cancel_token_drop`].
308+
#[no_mangle]
309+
pub extern "C" fn ddog_trace_exporter_cancel_token_new() -> Box<TokioCancellationToken> {
310+
Box::new(TokioCancellationToken::new())
311+
}
312+
313+
/// Cancel a cancellation token.
314+
///
315+
/// All clones of the same token observe the cancellation. If a
316+
/// [`ddog_trace_exporter_send_trace_chunks`] call is using this token at the
317+
/// time of cancellation, that send stops waiting for the agent at its next
318+
/// await point and returns an error; the trace chunks it was sending may be
319+
/// lost.
320+
///
321+
/// Cancellation only affects a send that is in progress. If no send is using
322+
/// the token, cancelling it has no immediate effect: a send started afterwards
323+
/// with an already-cancelled token returns an error without contacting the
324+
/// agent, and a token cancelled after its send has already finished does
325+
/// nothing.
326+
#[no_mangle]
327+
pub extern "C" fn ddog_trace_exporter_cancel_token_cancel(token: Option<&TokioCancellationToken>) {
328+
if let Some(token) = token {
329+
token.cancel();
330+
}
331+
}
332+
333+
/// Free a cancellation token.
334+
///
335+
/// After this call the token is invalid and must not be reused.
336+
#[no_mangle]
337+
pub extern "C" fn ddog_trace_exporter_cancel_token_drop(
338+
token: Option<Box<TokioCancellationToken>>,
339+
) {
340+
drop(token);
341+
}
342+
298343
// ---------------------------------------------------------------------------
299344
// Send trace chunks
300345
// ---------------------------------------------------------------------------
@@ -305,13 +350,22 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span(
305350
/// serializes in the configured output format, and sends to the agent
306351
/// with retry logic.
307352
///
353+
/// When `cancel` is non-null, cancelling that token (via
354+
/// [`ddog_trace_exporter_cancel_token_cancel`]) while the send is in progress
355+
/// aborts the in-flight request and returns an error with code
356+
/// [`ExporterErrorCode::IoError`]. Cancellation is cooperative: it only takes
357+
/// effect while a request is actually in flight. A token that is already
358+
/// cancelled when the send starts makes this function return that error
359+
/// immediately, and cancelling after the send has finished has no effect.
360+
/// Cancelling an in-flight send may cause the trace chunks being sent to be
361+
/// lost.
362+
///
308363
/// On success, if `response_out` is non-null, a heap-allocated
309364
/// [`ExporterResponse`] is written there. The caller owns it and must
310365
/// free it with `ddog_trace_exporter_response_free`.
311366
///
312367
/// # Safety
313368
///
314-
/// * `exporter` must be a valid `TraceExporter` pointer.
315369
/// * `chunks` is consumed and must not be used after this call.
316370
/// * If `response_out` is non-null it must point to valid writable memory for a
317371
/// `Box<ExporterResponse>`.
@@ -320,6 +374,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks(
320374
exporter: Option<&TraceExporter>,
321375
chunks: Option<Box<TracerTraceChunks>>,
322376
response_out: Option<NonNull<Box<ExporterResponse>>>,
377+
cancel: Option<&TokioCancellationToken>,
323378
) -> Option<Box<ExporterError>> {
324379
let Some(exporter) = exporter else {
325380
return gen_error!(ErrorCode::InvalidArgument);
@@ -329,7 +384,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks(
329384
};
330385

331386
catch_panic!(
332-
match exporter.send_trace_chunks(chunks.0) {
387+
match exporter.send_trace_chunks(chunks.0, cancel) {
333388
Ok(resp) => {
334389
if let Some(out) = response_out {
335390
out.as_ptr().write(Box::new(ExporterResponse::from(resp)));
@@ -651,7 +706,7 @@ mod tests {
651706
fn send_trace_chunks_null_exporter_returns_error() {
652707
unsafe {
653708
let chunks = make_chunks(0);
654-
let err = ddog_trace_exporter_send_trace_chunks(None, Some(chunks), None);
709+
let err = ddog_trace_exporter_send_trace_chunks(None, Some(chunks), None, None);
655710
assert!(err.is_some());
656711
assert_eq!(err.as_ref().unwrap().code, ErrorCode::InvalidArgument);
657712
ddog_trace_exporter_error_free(err);
@@ -687,4 +742,32 @@ mod tests {
687742
ddog_tracer_trace_chunks_free(chunks);
688743
}
689744
}
745+
746+
// -- Cancellation token -------------------------------------------------
747+
748+
#[test]
749+
fn cancel_token_new_and_drop() {
750+
let token = ddog_trace_exporter_cancel_token_new();
751+
ddog_trace_exporter_cancel_token_drop(Some(token));
752+
}
753+
754+
#[test]
755+
fn cancel_token_cancel() {
756+
let token = ddog_trace_exporter_cancel_token_new();
757+
ddog_trace_exporter_cancel_token_cancel(Some(&token));
758+
ddog_trace_exporter_cancel_token_drop(Some(token));
759+
}
760+
761+
#[test]
762+
fn send_trace_chunks_null_cancel_is_accepted() {
763+
// Passing a null (None) cancel argument behaves like no cancellation.
764+
unsafe {
765+
let chunks = make_chunks(0);
766+
let err = ddog_trace_exporter_send_trace_chunks(None, Some(chunks), None, None);
767+
// exporter is None, so we get InvalidArgument, but no crash
768+
// from the absent cancel argument.
769+
assert!(err.is_some());
770+
ddog_trace_exporter_error_free(err);
771+
}
772+
}
690773
}

libdd-data-pipeline/src/trace_exporter/mod.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ use std::sync::{Arc, Once};
5252
use std::time::Duration;
5353
use std::{borrow::Borrow, str::FromStr};
5454
use tokio::task::JoinSet;
55+
use tokio_util::sync::CancellationToken;
5556
use tracing::{debug, error, warn};
5657

5758
const INFO_ENDPOINT: &str = "/info";
@@ -477,14 +478,39 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
477478
/// Send a list of trace chunks to the agent (or OTLP endpoint when configured).
478479
///
479480
/// Sync facade over [`Self::send_trace_chunks_async`]; panics inside an existing
480-
/// tokio context. Returns the agent response (or `Unchanged` for OTLP).
481+
/// tokio context.
482+
///
483+
/// # Arguments
484+
/// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans.
485+
/// * cancellation_token: When provided, cancelling the token aborts the send while it is in
486+
/// progress. The send only observes a token that is cancelled while the request is in-flight;
487+
/// a token cancelled before this call returns immediately, and a token cancelled after the
488+
/// send has already finished has no effect. Cancelling an in-flight send may cause the trace
489+
/// chunks being sent to be lost.
490+
///
491+
/// # Returns
492+
/// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP)
493+
/// * Err(TraceExporterError): An error detailing what went wrong in the process
481494
#[cfg(not(target_arch = "wasm32"))]
482495
pub fn send_trace_chunks<T: TraceData>(
483496
&self,
484497
trace_chunks: Vec<Vec<Span<T>>>,
498+
cancellation_token: Option<&CancellationToken>,
485499
) -> Result<AgentResponse, TraceExporterError> {
486-
self.shared_runtime
487-
.block_on(self.send_trace_chunks_async(trace_chunks))?
500+
self.shared_runtime.block_on(async {
501+
match cancellation_token {
502+
Some(token) => {
503+
tokio::select! {
504+
res = self.send_trace_chunks_async(trace_chunks) => res,
505+
_ = token.cancelled() => Err(TraceExporterError::Io(std::io::Error::new(
506+
std::io::ErrorKind::Interrupted,
507+
"send cancelled via cancellation token",
508+
))),
509+
}
510+
}
511+
None => self.send_trace_chunks_async(trace_chunks).await,
512+
}
513+
})?
488514
}
489515

490516
/// Send a list of trace chunks to the agent, asynchronously (or OTLP when configured).

libdd-profiling-ffi/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ name = "datadog_profiling_ffi"
1818

1919
[features]
2020
default = ["ddcommon-ffi"]
21-
cbindgen = ["build_common/cbindgen", "libdd-common-ffi/cbindgen"]
21+
cbindgen = ["build_common/cbindgen", "libdd-common-ffi/cbindgen", "libdd-shared-runtime-ffi?/cbindgen"]
2222
ddtelemetry-ffi = ["dep:libdd-telemetry-ffi"]
2323
datadog-log-ffi = ["dep:libdd-log-ffi"]
2424
symbolizer = ["symbolizer-ffi"]
@@ -33,6 +33,7 @@ datadog-library-config-ffi = ["dep:libdd-library-config-ffi"]
3333
ddcommon-ffi = ["dep:libdd-common-ffi"]
3434
ddsketch-ffi = ["dep:libdd-ddsketch-ffi"]
3535
datadog-ffe-ffi = ["dep:datadog-ffe-ffi"]
36+
shared-runtime = ["dep:libdd-shared-runtime-ffi", "libdd-shared-runtime-ffi/catch_panic"]
3637
regex-lite = ["libdd-common/regex-lite"]
3738

3839
[build-dependencies]
@@ -60,3 +61,4 @@ symbolizer-ffi = { path = "../symbolizer-ffi", optional = true, default-features
6061
thiserror = "2"
6162
tokio-util = "0.7.1"
6263
datadog-ffe-ffi = { path = "../datadog-ffe-ffi", default-features = false, optional = true }
64+
libdd-shared-runtime-ffi = { path = "../libdd-shared-runtime-ffi", default-features = false, optional = true }

0 commit comments

Comments
 (0)