Skip to content

Commit c3840f7

Browse files
committed
test(sdk): add regression tests for tokio runtime deadlock fix
Add tests with TokioSpawn*Exporter mocks that call tokio::spawn() inside export(), simulating tonic/gRPC exporters. These prove that BlockingStrategy correctly provides tokio runtime context on the processor's dedicated OS thread, preventing deadlocks on constrained multi_thread(1) runtimes (#2802, #3356).
1 parent 4c2b288 commit c3840f7

3 files changed

Lines changed: 198 additions & 0 deletions

File tree

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,6 +1008,66 @@ mod tests {
10081008
processor.shutdown().unwrap();
10091009
}
10101010

1011+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
1012+
// exporters where the export future depends on tokio tasks. Without
1013+
// BlockingStrategy, this deadlocks on constrained runtimes because
1014+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
1015+
#[derive(Debug, Clone)]
1016+
struct TokioSpawnLogExporter {
1017+
exported_count: Arc<AtomicUsize>,
1018+
}
1019+
1020+
impl TokioSpawnLogExporter {
1021+
fn new() -> Self {
1022+
Self {
1023+
exported_count: Arc::new(AtomicUsize::new(0)),
1024+
}
1025+
}
1026+
}
1027+
1028+
impl LogExporter for TokioSpawnLogExporter {
1029+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
1030+
let count = batch.len();
1031+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
1032+
let result = tokio::spawn(async move { count }).await.unwrap();
1033+
assert_eq!(result, count);
1034+
self.exported_count.fetch_add(count, Ordering::Relaxed);
1035+
Ok(())
1036+
}
1037+
1038+
fn shutdown(&self) -> OTelSdkResult {
1039+
Ok(())
1040+
}
1041+
}
1042+
1043+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
1044+
// Uses TokioSpawnLogExporter which internally calls tokio::spawn(),
1045+
// simulating tonic/gRPC exporters where the export future depends on
1046+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
1047+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
1048+
// without the runtime context.
1049+
//
1050+
// Note: current_thread runtime is not tested here because it has a
1051+
// fundamental limitation — the single runtime thread is blocked by
1052+
// force_flush()'s recv(), so no thread is available to drive spawned
1053+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
1054+
// target of this fix.
1055+
1056+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1057+
async fn test_batch_log_processor_multi_thread_1_worker_with_tokio_spawn_exporter() {
1058+
let exporter = TokioSpawnLogExporter::new();
1059+
let exported_count = exporter.exported_count.clone();
1060+
let processor = BatchLogProcessor::new(exporter, BatchConfig::default());
1061+
1062+
let mut record = SdkLogRecord::new();
1063+
let instrumentation = InstrumentationScope::default();
1064+
processor.emit(&mut record, &instrumentation);
1065+
1066+
processor.force_flush().unwrap();
1067+
1068+
assert_eq!(exported_count.load(Ordering::Relaxed), 1);
1069+
}
1070+
10111071
#[tokio::test(flavor = "multi_thread")]
10121072
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
10131073
let exporter = InMemoryLogExporterBuilder::default().build();

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,75 @@ mod tests {
606606
}
607607
}
608608

609+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
610+
// exporters where the export future depends on tokio tasks. Without
611+
// BlockingStrategy, this deadlocks on constrained runtimes because
612+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
613+
#[derive(Debug, Clone, Default)]
614+
struct TokioSpawnMetricExporter {
615+
exported_count: Arc<AtomicUsize>,
616+
is_shutdown: Arc<AtomicBool>,
617+
}
618+
619+
impl PushMetricExporter for TokioSpawnMetricExporter {
620+
async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
621+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
622+
let result = tokio::spawn(async { 42 }).await.unwrap();
623+
assert_eq!(result, 42);
624+
self.exported_count.fetch_add(1, Ordering::Relaxed);
625+
Ok(())
626+
}
627+
628+
fn force_flush(&self) -> OTelSdkResult {
629+
Ok(())
630+
}
631+
632+
fn shutdown(&self) -> OTelSdkResult {
633+
self.is_shutdown.store(true, Ordering::Relaxed);
634+
Ok(())
635+
}
636+
637+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
638+
self.shutdown()
639+
}
640+
641+
fn temporality(&self) -> Temporality {
642+
Temporality::Cumulative
643+
}
644+
}
645+
646+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
647+
// Uses TokioSpawnMetricExporter which internally calls tokio::spawn(),
648+
// simulating tonic/gRPC exporters where the export future depends on
649+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
650+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
651+
// without the runtime context.
652+
//
653+
// Note: current_thread runtime is not tested here because it has a
654+
// fundamental limitation — the single runtime thread is blocked by
655+
// force_flush()'s recv(), so no thread is available to drive spawned
656+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
657+
// target of this fix.
658+
659+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
660+
async fn test_periodic_reader_multi_thread_1_worker_with_tokio_spawn_exporter() {
661+
let exporter = TokioSpawnMetricExporter::default();
662+
let exported_count = exporter.exported_count.clone();
663+
664+
let reader = PeriodicReader::builder(exporter)
665+
.with_interval(Duration::from_secs(120))
666+
.build();
667+
668+
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
669+
let meter = meter_provider.meter("test");
670+
let counter = meter.u64_counter("test.counter").build();
671+
counter.add(1, &[]);
672+
673+
meter_provider.force_flush().unwrap();
674+
675+
assert!(exported_count.load(Ordering::Relaxed) >= 1);
676+
}
677+
609678
#[test]
610679
fn collection_triggered_by_interval_multiple() {
611680
// Arrange

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1430,6 +1430,39 @@ mod tests {
14301430
);
14311431
}
14321432

1433+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
1434+
// exporters where the export future depends on tokio tasks. Without
1435+
// BlockingStrategy, this deadlocks on constrained runtimes because
1436+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
1437+
#[derive(Debug, Clone)]
1438+
struct TokioSpawnSpanExporter {
1439+
exported_spans: Arc<Mutex<Vec<SpanData>>>,
1440+
}
1441+
1442+
impl TokioSpawnSpanExporter {
1443+
fn new() -> Self {
1444+
Self {
1445+
exported_spans: Arc::new(Mutex::new(Vec::new())),
1446+
}
1447+
}
1448+
}
1449+
1450+
impl SpanExporter for TokioSpawnSpanExporter {
1451+
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
1452+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
1453+
// Without tokio runtime context, this panics or deadlocks.
1454+
let count = batch.len();
1455+
let result = tokio::spawn(async move { count }).await.unwrap();
1456+
assert_eq!(result, batch.len());
1457+
self.exported_spans.lock().unwrap().extend(batch);
1458+
Ok(())
1459+
}
1460+
1461+
fn shutdown(&self) -> OTelSdkResult {
1462+
Ok(())
1463+
}
1464+
}
1465+
14331466
#[tokio::test(flavor = "current_thread")]
14341467
async fn test_batch_processor_current_thread_runtime() {
14351468
let exporter = MockSpanExporter::new();
@@ -1476,6 +1509,42 @@ mod tests {
14761509
assert_eq!(exported_spans.len(), 4);
14771510
}
14781511

1512+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
1513+
// Uses TokioSpawnSpanExporter which internally calls tokio::spawn(),
1514+
// simulating tonic/gRPC exporters where the export future depends on
1515+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
1516+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
1517+
// without the runtime context.
1518+
//
1519+
// Note: current_thread runtime is not tested here because it has a
1520+
// fundamental limitation — the single runtime thread is blocked by
1521+
// force_flush()'s recv(), so no thread is available to drive spawned
1522+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
1523+
// target of this fix.
1524+
1525+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1526+
async fn test_batch_processor_multi_thread_1_worker_with_tokio_spawn_exporter() {
1527+
let exporter = TokioSpawnSpanExporter::new();
1528+
let exporter_shared = exporter.exported_spans.clone();
1529+
1530+
let config = BatchConfigBuilder::default()
1531+
.with_max_queue_size(5)
1532+
.with_max_export_batch_size(3)
1533+
.build();
1534+
1535+
let processor = BatchSpanProcessor::new(exporter, config);
1536+
1537+
for _ in 0..4 {
1538+
let span = new_test_export_span_data();
1539+
processor.on_end(span);
1540+
}
1541+
1542+
processor.force_flush().unwrap();
1543+
1544+
let exported_spans = exporter_shared.lock().unwrap();
1545+
assert_eq!(exported_spans.len(), 4);
1546+
}
1547+
14791548
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
14801549
async fn test_batch_processor_multi_thread() {
14811550
let exporter = MockSpanExporter::new();

0 commit comments

Comments
 (0)