Skip to content

Commit f170f9e

Browse files
committed
Replace RUBY_UBF_IO with cooperative cancellation token
Create a cancellation token per send call and pass it to the custom unblock function. When Ruby interrupts the thread (shutdown, Thread#kill), the UBF cancels the token, which cooperatively aborts the in-flight HTTP request in the Rust runtime. This replaces the signal-based RUBY_UBF_IO which could not actually cancel the Rust HTTP pipeline.
1 parent f8bc43d commit f170f9e

1 file changed

Lines changed: 30 additions & 2 deletions

File tree

ext/libdatadog_api/trace_exporter.c

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,7 @@ typedef struct {
528528
const ddog_TraceExporter *exporter;
529529
ddog_TracerTraceChunks *chunks;
530530
ddog_TraceExporterResponse *response;
531+
ddog_TraceExporterCancelToken *cancel_token; /* borrowed, not owned */
531532
ddog_TraceExporterErrorCode error_code;
532533
bool failed;
533534
bool send_ran;
@@ -536,7 +537,7 @@ typedef struct {
536537
static void *send_chunks_without_gvl(void *data) {
537538
send_chunks_args_t *args = (send_chunks_args_t *)data;
538539
ddog_TraceExporterError *err = ddog_trace_exporter_send_trace_chunks(
539-
args->exporter, args->chunks, &args->response);
540+
args->exporter, args->chunks, &args->response, args->cancel_token);
540541
if (err != NULL) {
541542
args->error_code = err->code;
542543
args->failed = true;
@@ -546,6 +547,20 @@ static void *send_chunks_without_gvl(void *data) {
546547
return NULL;
547548
}
548549

550+
/*
551+
* Unblock function: cooperatively cancel an in-flight send.
552+
*
553+
* Called by Ruby when an interrupt (Thread#kill, shutdown) fires while
554+
* the thread is inside rb_thread_call_without_gvl2. Cancelling the
555+
* token causes the Rust HTTP pipeline to abort the in-flight request
556+
* and return promptly, which is not possible with RUBY_UBF_IO's
557+
* signal-based approach.
558+
*/
559+
static void interrupt_exporter_call(void *cancel_token) {
560+
ddog_trace_exporter_cancel_token_cancel(
561+
(ddog_TraceExporterCancelToken *)cancel_token);
562+
}
563+
549564
/*
550565
* Check for a pending Ruby exception without raising it.
551566
* Mirrors the profiling extension's check_if_pending_exception().
@@ -638,13 +653,24 @@ static VALUE build_and_send_traces(VALUE arg) {
638653
* response before any Ruby exception propagates -- otherwise we
639654
* would leak those Rust-allocated objects.
640655
*
656+
* A cancellation token is created per send call and passed to the
657+
* custom unblock function (interrupt_exporter_call). When Ruby
658+
* interrupts the thread (shutdown, Thread#kill), the UBF cancels
659+
* the token, which cooperatively aborts the in-flight HTTP request
660+
* in the Rust runtime. This replaces the signal-based RUBY_UBF_IO
661+
* which could not actually cancel the Rust HTTP pipeline.
662+
*
641663
* An interrupt (e.g. Thread#kill) may cause gvl2 to return before
642664
* our function runs, so we loop until it does.
643665
*/
666+
ddog_TraceExporterCancelToken cancel_token =
667+
ddog_trace_exporter_cancel_token_new();
668+
644669
send_chunks_args_t args = {
645670
.exporter = ctx->exporter,
646671
.chunks = ctx->chunks,
647672
.response = NULL,
673+
.cancel_token = &cancel_token,
648674
.failed = false,
649675
.send_ran = false,
650676
};
@@ -653,12 +679,14 @@ static VALUE build_and_send_traces(VALUE arg) {
653679
while (!args.send_ran && !pending_exception) {
654680
rb_thread_call_without_gvl2(
655681
send_chunks_without_gvl, &args,
656-
RUBY_UBF_IO, NULL);
682+
interrupt_exporter_call, &cancel_token);
657683

658684
if (!args.send_ran) {
659685
pending_exception = check_if_pending_exception();
660686
}
661687
}
688+
689+
ddog_trace_exporter_cancel_token_drop(&cancel_token);
662690
/* Only null chunks when the send actually ran and consumed them.
663691
* If an interrupt fired before the send executed, chunks are still
664692
* live and the ensure handler must free them. */

0 commit comments

Comments
 (0)