Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 81 additions & 2 deletions ext/libdatadog_api/trace_exporter.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ static VALUE _native_from_span(VALUE klass, VALUE span);
/* TraceExporter methods */
static VALUE _native_exporter_new(int argc, VALUE *argv, VALUE klass);
static VALUE _native_send_traces(VALUE self, VALUE traces);
static VALUE _native_before_fork(VALUE self);
static VALUE _native_after_fork_in_parent(VALUE self);
static VALUE _native_after_fork_in_child(VALUE self);

/* Response helpers */
static VALUE create_ok_response(long trace_count, VALUE payload);
Expand Down Expand Up @@ -473,6 +476,46 @@ static VALUE _native_exporter_new(
exporter);
}

/* ========================================================================
* Fork safety hooks
*
* These coordinate the tokio runtime lifecycle around process forks
* (Puma, Unicorn, Passenger).
* ======================================================================== */

/*
 * Ruby instance method `_native_before_fork`.
 *
 * Unwraps the native exporter stored in `self` and calls
 * ddog_trace_exporter_before_fork() on it. A NULL exporter is reported
 * through raise_error() as a RuntimeError; any error object returned by the
 * libdatadog call is passed to check_exporter_error().
 *
 * Returns nil when both checks pass.
 */
static VALUE _native_before_fork(VALUE self) {
  ddog_TraceExporter *native_exporter;
  TypedData_Get_Struct(self, ddog_TraceExporter, &trace_exporter_typed_data, native_exporter);

  /* The wrapped pointer is NULL when there is no live exporter behind `self`. */
  if (!native_exporter) {
    raise_error(rb_eRuntimeError, "TraceExporter has not been initialized or was already freed");
  }

  ddog_TraceExporterError *fork_error = ddog_trace_exporter_before_fork(native_exporter);
  check_exporter_error("Failed to prepare for fork", fork_error);

  return Qnil;
}

/*
 * Ruby instance method `_native_after_fork_in_parent`.
 *
 * Unwraps the native exporter stored in `self` and calls
 * ddog_trace_exporter_after_fork_in_parent() on it. A NULL exporter is
 * reported through raise_error() as a RuntimeError; any error object returned
 * by the libdatadog call is passed to check_exporter_error().
 *
 * Returns nil when both checks pass.
 */
static VALUE _native_after_fork_in_parent(VALUE self) {
  ddog_TraceExporter *native_exporter;
  TypedData_Get_Struct(self, ddog_TraceExporter, &trace_exporter_typed_data, native_exporter);

  /* The wrapped pointer is NULL when there is no live exporter behind `self`. */
  if (!native_exporter) {
    raise_error(rb_eRuntimeError, "TraceExporter has not been initialized or was already freed");
  }

  ddog_TraceExporterError *fork_error = ddog_trace_exporter_after_fork_in_parent(native_exporter);
  check_exporter_error("Failed to restore after fork in parent", fork_error);

  return Qnil;
}

/*
 * Ruby instance method `_native_after_fork_in_child`.
 *
 * Unwraps the native exporter stored in `self` and calls
 * ddog_trace_exporter_after_fork_in_child() on it. A NULL exporter is
 * reported through raise_error() as a RuntimeError; any error object returned
 * by the libdatadog call is passed to check_exporter_error().
 *
 * Returns nil when both checks pass.
 */
static VALUE _native_after_fork_in_child(VALUE self) {
  ddog_TraceExporter *native_exporter;
  TypedData_Get_Struct(self, ddog_TraceExporter, &trace_exporter_typed_data, native_exporter);

  /* The wrapped pointer is NULL when there is no live exporter behind `self`. */
  if (!native_exporter) {
    raise_error(rb_eRuntimeError, "TraceExporter has not been initialized or was already freed");
  }

  ddog_TraceExporterError *fork_error = ddog_trace_exporter_after_fork_in_child(native_exporter);
  check_exporter_error("Failed to restore after fork in child", fork_error);

  return Qnil;
}

/* ========================================================================
* GVL-release helper for ddog_trace_exporter_send_trace_chunks
*
Expand All @@ -485,6 +528,7 @@ typedef struct {
const ddog_TraceExporter *exporter;
ddog_TracerTraceChunks *chunks;
ddog_TraceExporterResponse *response;
ddog_TraceExporterCancelToken *cancel_token; /* borrowed, not owned */
ddog_TraceExporterErrorCode error_code;
bool failed;
bool send_ran;
Expand All @@ -493,7 +537,7 @@ typedef struct {
static void *send_chunks_without_gvl(void *data) {
send_chunks_args_t *args = (send_chunks_args_t *)data;
ddog_TraceExporterError *err = ddog_trace_exporter_send_trace_chunks(
args->exporter, args->chunks, &args->response);
args->exporter, args->chunks, &args->response, args->cancel_token);
if (err != NULL) {
args->error_code = err->code;
args->failed = true;
Expand All @@ -503,6 +547,20 @@ static void *send_chunks_without_gvl(void *data) {
return NULL;
}

/*
* Unblock function: cooperatively cancel an in-flight send.
*
* Called by Ruby when an interrupt (Thread#kill, shutdown) fires while
* the thread is inside rb_thread_call_without_gvl2. Cancelling the
* token causes the Rust HTTP pipeline to abort the in-flight request
* and return promptly, which is not possible with RUBY_UBF_IO's
* signal-based approach.
*/
static void interrupt_exporter_call(void *cancel_token) {
ddog_trace_exporter_cancel_token_cancel(
(ddog_TraceExporterCancelToken *)cancel_token);
}

/*
* Check for a pending Ruby exception without raising it.
* Mirrors the profiling extension's check_if_pending_exception().
Expand Down Expand Up @@ -595,13 +653,24 @@ static VALUE build_and_send_traces(VALUE arg) {
* response before any Ruby exception propagates -- otherwise we
* would leak those Rust-allocated objects.
*
* A cancellation token is created per send call and passed to the
* custom unblock function (interrupt_exporter_call). When Ruby
* interrupts the thread (shutdown, Thread#kill), the UBF cancels
* the token, which cooperatively aborts the in-flight HTTP request
* in the Rust runtime. This replaces the signal-based RUBY_UBF_IO
* which could not actually cancel the Rust HTTP pipeline.
*
* An interrupt (e.g. Thread#kill) may cause gvl2 to return before
* our function runs, so we loop until it does.
*/
ddog_TraceExporterCancelToken cancel_token =
ddog_trace_exporter_cancel_token_new();
Comment on lines +666 to +667
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Update libdatadog before calling new exporter APIs

This now calls the cancellation-token API, but the repository still declares and locks libdatadog to 33.0.0.1.0, whose installed datadog/data-pipeline.h does not define ddog_TraceExporterCancelToken or the new cancel/fork functions. I verified with the current lockfile dependency by running bundle exec rake compile, which fails in this file on these new symbols, so anyone building the gem against the declared dependency cannot install the native extension until the libdatadog dependency is bumped or the calls are guarded.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, this depends on DataDog/libdatadog#2051 being merged.


send_chunks_args_t args = {
.exporter = ctx->exporter,
.chunks = ctx->chunks,
.response = NULL,
.cancel_token = &cancel_token,
.failed = false,
.send_ran = false,
};
Expand All @@ -610,12 +679,14 @@ static VALUE build_and_send_traces(VALUE arg) {
while (!args.send_ran && !pending_exception) {
rb_thread_call_without_gvl2(
send_chunks_without_gvl, &args,
RUBY_UBF_IO, NULL);
interrupt_exporter_call, &cancel_token);
Comment on lines 680 to +682
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Propagate interrupts after cooperative cancellation

When Thread#kill or shutdown interrupts an in-flight send, the new UBF cancels the token and can make the Rust call return with args.send_ran == true; this loop then skips check_if_pending_exception() and falls through to create a transport error response instead of reliably raising the pending interrupt. In that scenario a writer thread that was killed during a native send may continue running after cancellation, so the pending exception should be checked after the GVL call once native response cleanup is safe, not only when the send never started.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Member

@marcotc marcotc May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Such a tiny window here for this to happen in send_chunks_without_gvl:

  args->send_ran = true;
  return NULL;

(give or take other internals in rb_thread_call_without_gvl2)

But I guess that's an accurate concern.


if (!args.send_ran) {
pending_exception = check_if_pending_exception();
}
}

ddog_trace_exporter_cancel_token_drop(&cancel_token);
/* Only null chunks when the send actually ran and consumed them.
* If an interrupt fired before the send executed, chunks are still
* live and the ensure handler must free them. */
Expand Down Expand Up @@ -736,6 +807,14 @@ void trace_exporter_init(VALUE tracing_module) {
rb_define_method(trace_exporter_class, "_native_send_traces",
_native_send_traces, 1);

/* Instance: fork safety hooks */
rb_define_method(trace_exporter_class, "_native_before_fork",
_native_before_fork, 0);
rb_define_method(trace_exporter_class, "_native_after_fork_in_parent",
_native_after_fork_in_parent, 0);
rb_define_method(trace_exporter_class, "_native_after_fork_in_child",
_native_after_fork_in_child, 0);

/* ----------------------------------------------------------------
* Response class (defined in Ruby, loaded lazily)
*
Expand Down
9 changes: 9 additions & 0 deletions lib/datadog/tracing/transport/native.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ def initialize(agent_settings:, logger: Datadog.logger)
service: service,
version: version
)

# In forked child processes the tokio runtime is dead.
# Recreate it so the exporter can send traces again.
exporter = @exporter
Core::Utils::AtForkMonkeyPatch.at_fork(:child) do
exporter._native_after_fork_in_child
rescue => e
Datadog.logger.warn { "Native transport after-fork reset failed: #{e}" }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just a warning, or does it mean no traces for the forked child?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the implementation of _native_after_fork_in_child, this looks like just a defensive rescue for uncommon failures like "TraceExporter has not been initialized or was already freed", which are errors that should be "impossible" in a well-coded implementation (i.e. exceptions caught here should happen during code changes in development, not in production). I think this error message is really just for ourselves, since we don't expect this to fail.

But I agree that we should log what would happen if it failed, something like: "Native transport after-fork reset failed. Traces might not be sent to Datadog: "

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, I think it makes sense at least to explain the consequences of that error in the warning.

end
end

# Send a list of traces to the agent.
Expand Down