-
Notifications
You must be signed in to change notification settings - Fork 403
Add fork safety and cooperative cancellation to native trace exporter #5835
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: lloeki/native-transport
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,9 @@ static VALUE _native_from_span(VALUE klass, VALUE span); | |
| /* TraceExporter methods */ | ||
| static VALUE _native_exporter_new(int argc, VALUE *argv, VALUE klass); | ||
| static VALUE _native_send_traces(VALUE self, VALUE traces); | ||
| static VALUE _native_before_fork(VALUE self); | ||
| static VALUE _native_after_fork_in_parent(VALUE self); | ||
| static VALUE _native_after_fork_in_child(VALUE self); | ||
|
|
||
| /* Response helpers */ | ||
| static VALUE create_ok_response(long trace_count, VALUE payload); | ||
|
|
@@ -473,6 +476,46 @@ static VALUE _native_exporter_new( | |
| exporter); | ||
| } | ||
|
|
||
| /* ======================================================================== | ||
| * Fork safety hooks | ||
| * | ||
| * These coordinate the tokio runtime lifecycle around process forks | ||
| * (Puma, Unicorn, Passenger). | ||
| * ======================================================================== */ | ||
|
|
||
| /* | ||
| * Ruby method _native_before_fork: forwards to | ||
| * ddog_trace_exporter_before_fork() for the exporter wrapped by self. | ||
| * | ||
| * Calls raise_error() with rb_eRuntimeError when the wrapped pointer is | ||
| * NULL. Whatever the native call returns is handed to | ||
| * check_exporter_error(). Returns Qnil. | ||
| */ | ||
| static VALUE _native_before_fork(VALUE self) { | ||
| ddog_TraceExporter *exporter = NULL; | ||
| TypedData_Get_Struct(self, ddog_TraceExporter, &trace_exporter_typed_data, exporter); | ||
| if (!exporter) { | ||
| raise_error(rb_eRuntimeError, "TraceExporter has not been initialized or was already freed"); | ||
| } | ||
| ddog_TraceExporterError *fork_error = ddog_trace_exporter_before_fork(exporter); | ||
| check_exporter_error("Failed to prepare for fork", fork_error); | ||
| return Qnil; | ||
| } | ||
|
|
||
| /* | ||
| * Ruby method _native_after_fork_in_parent: forwards to | ||
| * ddog_trace_exporter_after_fork_in_parent() for the exporter wrapped | ||
| * by self. | ||
| * | ||
| * Calls raise_error() with rb_eRuntimeError when the wrapped pointer is | ||
| * NULL. Whatever the native call returns is handed to | ||
| * check_exporter_error(). Returns Qnil. | ||
| */ | ||
| static VALUE _native_after_fork_in_parent(VALUE self) { | ||
| ddog_TraceExporter *exporter = NULL; | ||
| TypedData_Get_Struct(self, ddog_TraceExporter, &trace_exporter_typed_data, exporter); | ||
| if (!exporter) { | ||
| raise_error(rb_eRuntimeError, "TraceExporter has not been initialized or was already freed"); | ||
| } | ||
| ddog_TraceExporterError *restore_error = ddog_trace_exporter_after_fork_in_parent(exporter); | ||
| check_exporter_error("Failed to restore after fork in parent", restore_error); | ||
| return Qnil; | ||
| } | ||
|
|
||
| /* | ||
| * Ruby method _native_after_fork_in_child: forwards to | ||
| * ddog_trace_exporter_after_fork_in_child() for the exporter wrapped | ||
| * by self. | ||
| * | ||
| * Calls raise_error() with rb_eRuntimeError when the wrapped pointer is | ||
| * NULL. Whatever the native call returns is handed to | ||
| * check_exporter_error(). Returns Qnil. | ||
| */ | ||
| static VALUE _native_after_fork_in_child(VALUE self) { | ||
| ddog_TraceExporter *exporter = NULL; | ||
| TypedData_Get_Struct(self, ddog_TraceExporter, &trace_exporter_typed_data, exporter); | ||
| if (!exporter) { | ||
| raise_error(rb_eRuntimeError, "TraceExporter has not been initialized or was already freed"); | ||
| } | ||
| ddog_TraceExporterError *restore_error = ddog_trace_exporter_after_fork_in_child(exporter); | ||
| check_exporter_error("Failed to restore after fork in child", restore_error); | ||
| return Qnil; | ||
| } | ||
|
|
||
| /* ======================================================================== | ||
| * GVL-release helper for ddog_trace_exporter_send_trace_chunks | ||
| * | ||
|
|
@@ -485,6 +528,7 @@ typedef struct { | |
| const ddog_TraceExporter *exporter; | ||
| ddog_TracerTraceChunks *chunks; | ||
| ddog_TraceExporterResponse *response; | ||
| ddog_TraceExporterCancelToken *cancel_token; /* borrowed, not owned */ | ||
| ddog_TraceExporterErrorCode error_code; | ||
| bool failed; | ||
| bool send_ran; | ||
|
|
@@ -493,7 +537,7 @@ typedef struct { | |
| static void *send_chunks_without_gvl(void *data) { | ||
| send_chunks_args_t *args = (send_chunks_args_t *)data; | ||
| ddog_TraceExporterError *err = ddog_trace_exporter_send_trace_chunks( | ||
| args->exporter, args->chunks, &args->response); | ||
| args->exporter, args->chunks, &args->response, args->cancel_token); | ||
| if (err != NULL) { | ||
| args->error_code = err->code; | ||
| args->failed = true; | ||
|
|
@@ -503,6 +547,20 @@ static void *send_chunks_without_gvl(void *data) { | |
| return NULL; | ||
| } | ||
|
|
||
| /* | ||
| * Unblock function used to cooperatively cancel an in-flight send. | ||
| * | ||
| * Ruby invokes it when an interrupt (Thread#kill, shutdown) arrives | ||
| * while the thread is inside rb_thread_call_without_gvl2. It cancels | ||
| * the token passed as its argument, which makes the Rust HTTP pipeline | ||
| * abort the in-flight request and return promptly; RUBY_UBF_IO's | ||
| * signal-based approach cannot do that. | ||
| */ | ||
| static void interrupt_exporter_call(void *cancel_token) { | ||
| ddog_TraceExporterCancelToken *token = cancel_token; | ||
| ddog_trace_exporter_cancel_token_cancel(token); | ||
| } | ||
|
|
||
| /* | ||
| * Check for a pending Ruby exception without raising it. | ||
| * Mirrors the profiling extension's check_if_pending_exception(). | ||
|
|
@@ -595,13 +653,24 @@ static VALUE build_and_send_traces(VALUE arg) { | |
| * response before any Ruby exception propagates -- otherwise we | ||
| * would leak those Rust-allocated objects. | ||
| * | ||
| * A cancellation token is created per send call and passed to the | ||
| * custom unblock function (interrupt_exporter_call). When Ruby | ||
| * interrupts the thread (shutdown, Thread#kill), the UBF cancels | ||
| * the token, which cooperatively aborts the in-flight HTTP request | ||
| * in the Rust runtime. This replaces the signal-based RUBY_UBF_IO | ||
| * which could not actually cancel the Rust HTTP pipeline. | ||
| * | ||
| * An interrupt (e.g. Thread#kill) may cause gvl2 to return before | ||
| * our function runs, so we loop until it does. | ||
| */ | ||
| ddog_TraceExporterCancelToken cancel_token = | ||
| ddog_trace_exporter_cancel_token_new(); | ||
|
|
||
| send_chunks_args_t args = { | ||
| .exporter = ctx->exporter, | ||
| .chunks = ctx->chunks, | ||
| .response = NULL, | ||
| .cancel_token = &cancel_token, | ||
| .failed = false, | ||
| .send_ran = false, | ||
| }; | ||
|
|
@@ -610,12 +679,14 @@ static VALUE build_and_send_traces(VALUE arg) { | |
| while (!args.send_ran && !pending_exception) { | ||
| rb_thread_call_without_gvl2( | ||
| send_chunks_without_gvl, &args, | ||
| RUBY_UBF_IO, NULL); | ||
| interrupt_exporter_call, &cancel_token); | ||
|
Comment on lines
680
to
+682
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Useful? React with 👍 / 👎.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Such a tiny window here for this to happen in: `args->send_ran = true; return NULL;` (give or take other internals in […]). But I guess that's an accurate concern. |
||
|
|
||
| if (!args.send_ran) { | ||
| pending_exception = check_if_pending_exception(); | ||
| } | ||
| } | ||
|
|
||
| ddog_trace_exporter_cancel_token_drop(&cancel_token); | ||
| /* Only null chunks when the send actually ran and consumed them. | ||
| * If an interrupt fired before the send executed, chunks are still | ||
| * live and the ensure handler must free them. */ | ||
|
|
@@ -736,6 +807,14 @@ void trace_exporter_init(VALUE tracing_module) { | |
| rb_define_method(trace_exporter_class, "_native_send_traces", | ||
| _native_send_traces, 1); | ||
|
|
||
| /* Instance: fork safety hooks */ | ||
| rb_define_method(trace_exporter_class, "_native_before_fork", | ||
| _native_before_fork, 0); | ||
| rb_define_method(trace_exporter_class, "_native_after_fork_in_parent", | ||
| _native_after_fork_in_parent, 0); | ||
| rb_define_method(trace_exporter_class, "_native_after_fork_in_child", | ||
| _native_after_fork_in_child, 0); | ||
|
|
||
| /* ---------------------------------------------------------------- | ||
| * Response class (defined in Ruby, loaded lazily) | ||
| * | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,6 +77,15 @@ def initialize(agent_settings:, logger: Datadog.logger) | |
| service: service, | ||
| version: version | ||
| ) | ||
|
|
||
| # In forked child processes the tokio runtime is dead. | ||
| # Recreate it so the exporter can send traces again. | ||
| exporter = @exporter | ||
| Core::Utils::AtForkMonkeyPatch.at_fork(:child) do | ||
| exporter._native_after_fork_in_child | ||
| rescue => e | ||
| Datadog.logger.warn { "Native transport after-fork reset failed: #{e}" } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a warning? Or that means - no traces for the fork child?
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looking at the implementation of […]. But I agree that we should log what would happen if it failed, something like: "Native transport after-fork reset failed. Traces might not be sent to Datadog: "
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exactly, I think it makes sense at least to explain the consequences of that error in the warning. |
||
| end | ||
| end | ||
|
|
||
| # Send a list of traces to the agent. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This now calls the cancellation-token API, but the repository still declares and locks `libdatadog` to `33.0.0.1.0`, whose installed `datadog/data-pipeline.h` does not define `ddog_TraceExporterCancelToken` or the new cancel/fork functions. I verified with the current lockfile dependency by running `bundle exec rake compile`, which fails in this file on these new symbols, so anyone building the gem against the declared dependency cannot install the native extension until the libdatadog dependency is bumped or the calls are guarded.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, this depends on DataDog/libdatadog#2051 being merged.