Skip to content

Commit 4b79b7e

Browse files
authored
feat(shared-runtime)!: SharedRuntime Borrowed & Owned mode (#2061)
# What ? Split `SharedRuntime` into a trait with three implementations: - `ForkSafeRuntime` *(native only)*: owns a multi-thread tokio runtime, supports fork hooks (`before_fork` / `after_fork_parent` / `after_fork_child`) and synchronous `shutdown`. - `BasicRuntime` *(native only)*: wraps a library-built or caller-provided `Arc<tokio::runtime::Runtime>`, no fork hooks, no synchronous shutdown. - `LocalRuntime` *(wasm32 only)*: single-threaded executor that spawns workers via `wasm_bindgen_futures::spawn_local`, no fork protocol, async-only. `BlockingRuntime` *(native only)* is a sub-trait of `SharedRuntime` that adds `block_on`; implemented by `ForkSafeRuntime` and `BasicRuntime`. Sync facades on the trace exporter (`send` / `shutdown` / `build`) bound their runtime parameter on it. `TraceExporter` is generic over the runtime — `TraceExporter<C, R: SharedRuntime>` — so callers pick `ForkSafeRuntime`, `BasicRuntime`, or `LocalRuntime` explicitly. The FFI pins `R = ForkSafeRuntime` for ABI stability. # Why ? Some callers already have a tokio runtime they want to reuse instead of letting libdatadog create its own. The previous `SharedRuntime` only supported the fork-safe owned case. Splitting into distinct types makes the lifecycle and fork-safety contract explicit and lets callers pick the model that matches their environment. The wasm32 target previously shared a file with the native implementation behind `#[cfg]` walls; it is now a dedicated module with a clean separation. # How ? - Introduce a `SharedRuntime` trait (`new`, `spawn_worker`, `shutdown_async`). - Add a native-only `BlockingRuntime: SharedRuntime` sub-trait with `block_on`. - Move the existing owned-runtime logic into `ForkSafeRuntime`; fork hooks and sync `shutdown` are inherent methods (not part of the trait). - Add `BasicRuntime::with_worker_threads` (library-built) and `BasicRuntime::from_handle(Arc<Runtime>)` (caller-provided). - Extract the wasm32 `spawn_local` path from `fork_safe.rs` into its own `local.rs` module as `LocalRuntime`. - Make `TraceExporter<C, R>` and `TraceExporterBuilder<R>` generic over the runtime; sync entry points additionally require `R: BlockingRuntime`. `Default` for the builder is impl'd only for `R = ForkSafeRuntime` on native so `TraceExporterBuilder::default()` resolves unambiguously. - `restart_on_fork = true` is silently ignored (with a `warn!`) on `BasicRuntime` and `LocalRuntime` since they do not implement a fork protocol. - Update FFI to pin `R = ForkSafeRuntime` in its `TraceExporter` type alias. # Additional Notes - Breaking change for Rust callers of `libdd_shared_runtime`: `SharedRuntime` is now a trait; use `ForkSafeRuntime::with_worker_threads(1)` (or the trait method `SharedRuntime::new()` with the trait in scope) instead of `SharedRuntime::new()`. - FFI handle type changes from `SharedRuntime` to `ForkSafeRuntime`. Co-authored-by: jules.wiriath <jules.wiriath@datadoghq.com>
1 parent 73e73f6 commit 4b79b7e

20 files changed

Lines changed: 1150 additions & 706 deletions

File tree

libdd-data-pipeline-ffi/src/trace_exporter.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ use libdd_data_pipeline::trace_exporter::{
1515
};
1616
use libdd_data_pipeline::OtlpProtocol;
1717

18-
pub(crate) type TraceExporter = GenericTraceExporter<NativeCapabilities>;
18+
// FFI pins the runtime parameter to `ForkSafeRuntime` for ABI stability. Rust callers that
19+
// don't need the fork protocol can use `TraceExporter<NativeCapabilities, BasicRuntime>`
20+
// directly.
21+
pub(crate) type TraceExporter = GenericTraceExporter<NativeCapabilities, ForkSafeRuntime>;
1922

20-
use libdd_shared_runtime::SharedRuntime;
23+
use libdd_shared_runtime::ForkSafeRuntime;
2124
use std::{ptr::NonNull, sync::Arc, time::Duration};
2225
use tracing::debug;
2326

@@ -82,7 +85,7 @@ pub struct TraceExporterConfig {
8285
process_tags: Option<String>,
8386
test_session_token: Option<String>,
8487
connection_timeout: Option<u64>,
85-
shared_runtime: Option<Arc<SharedRuntime>>,
88+
shared_runtime: Option<Arc<ForkSafeRuntime>>,
8689
otlp_endpoint: Option<String>,
8790
otlp_protocol: Option<OtlpProtocol>,
8891
}
@@ -458,7 +461,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_connection_timeout(
458461
#[no_mangle]
459462
pub unsafe extern "C" fn ddog_trace_exporter_config_set_shared_runtime(
460463
config: Option<&mut TraceExporterConfig>,
461-
handle: Option<NonNull<SharedRuntime>>,
464+
handle: Option<NonNull<ForkSafeRuntime>>,
462465
) -> Option<Box<ExporterError>> {
463466
catch_panic!(
464467
match (config, handle) {

libdd-data-pipeline/benches/trace_buffer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use libdd_data_pipeline::trace_buffer::{Export, TraceBuffer, TraceBufferConfig,
1010
use libdd_data_pipeline::trace_exporter::{
1111
agent_response::AgentResponse, error::TraceExporterError,
1212
};
13-
use libdd_shared_runtime::SharedRuntime;
13+
use libdd_shared_runtime::{ForkSafeRuntime, SharedRuntime};
1414
use libdd_tinybytes::BytesString;
1515
use libdd_trace_utils::span::v04::SpanBytes;
1616
use libdd_trace_utils::span::vec_map::VecMap;
@@ -73,8 +73,8 @@ impl Export<SpanBytes> for SleepExport {
7373
}
7474
}
7575

76-
fn setup_buffer() -> (Arc<SharedRuntime>, Arc<TraceBuffer<SpanBytes>>) {
77-
let rt = Arc::new(SharedRuntime::new().expect("SharedRuntime::new"));
76+
fn setup_buffer() -> (Arc<ForkSafeRuntime>, Arc<TraceBuffer<SpanBytes>>) {
77+
let rt = Arc::new(ForkSafeRuntime::new().expect("ForkSafeRuntime::new"));
7878
let cfg = TraceBufferConfig::new()
7979
.max_buffered_bytes(1_000_000)
8080
.flush_threshold_bytes(100_000)

libdd-data-pipeline/examples/send-traces-with-stats.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use libdd_data_pipeline::trace_exporter::{
99
use libdd_log::logger::{
1010
logger_configure_std, logger_set_log_level, LogEventLevel, StdConfig, StdTarget,
1111
};
12-
use libdd_shared_runtime::SharedRuntime;
12+
use libdd_shared_runtime::{ForkSafeRuntime, SharedRuntime};
1313
use libdd_trace_protobuf::pb;
1414
use std::{
1515
collections::HashMap,
@@ -56,11 +56,11 @@ fn main() {
5656
.expect("Failed to configure logger");
5757
logger_set_log_level(LogEventLevel::Debug).expect("Failed to set log level");
5858

59-
let shared_runtime = Arc::new(SharedRuntime::new().expect("Failed to create runtime"));
59+
let shared_runtime = Arc::new(ForkSafeRuntime::new().expect("Failed to create runtime"));
6060

6161
let args = Args::parse();
6262
let telemetry_cfg = TelemetryConfig::default();
63-
let mut builder = TraceExporter::<NativeCapabilities>::builder();
63+
let mut builder = TraceExporter::<NativeCapabilities, ForkSafeRuntime>::builder();
6464
builder
6565
.set_url(&args.url)
6666
.set_hostname("test")

libdd-data-pipeline/src/agent_info/fetcher.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,11 @@ async fn fetch_and_hash_response<C: HttpClientCapability + SleepCapability>(
159159
/// ```no_run
160160
/// # use anyhow::Result;
161161
/// # use libdd_capabilities_impl::{HttpClientCapability, NativeCapabilities};
162-
/// # use libdd_shared_runtime::Worker;
162+
/// # use libdd_shared_runtime::{ForkSafeRuntime, SharedRuntime, Worker};
163+
/// # use libdd_data_pipeline::agent_info;
163164
/// # #[tokio::main]
164165
/// # async fn main() -> Result<()> {
165166
/// // Define the endpoint
166-
/// use libdd_data_pipeline::agent_info;
167167
/// let endpoint = libdd_common::Endpoint::from_url("http://localhost:8126/info".parse().unwrap());
168168
/// // Create the fetcher
169169
/// let (mut fetcher, _response_observer) = libdd_data_pipeline::agent_info::AgentInfoFetcher::<
@@ -172,7 +172,7 @@ async fn fetch_and_hash_response<C: HttpClientCapability + SleepCapability>(
172172
/// endpoint, std::time::Duration::from_secs(5 * 60)
173173
/// );
174174
/// // Start the fetcher on a shared runtime
175-
/// let runtime = libdd_shared_runtime::SharedRuntime::new()?;
175+
/// let runtime = ForkSafeRuntime::new()?;
176176
/// runtime.spawn_worker(fetcher, true)?;
177177
///
178178
/// // Get the Arc to access the info
@@ -356,7 +356,7 @@ mod single_threaded_tests {
356356
use crate::agent_info;
357357
use httpmock::prelude::*;
358358
use libdd_capabilities_impl::NativeCapabilities;
359-
use libdd_shared_runtime::SharedRuntime;
359+
use libdd_shared_runtime::{ForkSafeRuntime, SharedRuntime};
360360

361361
const TEST_INFO: &str = r#"{
362362
"version": "0.0.0",
@@ -596,7 +596,7 @@ mod single_threaded_tests {
596596
Duration::from_millis(100),
597597
);
598598
assert!(agent_info::get_agent_info().is_none());
599-
let shared_runtime = SharedRuntime::new().unwrap();
599+
let shared_runtime = ForkSafeRuntime::new().unwrap();
600600
let _ = shared_runtime.spawn_worker(fetcher, true).unwrap();
601601

602602
// Wait until the info is fetched
@@ -679,7 +679,7 @@ mod single_threaded_tests {
679679
// Interval is too long to fetch during the test
680680
AgentInfoFetcher::<NativeCapabilities>::new(endpoint, Duration::from_secs(3600));
681681

682-
let shared_runtime = SharedRuntime::new().unwrap();
682+
let shared_runtime = ForkSafeRuntime::new().unwrap();
683683
let _ = shared_runtime.spawn_worker(fetcher, true).unwrap();
684684

685685
// Create a mock HTTP response with the new agent state
@@ -760,7 +760,7 @@ mod single_threaded_tests {
760760
let (fetcher, response_observer) =
761761
AgentInfoFetcher::<NativeCapabilities>::new(endpoint, Duration::from_secs(3600)); // Very long interval
762762

763-
let shared_runtime = SharedRuntime::new().unwrap();
763+
let shared_runtime = ForkSafeRuntime::new().unwrap();
764764
let _ = shared_runtime.spawn_worker(fetcher, true).unwrap();
765765

766766
// Create a mock HTTP response with the same agent state

libdd-data-pipeline/src/telemetry/mod.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -335,15 +335,15 @@ mod tests {
335335
use httpmock::MockServer;
336336
use libdd_capabilities::HttpError;
337337

338-
use libdd_shared_runtime::{SharedRuntime, WorkerHandle};
338+
use libdd_shared_runtime::{BlockingRuntime, ForkSafeRuntime, SharedRuntime, WorkerHandle};
339339
use libdd_trace_utils::test_utils::poll_for_mock_hits;
340340
// Use `regex::Regex` directly here because `httpmock`'s `body_matches`
341341
// requires `Into<HttpMockRegex>`, which is only implemented for
342342
// `regex::Regex`, not `regex_lite::Regex`.
343343
use regex::Regex;
344344
use tokio::time::sleep;
345345

346-
fn get_test_client(url: &str, runtime: &SharedRuntime) -> (TelemetryClient, WorkerHandle) {
346+
fn get_test_client(url: &str, runtime: &ForkSafeRuntime) -> (TelemetryClient, WorkerHandle) {
347347
let (client, worker) = TelemetryClientBuilder::default()
348348
.set_service_name("test_service")
349349
.set_service_version("test_version")
@@ -395,7 +395,7 @@ mod tests {
395395
#[test]
396396
fn api_bytes_test() {
397397
let payload = Regex::new(r#""metric":"trace_api.bytes","tags":\["src_library:libdatadog"\],"sketch_b64":".+","common":true,"interval":\d+,"type":"distribution""#).unwrap();
398-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
398+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
399399
let server = MockServer::start();
400400
let mut telemetry_srv = server.mock(|when, then| {
401401
when.method(POST).body_matches(payload);
@@ -426,7 +426,7 @@ mod tests {
426426
#[test]
427427
fn requests_test() {
428428
let payload = Regex::new(r#""metric":"trace_api.requests","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog"\],"common":true,"type":"count""#).unwrap();
429-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
429+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
430430
let server = MockServer::start();
431431
let mut telemetry_srv = server.mock(|when, then| {
432432
when.method(POST).body_matches(payload);
@@ -457,7 +457,7 @@ mod tests {
457457
#[test]
458458
fn responses_per_code_test() {
459459
let payload = Regex::new(r#""metric":"trace_api.responses","points":\[\[\d+,1\.0\]\],"tags":\["status_code:200","src_library:libdatadog"\],"common":true,"type":"count"#).unwrap();
460-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
460+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
461461
let server = MockServer::start();
462462
let mut telemetry_srv = server.mock(|when, then| {
463463
when.method(POST).body_matches(payload);
@@ -488,7 +488,7 @@ mod tests {
488488
#[test]
489489
fn errors_timeout_test() {
490490
let payload = Regex::new(r#""metric":"trace_api.errors","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","type:timeout"\],"common":true,"type":"count"#).unwrap();
491-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
491+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
492492
let server = MockServer::start();
493493
let mut telemetry_srv = server.mock(|when, then| {
494494
when.method(POST).body_matches(payload);
@@ -519,7 +519,7 @@ mod tests {
519519
#[test]
520520
fn errors_network_test() {
521521
let payload = Regex::new(r#""metric":"trace_api.errors","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","type:network"\],"common":true,"type":"count"#).unwrap();
522-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
522+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
523523
let server = MockServer::start();
524524
let mut telemetry_srv = server.mock(|when, then| {
525525
when.method(POST).body_matches(payload);
@@ -550,7 +550,7 @@ mod tests {
550550
#[test]
551551
fn errors_status_code_test() {
552552
let payload = Regex::new(r#""metric":"trace_api.errors","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","type:status_code"\],"common":true,"type":"count"#).unwrap();
553-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
553+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
554554
let server = MockServer::start();
555555
let mut telemetry_srv = server.mock(|when, then| {
556556
when.method(POST).body_matches(payload);
@@ -581,7 +581,7 @@ mod tests {
581581
#[test]
582582
fn chunks_sent_test() {
583583
let payload = Regex::new(r#""metric":"trace_chunks_sent","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog"\],"common":true,"type":"count"#).unwrap();
584-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
584+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
585585
let server = MockServer::start();
586586
let mut telemetry_srv = server.mock(|when, then| {
587587
when.method(POST).body_matches(payload);
@@ -612,7 +612,7 @@ mod tests {
612612
#[test]
613613
fn chunks_dropped_send_failure_test() {
614614
let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","reason:send_failure"\],"common":true,"type":"count"#).unwrap();
615-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
615+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
616616
let server = MockServer::start();
617617
let mut telemetry_srv = server.mock(|when, then| {
618618
when.method(POST).body_matches(payload);
@@ -644,7 +644,7 @@ mod tests {
644644
fn send_client_side_stats_drops_test() {
645645
let payload_p0 = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,3\.0\]\],"tags":\["src_library:libdatadog","reason:p0_drop"\],"common":true,"type":"count"#).unwrap();
646646
let payload_trace_filter = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,5\.0\]\],"tags":\["src_library:libdatadog","reason:trace_filters"\],"common":true,"type":"count"#).unwrap();
647-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
647+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
648648
let server = MockServer::start();
649649
let mut telemetry_srv = server.mock(|when, then| {
650650
when.method(POST)
@@ -673,7 +673,7 @@ mod tests {
673673
#[test]
674674
fn chunks_dropped_serialization_error_test() {
675675
let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","reason:serialization_error"\],"common":true,"type":"count"#).unwrap();
676-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
676+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
677677
let server = MockServer::start();
678678
let mut telemetry_srv = server.mock(|when, then| {
679679
when.method(POST).body_matches(payload);
@@ -822,7 +822,7 @@ mod tests {
822822
#[cfg_attr(miri, ignore)]
823823
#[test]
824824
fn runtime_id_test() {
825-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
825+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
826826
let server = MockServer::start();
827827
let mut telemetry_srv = server.mock(|when, then| {
828828
when.method(POST).body_includes(r#""runtime_id":"foo""#);
@@ -853,7 +853,7 @@ mod tests {
853853
#[cfg_attr(miri, ignore)]
854854
#[test]
855855
fn application_metadata_test() {
856-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
856+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
857857
let server = MockServer::start();
858858
let mut telemetry_srv = server.mock(|when, then| {
859859
when.method(POST).body_includes(
@@ -887,7 +887,7 @@ mod tests {
887887
#[cfg_attr(miri, ignore)]
888888
#[test]
889889
fn session_headers_telemetry_test() {
890-
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
890+
let shared_runtime = ForkSafeRuntime::new().expect("Failed to create runtime");
891891
let server = MockServer::start();
892892
let mut telemetry_srv = server.mock(|when, then| {
893893
when.method(POST)

libdd-data-pipeline/src/trace_buffer/mod.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414
};
1515

1616
use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability};
17-
use libdd_shared_runtime::Worker;
17+
use libdd_shared_runtime::{SharedRuntime, Worker};
1818

1919
use crate::trace_exporter::{
2020
agent_response::AgentResponse, error::TraceExporterError, TraceExporter,
@@ -643,18 +643,28 @@ pub trait Export<T>: Send + Debug {
643643
}
644644

645645
#[derive(Debug)]
646-
pub struct DefaultExport<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> {
647-
trace_exporter: TraceExporter<C>,
646+
pub struct DefaultExport<C, R>
647+
where
648+
C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static,
649+
R: SharedRuntime + std::fmt::Debug + Send + Sync + 'static,
650+
{
651+
trace_exporter: TraceExporter<C, R>,
648652
}
649653

650-
impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> DefaultExport<C> {
651-
pub fn new(trace_exporter: TraceExporter<C>) -> Self {
654+
impl<C, R> DefaultExport<C, R>
655+
where
656+
C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static,
657+
R: SharedRuntime + std::fmt::Debug + Send + Sync + 'static,
658+
{
659+
pub fn new(trace_exporter: TraceExporter<C, R>) -> Self {
652660
Self { trace_exporter }
653661
}
654662
}
655663

656-
impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static>
657-
Export<libdd_trace_utils::span::v04::SpanBytes> for DefaultExport<C>
664+
impl<C, R> Export<libdd_trace_utils::span::v04::SpanBytes> for DefaultExport<C, R>
665+
where
666+
C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static,
667+
R: SharedRuntime + std::fmt::Debug + Send + Sync + 'static,
658668
{
659669
fn export_trace_chunks(
660670
&mut self,
@@ -781,7 +791,7 @@ mod tests {
781791
use std::sync::Arc;
782792
use std::time::Duration;
783793

784-
use libdd_shared_runtime::SharedRuntime;
794+
use libdd_shared_runtime::{BlockingRuntime, ForkSafeRuntime, SharedRuntime};
785795

786796
use crate::trace_buffer::{BufferSize, Export, TraceBuffer, TraceBufferConfig};
787797
use crate::trace_exporter::agent_response::AgentResponse;
@@ -828,11 +838,11 @@ mod tests {
828838
assert_export: Box<dyn FnMut(Vec<Vec<()>>) + Send + Sync>,
829839
cfg: TraceBufferConfig,
830840
) -> (
831-
Arc<SharedRuntime>,
841+
Arc<ForkSafeRuntime>,
832842
Arc<tokio::sync::Semaphore>,
833843
TraceBuffer<()>,
834844
) {
835-
let rt = Arc::new(SharedRuntime::new().unwrap());
845+
let rt = Arc::new(ForkSafeRuntime::new().unwrap());
836846
let sem: Arc<tokio::sync::Semaphore> = Arc::new(tokio::sync::Semaphore::new(0));
837847
let (sender, worker) = TraceBuffer::new(
838848
cfg,

0 commit comments

Comments
 (0)