Skip to content

Commit 401a112

Browse files
committed
fix(sdk): resolve exporter deadlock on constrained tokio runtimes
The default thread-based processors (BatchSpanProcessor, BatchLogProcessor, PeriodicReader) call futures_executor::block_on() on their dedicated worker threads. When the exporter uses tonic/gRPC, the export future depends on tokio tasks (e.g. tonic's Buffer worker) that can only be polled by tokio worker threads. If all tokio worker threads are blocked (single-threaded runtime, or multi-thread with 1 worker), this creates a circular wait. Add BlockingStrategy that captures the tokio runtime handle at construction time and enters the runtime context via Handle::enter() before calling futures_executor::block_on(). This makes tokio types available on the dedicated background threads without taking ownership of the reactor. Falls back to plain futures_executor::block_on() without tokio. Fixes: #2802
1 parent c52f4a3 commit 401a112

4 files changed

Lines changed: 68 additions & 6 deletions

File tree

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
1818
use crate::error::{OTelSdkError, OTelSdkResult};
1919
use crate::logs::log_processor::LogProcessor;
20+
use crate::util::BlockingStrategy;
2021
use crate::{
2122
logs::{LogBatch, LogExporter, SdkLogRecord},
2223
Resource,
@@ -342,6 +343,7 @@ impl BatchLogProcessor {
342343
let max_export_batch_size = config.max_export_batch_size;
343344
let current_batch_size = Arc::new(AtomicUsize::new(0));
344345
let current_batch_size_for_thread = current_batch_size.clone();
346+
let blocking_strategy = BlockingStrategy::new();
345347

346348
let handle = thread::Builder::new()
347349
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
@@ -368,6 +370,7 @@ impl BatchLogProcessor {
368370
last_export_time: &mut Instant,
369371
current_batch_size: &AtomicUsize,
370372
max_export_size: usize,
373+
blocking_strategy: &BlockingStrategy,
371374
) -> OTelSdkResult
372375
where
373376
E: LogExporter + Send + Sync + 'static,
@@ -388,13 +391,15 @@ impl BatchLogProcessor {
388391
let count_of_logs = logs.len(); // Count of logs that will be exported
389392
total_exported_logs += count_of_logs;
390393

391-
result = export_batch_sync(exporter, logs, last_export_time); // This method clears the logs vec after exporting
394+
result =
395+
export_batch_sync(exporter, logs, last_export_time, blocking_strategy); // This method clears the logs vec after exporting
392396

393397
current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
394398
}
395399
result
396400
}
397401

402+
let blocking_strategy = blocking_strategy;
398403
loop {
399404
let remaining_time = config
400405
.scheduled_delay
@@ -417,6 +422,7 @@ impl BatchLogProcessor {
417422
&mut last_export_time,
418423
&current_batch_size,
419424
max_export_batch_size,
425+
&blocking_strategy,
420426
);
421427
}
422428
Ok(BatchMessage::ForceFlush(sender)) => {
@@ -428,6 +434,7 @@ impl BatchLogProcessor {
428434
&mut last_export_time,
429435
&current_batch_size,
430436
max_export_batch_size,
437+
&blocking_strategy,
431438
);
432439
let _ = sender.send(result);
433440
}
@@ -440,6 +447,7 @@ impl BatchLogProcessor {
440447
&mut last_export_time,
441448
&current_batch_size,
442449
max_export_batch_size,
450+
&blocking_strategy,
443451
);
444452
let _ = exporter.shutdown();
445453
let _ = sender.send(result);
@@ -468,6 +476,7 @@ impl BatchLogProcessor {
468476
&mut last_export_time,
469477
&current_batch_size,
470478
max_export_batch_size,
479+
&blocking_strategy,
471480
);
472481
}
473482
Err(RecvTimeoutError::Disconnected) => {
@@ -518,6 +527,7 @@ fn export_batch_sync<E>(
518527
exporter: &E,
519528
batch: &mut Vec<Box<(SdkLogRecord, InstrumentationScope)>>,
520529
last_export_time: &mut Instant,
530+
blocking_strategy: &BlockingStrategy,
521531
) -> OTelSdkResult
522532
where
523533
E: LogExporter + ?Sized,
@@ -529,7 +539,7 @@ where
529539
}
530540

531541
let export = exporter.export(LogBatch::new_with_owned_data(batch.as_slice()));
532-
let export_result = futures_executor::block_on(export);
542+
let export_result = blocking_strategy.block_on(export);
533543

534544
// Clear the batch vec after exporting
535545
batch.clear();

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context};
1313
use crate::{
1414
error::{OTelSdkError, OTelSdkResult},
1515
metrics::{exporter::PushMetricExporter, reader::SdkProducer},
16+
util::BlockingStrategy,
1617
Resource,
1718
};
1819

@@ -152,6 +153,7 @@ impl<E: PushMetricExporter> PeriodicReader<E> {
152153
message_sender,
153154
producer: Mutex::new(None),
154155
exporter: exporter_arc.clone(),
156+
blocking_strategy: BlockingStrategy::new(),
155157
}),
156158
};
157159
let cloned_reader = reader.clone();
@@ -351,6 +353,7 @@ struct PeriodicReaderInner<E: PushMetricExporter> {
351353
exporter: Arc<E>,
352354
message_sender: mpsc::Sender<Message>,
353355
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
356+
blocking_strategy: BlockingStrategy,
354357
}
355358

356359
impl<E: PushMetricExporter> PeriodicReaderInner<E> {
@@ -407,9 +410,8 @@ impl<E: PushMetricExporter> PeriodicReaderInner<E> {
407410
});
408411
otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis());
409412

410-
// Relying on futures executor to execute async call.
411413
// TODO: Pass timeout to exporter
412-
futures_executor::block_on(self.exporter.export(rm))
414+
self.blocking_strategy.block_on(self.exporter.export(rm))
413415
}
414416

415417
fn force_flush(&self) -> OTelSdkResult {

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::error::{OTelSdkError, OTelSdkResult};
3838
use crate::resource::Resource;
3939
use crate::trace::Span;
4040
use crate::trace::{SpanData, SpanExporter};
41+
use crate::util::BlockingStrategy;
4142
use opentelemetry::Context;
4243
use opentelemetry::{otel_debug, otel_error, otel_warn};
4344
use std::cmp::min;
@@ -345,6 +346,7 @@ impl BatchSpanProcessor {
345346
let max_export_batch_size = config.max_export_batch_size;
346347
let current_batch_size = Arc::new(AtomicUsize::new(0));
347348
let current_batch_size_for_thread = current_batch_size.clone();
349+
let blocking_strategy = BlockingStrategy::new();
348350

349351
let handle = thread::Builder::new()
350352
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
@@ -359,6 +361,7 @@ impl BatchSpanProcessor {
359361
let mut spans = Vec::with_capacity(config.max_export_batch_size);
360362
let mut last_export_time = Instant::now();
361363
let current_batch_size = current_batch_size_for_thread;
364+
let blocking_strategy = blocking_strategy;
362365
loop {
363366
let remaining_time_option = config
364367
.scheduled_delay
@@ -382,6 +385,7 @@ impl BatchSpanProcessor {
382385
&mut last_export_time,
383386
&current_batch_size,
384387
&config,
388+
&blocking_strategy,
385389
);
386390
}
387391
BatchMessage::ForceFlush(sender) => {
@@ -393,6 +397,7 @@ impl BatchSpanProcessor {
393397
&mut last_export_time,
394398
&current_batch_size,
395399
&config,
400+
&blocking_strategy,
396401
);
397402
let _ = sender.send(result);
398403
}
@@ -405,6 +410,7 @@ impl BatchSpanProcessor {
405410
&mut last_export_time,
406411
&current_batch_size,
407412
&config,
413+
&blocking_strategy,
408414
);
409415
let _ = exporter.shutdown();
410416
let _ = sender.send(result);
@@ -434,6 +440,7 @@ impl BatchSpanProcessor {
434440
&mut last_export_time,
435441
&current_batch_size,
436442
&config,
443+
&blocking_strategy,
437444
);
438445
}
439446
Err(RecvTimeoutError::Disconnected) => {
@@ -488,6 +495,7 @@ impl BatchSpanProcessor {
488495
last_export_time: &mut Instant,
489496
current_batch_size: &AtomicUsize,
490497
config: &BatchConfig,
498+
blocking_strategy: &BlockingStrategy,
491499
) -> OTelSdkResult
492500
where
493501
E: SpanExporter + Send + Sync + 'static,
@@ -508,7 +516,7 @@ impl BatchSpanProcessor {
508516
let count_of_spans = spans.len(); // Count of spans that will be exported
509517
total_exported_spans += count_of_spans;
510518

511-
result = Self::export_batch_sync(exporter, spans, last_export_time); // This method clears the spans vec after exporting
519+
result = Self::export_batch_sync(exporter, spans, last_export_time, blocking_strategy); // This method clears the spans vec after exporting
512520

513521
current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
514522
}
@@ -520,6 +528,7 @@ impl BatchSpanProcessor {
520528
exporter: &E,
521529
batch: &mut Vec<SpanData>,
522530
last_export_time: &mut Instant,
531+
blocking_strategy: &BlockingStrategy,
523532
) -> OTelSdkResult
524533
where
525534
E: SpanExporter + ?Sized,
@@ -537,7 +546,7 @@ impl BatchSpanProcessor {
537546
// every export. See if this can be optimized by
538547
// *not* requiring ownership in the exporter.
539548
let export = exporter.export(batch.split_off(0));
540-
let export_result = futures_executor::block_on(export);
549+
let export_result = blocking_strategy.block_on(export);
541550

542551
match export_result {
543552
Ok(_) => OTelSdkResult::Ok(()),

opentelemetry-sdk/src/util.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,44 @@ pub fn tokio_interval_stream(
77
) -> tokio_stream::wrappers::IntervalStream {
88
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(period))
99
}
10+
11+
/// Strategy for blocking on async futures from synchronous contexts.
12+
///
13+
/// When constructed within a tokio runtime, captures the runtime handle
14+
/// and enters the runtime context via [`tokio::runtime::Handle::enter()`]
15+
/// before blocking with [`futures_executor::block_on()`]. This makes tokio
16+
/// types (spawn, timers, IO resources) available on dedicated background
17+
/// threads without taking ownership of the reactor — IO continues to be
18+
/// driven by the runtime's own threads.
19+
///
20+
/// Falls back to plain [`futures_executor::block_on()`] when no tokio runtime
21+
/// is available (e.g., non-tokio environments).
22+
#[cfg(any(feature = "trace", feature = "logs", feature = "metrics"))]
23+
#[derive(Clone, Debug)]
24+
pub(crate) enum BlockingStrategy {
25+
#[cfg(feature = "rt-tokio")]
26+
TokioHandle(tokio::runtime::Handle),
27+
FuturesExecutor,
28+
}
29+
30+
#[cfg(any(feature = "trace", feature = "logs", feature = "metrics"))]
31+
impl BlockingStrategy {
32+
pub(crate) fn new() -> Self {
33+
#[cfg(feature = "rt-tokio")]
34+
if let Ok(handle) = tokio::runtime::Handle::try_current() {
35+
return Self::TokioHandle(handle);
36+
}
37+
Self::FuturesExecutor
38+
}
39+
40+
pub(crate) fn block_on<F: std::future::Future>(&self, future: F) -> F::Output {
41+
match self {
42+
#[cfg(feature = "rt-tokio")]
43+
Self::TokioHandle(handle) => {
44+
let _guard = handle.enter();
45+
futures_executor::block_on(future)
46+
}
47+
Self::FuturesExecutor => futures_executor::block_on(future),
48+
}
49+
}
50+
}

0 commit comments

Comments
 (0)