Skip to content

Commit 0b00c9c

Browse files
refactor(profiling): split upload function (#1733)
## What does this PR do? This PR splits the Exporter creation process into two steps (two functions to call): 1. Create the Tokio runtime with `init_runtime` 2. Upload the Profile with `send_blocking` (`send_blocking` calls `init_runtime` if it hasn't been initialised from the outside) This fixes a TSan-detected race condition where we try to cancel the Upload request (using the Cancellation Token) at the same time the runtime is being initialised. Example runtime Profiler fix PR: DataDog/dd-trace-py#16467 The PR also updates the existing `exporter.cpp` example to use the new function (tested with `cargo ffi-test --filter exporter`). ## Open questions Currently, the PR maintains compatibility with existing usages -- not explicitly calling `init_runtime` will make `send_blocking` call it when it needs it. However, this is risky (for the reasons that made us make this PR in the first place). * Should we prevent people from doing this? _Doing so will break compatibility with current usages._ * ... Or should we instead keep the race? _Doing so will guarantee backwards compatibility but also not force usages to be fixed._ --- We ended up going for the second, keeping the race so that we don't have a breaking API change. Co-authored-by: thomas.kowalski <thomas.kowalski@datadoghq.com>
1 parent 04c735c commit 0b00c9c

3 files changed

Lines changed: 63 additions & 7 deletions

File tree

examples/ffi/exporter.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,18 @@ int main(int argc, char *argv[]) {
146146
auto cancel = ddog_CancellationToken_new();
147147
auto cancel_for_background_thread = ddog_CancellationToken_clone(&cancel);
148148

149+
// Eagerly initialize the tokio runtime. This is optional, but required
150+
// to avoid race conditions if another thread might be using the
151+
// the cancellation token at the same time as the profile is being sent
152+
// (as is the case here).
153+
ddog_VoidResult init_result = ddog_prof_Exporter_init_runtime(exporter);
154+
if (init_result.tag != DDOG_VOID_RESULT_OK) {
155+
print_error("Failed to initialize exporter runtime: ", init_result.err);
156+
ddog_Error_drop(&init_result.err);
157+
ddog_prof_Exporter_drop(exporter);
158+
return 1;
159+
}
160+
149161
// As an example of CancellationToken usage, here we create a background
150162
// thread that sleeps for some time and then cancels a request early (e.g.
151163
// before the timeout in ddog_prof_Exporter_send_blocking is hit).

libdd-profiling-ffi/src/exporter.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,38 @@ unsafe fn parse_json(
222222
}
223223
}
224224

225+
/// Initializes the tokio runtime for the exporter.
226+
///
227+
/// This function creates the tokio runtime used by `ddog_prof_Exporter_send_blocking`.
228+
/// It can be called ahead of time to ensure the runtime is ready before sending.
229+
///
230+
/// # Thread Affinity
231+
///
232+
/// **Important**: The runtime has thread affinity. This function should be called from
233+
/// the same thread that will later call `ddog_prof_Exporter_send_blocking`.
234+
///
235+
/// # Arguments
236+
/// * `exporter` - Borrows the exporter.
237+
///
238+
/// # Safety
239+
/// The `exporter` must point to a valid ProfileExporter that has not been dropped.
240+
#[no_mangle]
241+
#[must_use]
242+
#[named]
243+
pub unsafe extern "C" fn ddog_prof_Exporter_init_runtime(
244+
mut exporter: *mut Handle<ProfileExporter>,
245+
) -> VoidResult {
246+
wrap_with_void_ffi_result!({
247+
let exporter = exporter.to_inner_mut()?;
248+
exporter.init_runtime()?
249+
})
250+
}
251+
225252
/// Builds a request and sends it, returning the HttpStatus.
226253
///
254+
/// Note: If the runtime has not been initialized via `ddog_prof_Exporter_init_runtime`,
255+
/// it will be lazily initialized on first call.
256+
///
227257
/// # Arguments
228258
/// * `exporter` - Borrows the exporter.
229259
/// * `profile` - Takes ownership of the profile.

libdd-profiling/src/exporter/profile_exporter.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,26 @@ impl ProfileExporter {
161161
/// - Using the async [`send`] method directly from within a tokio runtime
162162
///
163163
/// [`send`]: ProfileExporter::send
164+
/// Initializes the tokio runtime for blocking operations.
165+
///
166+
/// This method lazily creates a single-threaded tokio runtime. It can be called
167+
/// before `send_blocking` to ensure the runtime is ready ahead of time.
168+
///
169+
/// # Thread Affinity
170+
///
171+
/// **Important**: The runtime uses `new_current_thread()`, which has thread affinity.
172+
/// This method should be called from the same thread that will later call `send_blocking`.
173+
pub fn init_runtime(&mut self) -> anyhow::Result<()> {
174+
if self.runtime.is_none() {
175+
self.runtime = Some(
176+
tokio::runtime::Builder::new_current_thread()
177+
.enable_all()
178+
.build()?,
179+
);
180+
}
181+
Ok(())
182+
}
183+
164184
#[allow(clippy::too_many_arguments)]
165185
pub fn send_blocking(
166186
&mut self,
@@ -172,13 +192,7 @@ impl ProfileExporter {
172192
process_tags: Option<&str>,
173193
cancel: Option<&CancellationToken>,
174194
) -> anyhow::Result<reqwest::StatusCode> {
175-
if self.runtime.is_none() {
176-
self.runtime = Some(
177-
tokio::runtime::Builder::new_current_thread()
178-
.enable_all()
179-
.build()?,
180-
);
181-
}
195+
self.init_runtime()?;
182196

183197
Ok(self
184198
.runtime

0 commit comments

Comments
 (0)