Skip to content

Commit 77586fa

Browse files
committed
test(sdk): add regression tests for tokio runtime deadlock fix
Add tests with TokioSpawn*Exporter mocks that call tokio::spawn() inside export(), simulating tonic/gRPC exporters. These prove that BlockingStrategy correctly provides tokio runtime context on the processor's dedicated OS thread, preventing deadlocks on constrained multi_thread(1) runtimes (open-telemetry#2802, open-telemetry#3356).
1 parent 22ac050 commit 77586fa

3 files changed

Lines changed: 201 additions & 0 deletions

File tree

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,66 @@ mod tests {
10361036
processor.shutdown().unwrap();
10371037
}
10381038

1039+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
1040+
// exporters where the export future depends on tokio tasks. Without
1041+
// BlockingStrategy, this deadlocks on constrained runtimes because
1042+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
1043+
#[derive(Debug, Clone)]
1044+
struct TokioSpawnLogExporter {
1045+
exported_count: Arc<AtomicUsize>,
1046+
}
1047+
1048+
impl TokioSpawnLogExporter {
1049+
fn new() -> Self {
1050+
Self {
1051+
exported_count: Arc::new(AtomicUsize::new(0)),
1052+
}
1053+
}
1054+
}
1055+
1056+
impl LogExporter for TokioSpawnLogExporter {
1057+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
1058+
let count = batch.len();
1059+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
1060+
let result = tokio::spawn(async move { count }).await.unwrap();
1061+
assert_eq!(result, count);
1062+
self.exported_count.fetch_add(count, Ordering::Relaxed);
1063+
Ok(())
1064+
}
1065+
1066+
fn shutdown(&self) -> OTelSdkResult {
1067+
Ok(())
1068+
}
1069+
}
1070+
1071+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
1072+
// Uses TokioSpawnLogExporter which internally calls tokio::spawn(),
1073+
// simulating tonic/gRPC exporters where the export future depends on
1074+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
1075+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
1076+
// without the runtime context.
1077+
//
1078+
// Note: current_thread runtime is not tested here because it has a
1079+
// fundamental limitation — the single runtime thread is blocked by
1080+
// force_flush()'s recv(), so no thread is available to drive spawned
1081+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
1082+
// target of this fix.
1083+
1084+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1085+
async fn test_batch_log_processor_multi_thread_1_worker_with_tokio_spawn_exporter() {
1086+
let exporter = TokioSpawnLogExporter::new();
1087+
let exported_count = exporter.exported_count.clone();
1088+
let processor = BatchLogProcessor::new(exporter, BatchConfig::default());
1089+
1090+
let mut record = SdkLogRecord::new();
1091+
let instrumentation = InstrumentationScope::default();
1092+
processor.emit(&mut record, &instrumentation);
1093+
1094+
processor.force_flush().unwrap();
1095+
1096+
assert_eq!(exported_count.load(Ordering::Relaxed), 1);
1097+
}
1098+
10391099
#[tokio::test(flavor = "multi_thread")]
10401100
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
10411101
let exporter = InMemoryLogExporterBuilder::default().build();

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,75 @@ mod tests {
606606
}
607607
}
608608

609+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
610+
// exporters where the export future depends on tokio tasks. Without
611+
// BlockingStrategy, this deadlocks on constrained runtimes because
612+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
613+
#[derive(Debug, Clone, Default)]
614+
struct TokioSpawnMetricExporter {
615+
exported_count: Arc<AtomicUsize>,
616+
is_shutdown: Arc<AtomicBool>,
617+
}
618+
619+
impl PushMetricExporter for TokioSpawnMetricExporter {
620+
async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
621+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
622+
let result = tokio::spawn(async { 42 }).await.unwrap();
623+
assert_eq!(result, 42);
624+
self.exported_count.fetch_add(1, Ordering::Relaxed);
625+
Ok(())
626+
}
627+
628+
fn force_flush(&self) -> OTelSdkResult {
629+
Ok(())
630+
}
631+
632+
fn shutdown(&self) -> OTelSdkResult {
633+
self.is_shutdown.store(true, Ordering::Relaxed);
634+
Ok(())
635+
}
636+
637+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
638+
self.shutdown()
639+
}
640+
641+
fn temporality(&self) -> Temporality {
642+
Temporality::Cumulative
643+
}
644+
}
645+
646+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
647+
// Uses TokioSpawnMetricExporter which internally calls tokio::spawn(),
648+
// simulating tonic/gRPC exporters where the export future depends on
649+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
650+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
651+
// without the runtime context.
652+
//
653+
// Note: current_thread runtime is not tested here because it has a
654+
// fundamental limitation — the single runtime thread is blocked by
655+
// force_flush()'s recv(), so no thread is available to drive spawned
656+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
657+
// target of this fix.
658+
659+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
660+
async fn test_periodic_reader_multi_thread_1_worker_with_tokio_spawn_exporter() {
661+
let exporter = TokioSpawnMetricExporter::default();
662+
let exported_count = exporter.exported_count.clone();
663+
664+
let reader = PeriodicReader::builder(exporter)
665+
.with_interval(Duration::from_secs(120))
666+
.build();
667+
668+
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
669+
let meter = meter_provider.meter("test");
670+
let counter = meter.u64_counter("test.counter").build();
671+
counter.add(1, &[]);
672+
673+
meter_provider.force_flush().unwrap();
674+
675+
assert!(exported_count.load(Ordering::Relaxed) >= 1);
676+
}
677+
609678
#[test]
610679
fn collection_triggered_by_interval_multiple() {
611680
// Arrange

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -988,6 +988,7 @@ mod tests {
988988
OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
989989
};
990990
use crate::error::OTelSdkResult;
991+
use crate::util::BlockingStrategy;
991992
use crate::testing::trace::new_test_export_span_data;
992993
use crate::trace::span_processor::{
993994
OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS,
@@ -1302,13 +1303,15 @@ mod tests {
13021303
sender.send(create_test_span("counted")).unwrap();
13031304
sender.send(create_test_span("unaccounted")).unwrap();
13041305

1306+
let blocking_strategy = BlockingStrategy::new();
13051307
let result = BatchSpanProcessor::get_spans_and_export(
13061308
&receiver,
13071309
&exporter,
13081310
&mut spans,
13091311
&mut last_export_time,
13101312
&current_batch_size,
13111313
&config,
1314+
&blocking_strategy,
13121315
);
13131316

13141317
assert!(result.is_ok(), "export should succeed");
@@ -1546,6 +1549,39 @@ mod tests {
15461549
);
15471550
}
15481551

1552+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
1553+
// exporters where the export future depends on tokio tasks. Without
1554+
// BlockingStrategy, this deadlocks on constrained runtimes because
1555+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
1556+
#[derive(Debug, Clone)]
1557+
struct TokioSpawnSpanExporter {
1558+
exported_spans: Arc<Mutex<Vec<SpanData>>>,
1559+
}
1560+
1561+
impl TokioSpawnSpanExporter {
1562+
fn new() -> Self {
1563+
Self {
1564+
exported_spans: Arc::new(Mutex::new(Vec::new())),
1565+
}
1566+
}
1567+
}
1568+
1569+
impl SpanExporter for TokioSpawnSpanExporter {
1570+
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
1571+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
1572+
// Without tokio runtime context, this panics or deadlocks.
1573+
let count = batch.len();
1574+
let result = tokio::spawn(async move { count }).await.unwrap();
1575+
assert_eq!(result, batch.len());
1576+
self.exported_spans.lock().unwrap().extend(batch);
1577+
Ok(())
1578+
}
1579+
1580+
fn shutdown(&self) -> OTelSdkResult {
1581+
Ok(())
1582+
}
1583+
}
1584+
15491585
#[tokio::test(flavor = "current_thread")]
15501586
async fn test_batch_processor_current_thread_runtime() {
15511587
let exporter = MockSpanExporter::new();
@@ -1592,6 +1628,42 @@ mod tests {
15921628
assert_eq!(exported_spans.len(), 4);
15931629
}
15941630

1631+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
1632+
// Uses TokioSpawnSpanExporter which internally calls tokio::spawn(),
1633+
// simulating tonic/gRPC exporters where the export future depends on
1634+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
1635+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
1636+
// without the runtime context.
1637+
//
1638+
// Note: current_thread runtime is not tested here because it has a
1639+
// fundamental limitation — the single runtime thread is blocked by
1640+
// force_flush()'s recv(), so no thread is available to drive spawned
1641+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
1642+
// target of this fix.
1643+
1644+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1645+
async fn test_batch_processor_multi_thread_1_worker_with_tokio_spawn_exporter() {
1646+
let exporter = TokioSpawnSpanExporter::new();
1647+
let exporter_shared = exporter.exported_spans.clone();
1648+
1649+
let config = BatchConfigBuilder::default()
1650+
.with_max_queue_size(5)
1651+
.with_max_export_batch_size(3)
1652+
.build();
1653+
1654+
let processor = BatchSpanProcessor::new(exporter, config);
1655+
1656+
for _ in 0..4 {
1657+
let span = new_test_export_span_data();
1658+
processor.on_end(span);
1659+
}
1660+
1661+
processor.force_flush().unwrap();
1662+
1663+
let exported_spans = exporter_shared.lock().unwrap();
1664+
assert_eq!(exported_spans.len(), 4);
1665+
}
1666+
15951667
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
15961668
async fn test_batch_processor_multi_thread() {
15971669
let exporter = MockSpanExporter::new();

0 commit comments

Comments
 (0)