Skip to content

Commit c5c339a

Browse files
committed
Add cancellation token support for trace chunk sending
Introduce `ddog_trace_exporter_cancel_token_new`, `_cancel`, and `_drop` for managing cancellation tokens. Thread the token into `ddog_trace_exporter_send_trace_chunks` via `tokio::select!` so callers can cooperatively abort in-flight HTTP requests.
1 parent 818c03c commit c5c339a

3 files changed

Lines changed: 145 additions & 7 deletions

File tree

libdd-data-pipeline-ffi/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime" }
3636
libdd-common-ffi = { path = "../libdd-common-ffi", default-features = false }
3737
libdd-tinybytes = { path = "../libdd-tinybytes" }
3838
libdd-trace-utils = { path = "../libdd-trace-utils" }
39+
tokio = { version = "1.23", features = ["macros"] }
40+
tokio-util = "0.7.11"
3941
tracing = { version = "0.1", default-features = false }

libdd-data-pipeline-ffi/cbindgen.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ exclude = ["TraceExporter", "TracerSpan", "TracerTraceChunks"]
3232
"TracerSpan" = "ddog_TracerSpan"
3333
"TracerSpanFields" = "ddog_TracerSpanFields"
3434
"TracerTraceChunks" = "ddog_TracerTraceChunks"
35+
"Handle_TokioCancellationToken" = "ddog_TraceExporterCancelToken"
3536

3637
[export.mangle]
3738
rename_types = "PascalCase"

libdd-data-pipeline-ffi/src/tracer.rs

Lines changed: 142 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@ use crate::error::{ExporterError, ExporterErrorCode as ErrorCode};
1313
use crate::response::ExporterResponse;
1414
use crate::trace_exporter::TraceExporter;
1515
use crate::{catch_panic, gen_error};
16+
use libdd_common_ffi::handle::{Handle, ToInner};
1617
use libdd_common_ffi::slice::AsBytes;
1718
use libdd_common_ffi::CharSlice;
1819
use libdd_tinybytes::BytesString;
1920
use libdd_trace_utils::span::v04::SpanBytes;
2021
use std::ptr::NonNull;
2122

23+
type TokioCancellationToken = tokio_util::sync::CancellationToken;
24+
2225
// ---------------------------------------------------------------------------
2326
// Helper
2427
// ---------------------------------------------------------------------------
@@ -295,6 +298,53 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span(
295298
)
296299
}
297300

301+
// ---------------------------------------------------------------------------
302+
// Cancellation token
303+
// ---------------------------------------------------------------------------
304+
305+
/// Create a new cancellation token.
306+
///
307+
/// The returned handle must be freed with
308+
/// [`ddog_trace_exporter_cancel_token_drop`].
309+
#[no_mangle]
310+
pub extern "C" fn ddog_trace_exporter_cancel_token_new() -> Handle<TokioCancellationToken> {
311+
Handle::from(TokioCancellationToken::new())
312+
}
313+
314+
/// Cancel a cancellation token.
315+
///
316+
/// All clones of the same token observe the cancellation. If the token is
317+
/// currently passed to [`ddog_trace_exporter_send_trace_chunks`], the
318+
/// in-flight HTTP request will be aborted cooperatively.
319+
///
320+
/// # Safety
321+
///
322+
/// * `token` must point to a valid [`Handle`] returned by
323+
/// [`ddog_trace_exporter_cancel_token_new`].
324+
#[no_mangle]
325+
pub unsafe extern "C" fn ddog_trace_exporter_cancel_token_cancel(
326+
mut token: *mut Handle<TokioCancellationToken>,
327+
) {
328+
if let Ok(inner) = token.to_inner_mut() {
329+
inner.cancel();
330+
}
331+
}
332+
333+
/// Free a cancellation token handle.
334+
///
335+
/// After this call the pointer is invalid and must not be reused.
336+
///
337+
/// # Safety
338+
///
339+
/// * `token` must point to a valid [`Handle`] returned by
340+
/// [`ddog_trace_exporter_cancel_token_new`], or be null.
341+
#[no_mangle]
342+
pub unsafe extern "C" fn ddog_trace_exporter_cancel_token_drop(
343+
mut token: *mut Handle<TokioCancellationToken>,
344+
) {
345+
drop(token.take());
346+
}
347+
298348
// ---------------------------------------------------------------------------
299349
// Send trace chunks
300350
// ---------------------------------------------------------------------------
@@ -305,6 +355,12 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span(
305355
/// serializes in the configured output format, and sends to the agent
306356
/// with retry logic.
307357
///
358+
/// When `cancel` is non-null it must point to a live
359+
/// [`Handle<CancellationToken>`] obtained from
360+
/// [`ddog_trace_exporter_cancel_token_new`]. If the token is cancelled
361+
/// while the send is in progress the HTTP request is aborted and an
362+
/// error with code [`ExporterErrorCode::IoError`] is returned.
363+
///
308364
/// On success, if `response_out` is non-null, a heap-allocated
309365
/// [`ExporterResponse`] is written there. The caller owns it and must
310366
/// free it with `ddog_trace_exporter_response_free`.
@@ -315,11 +371,13 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span(
315371
/// * `chunks` is consumed and must not be used after this call.
316372
/// * If `response_out` is non-null it must point to valid writable memory for a
317373
/// `Box<ExporterResponse>`.
374+
/// * If `cancel` is non-null it must point to a valid cancellation token handle.
318375
#[no_mangle]
319376
pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks(
320377
exporter: Option<&TraceExporter>,
321378
chunks: Option<Box<TracerTraceChunks>>,
322379
response_out: Option<NonNull<Box<ExporterResponse>>>,
380+
mut cancel: *mut Handle<TokioCancellationToken>,
323381
) -> Option<Box<ExporterError>> {
324382
let Some(exporter) = exporter else {
325383
return gen_error!(ErrorCode::InvalidArgument);
@@ -328,15 +386,47 @@ pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks(
328386
return gen_error!(ErrorCode::InvalidArgument);
329387
};
330388

389+
// Clone the cancellation token (if provided) so the caller retains
390+
// ownership of the handle while we use a cheap clone inside select!.
391+
let cancel_token: Option<TokioCancellationToken> = if cancel.is_null() {
392+
None
393+
} else {
394+
cancel.to_inner_mut().ok().map(|t| t.clone())
395+
};
396+
331397
catch_panic!(
332-
match exporter.send_trace_chunks(chunks.0) {
333-
Ok(resp) => {
334-
if let Some(out) = response_out {
335-
out.as_ptr().write(Box::new(ExporterResponse::from(resp)));
398+
{
399+
let result = if let Some(ct) = cancel_token {
400+
// Use select! so we can abort the in-flight request when
401+
// the caller cancels.
402+
let block_result = exporter.shared_runtime().block_on(async {
403+
tokio::select! {
404+
res = exporter.send_trace_chunks_async(chunks.0) => res,
405+
_ = ct.cancelled() => Err(
406+
std::io::Error::new(
407+
std::io::ErrorKind::Interrupted,
408+
"send cancelled via cancellation token",
409+
).into()
410+
),
411+
}
412+
});
413+
match block_result {
414+
Ok(inner) => inner,
415+
Err(io_err) => Err(io_err.into()),
416+
}
417+
} else {
418+
exporter.send_trace_chunks(chunks.0)
419+
};
420+
421+
match result {
422+
Ok(resp) => {
423+
if let Some(out) = response_out {
424+
out.as_ptr().write(Box::new(ExporterResponse::from(resp)));
425+
}
426+
None
336427
}
337-
None
428+
Err(e) => Some(Box::new(ExporterError::from(e))),
338429
}
339-
Err(e) => Some(Box::new(ExporterError::from(e))),
340430
},
341431
gen_error!(ErrorCode::Panic)
342432
)
@@ -736,7 +826,12 @@ mod tests {
736826
fn send_trace_chunks_null_exporter_returns_error() {
737827
unsafe {
738828
let chunks = make_chunks(0);
739-
let err = ddog_trace_exporter_send_trace_chunks(None, Some(chunks), None);
829+
let err = ddog_trace_exporter_send_trace_chunks(
830+
None,
831+
Some(chunks),
832+
None,
833+
std::ptr::null_mut(),
834+
);
740835
assert!(err.is_some());
741836
assert_eq!(err.as_ref().unwrap().code, ErrorCode::InvalidArgument);
742837
ddog_trace_exporter_error_free(err);
@@ -773,6 +868,46 @@ mod tests {
773868
}
774869
}
775870

871+
// -- Cancellation token -------------------------------------------------
872+
873+
#[test]
874+
fn cancel_token_new_and_drop() {
875+
unsafe {
876+
let mut token = ddog_trace_exporter_cancel_token_new();
877+
let ptr: *mut Handle<TokioCancellationToken> = &mut token;
878+
ddog_trace_exporter_cancel_token_drop(ptr);
879+
}
880+
}
881+
882+
#[test]
883+
fn cancel_token_cancel() {
884+
unsafe {
885+
let mut token = ddog_trace_exporter_cancel_token_new();
886+
let ptr: *mut Handle<TokioCancellationToken> = &mut token;
887+
ddog_trace_exporter_cancel_token_cancel(ptr);
888+
ddog_trace_exporter_cancel_token_drop(ptr);
889+
}
890+
}
891+
892+
#[test]
893+
fn send_trace_chunks_null_cancel_is_accepted() {
894+
// Passing a null cancel pointer should behave like the old
895+
// signature (no cancellation).
896+
unsafe {
897+
let chunks = make_chunks(0);
898+
let err = ddog_trace_exporter_send_trace_chunks(
899+
None,
900+
Some(chunks),
901+
None,
902+
std::ptr::null_mut(),
903+
);
904+
// exporter is None, so we get InvalidArgument, but no crash
905+
// from the null cancel pointer.
906+
assert!(err.is_some());
907+
ddog_trace_exporter_error_free(err);
908+
}
909+
}
910+
776911
// -- Fork safety hooks --------------------------------------------------
777912

778913
#[test]

0 commit comments

Comments
 (0)