Skip to content

Commit d37cca1

Browse files
committed
test(sdk): add regression tests for tokio runtime deadlock fix
Add tests with TokioSpawn*Exporter mocks that call tokio::spawn() inside export(), simulating tonic/gRPC exporters. These prove that BlockingStrategy correctly provides tokio runtime context on the processor's dedicated OS thread, preventing deadlocks on constrained multi_thread(1) runtimes (open-telemetry#2802, open-telemetry#3356).
1 parent 92a6a3b commit d37cca1

3 files changed

Lines changed: 198 additions & 0 deletions

File tree

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,66 @@ mod tests {
10361036
processor.shutdown().unwrap();
10371037
}
10381038

1039+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
1040+
// exporters where the export future depends on tokio tasks. Without
1041+
// BlockingStrategy, this deadlocks on constrained runtimes because
1042+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
1043+
#[derive(Debug, Clone)]
1044+
struct TokioSpawnLogExporter {
1045+
exported_count: Arc<AtomicUsize>,
1046+
}
1047+
1048+
impl TokioSpawnLogExporter {
1049+
fn new() -> Self {
1050+
Self {
1051+
exported_count: Arc::new(AtomicUsize::new(0)),
1052+
}
1053+
}
1054+
}
1055+
1056+
impl LogExporter for TokioSpawnLogExporter {
1057+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
1058+
let count = batch.len();
1059+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
1060+
let result = tokio::spawn(async move { count }).await.unwrap();
1061+
assert_eq!(result, count);
1062+
self.exported_count.fetch_add(count, Ordering::Relaxed);
1063+
Ok(())
1064+
}
1065+
1066+
fn shutdown(&self) -> OTelSdkResult {
1067+
Ok(())
1068+
}
1069+
}
1070+
1071+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
1072+
// Uses TokioSpawnLogExporter which internally calls tokio::spawn(),
1073+
// simulating tonic/gRPC exporters where the export future depends on
1074+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
1075+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
1076+
// without the runtime context.
1077+
//
1078+
// Note: current_thread runtime is not tested here because it has a
1079+
// fundamental limitation — the single runtime thread is blocked by
1080+
// force_flush()'s recv(), so no thread is available to drive spawned
1081+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
1082+
// target of this fix.
1083+
1084+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1085+
async fn test_batch_log_processor_multi_thread_1_worker_with_tokio_spawn_exporter() {
1086+
let exporter = TokioSpawnLogExporter::new();
1087+
let exported_count = exporter.exported_count.clone();
1088+
let processor = BatchLogProcessor::new(exporter, BatchConfig::default());
1089+
1090+
let mut record = SdkLogRecord::new();
1091+
let instrumentation = InstrumentationScope::default();
1092+
processor.emit(&mut record, &instrumentation);
1093+
1094+
processor.force_flush().unwrap();
1095+
1096+
assert_eq!(exported_count.load(Ordering::Relaxed), 1);
1097+
}
1098+
10391099
#[tokio::test(flavor = "multi_thread")]
10401100
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
10411101
let exporter = InMemoryLogExporterBuilder::default().build();

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,75 @@ mod tests {
606606
}
607607
}
608608

609+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
610+
// exporters where the export future depends on tokio tasks. Without
611+
// BlockingStrategy, this deadlocks on constrained runtimes because
612+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
613+
#[derive(Debug, Clone, Default)]
614+
struct TokioSpawnMetricExporter {
615+
exported_count: Arc<AtomicUsize>,
616+
is_shutdown: Arc<AtomicBool>,
617+
}
618+
619+
impl PushMetricExporter for TokioSpawnMetricExporter {
620+
async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
621+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
622+
let result = tokio::spawn(async { 42 }).await.unwrap();
623+
assert_eq!(result, 42);
624+
self.exported_count.fetch_add(1, Ordering::Relaxed);
625+
Ok(())
626+
}
627+
628+
fn force_flush(&self) -> OTelSdkResult {
629+
Ok(())
630+
}
631+
632+
fn shutdown(&self) -> OTelSdkResult {
633+
self.is_shutdown.store(true, Ordering::Relaxed);
634+
Ok(())
635+
}
636+
637+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
638+
self.shutdown()
639+
}
640+
641+
fn temporality(&self) -> Temporality {
642+
Temporality::Cumulative
643+
}
644+
}
645+
646+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
647+
// Uses TokioSpawnMetricExporter which internally calls tokio::spawn(),
648+
// simulating tonic/gRPC exporters where the export future depends on
649+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
650+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
651+
// without the runtime context.
652+
//
653+
// Note: current_thread runtime is not tested here because it has a
654+
// fundamental limitation — the single runtime thread is blocked by
655+
// force_flush()'s recv(), so no thread is available to drive spawned
656+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
657+
// target of this fix.
658+
659+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
660+
async fn test_periodic_reader_multi_thread_1_worker_with_tokio_spawn_exporter() {
661+
let exporter = TokioSpawnMetricExporter::default();
662+
let exported_count = exporter.exported_count.clone();
663+
664+
let reader = PeriodicReader::builder(exporter)
665+
.with_interval(Duration::from_secs(120))
666+
.build();
667+
668+
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
669+
let meter = meter_provider.meter("test");
670+
let counter = meter.u64_counter("test.counter").build();
671+
counter.add(1, &[]);
672+
673+
meter_provider.force_flush().unwrap();
674+
675+
assert!(exported_count.load(Ordering::Relaxed) >= 1);
676+
}
677+
609678
#[test]
610679
fn collection_triggered_by_interval_multiple() {
611680
// Arrange

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1546,6 +1546,39 @@ mod tests {
15461546
);
15471547
}
15481548

1549+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
1550+
// exporters where the export future depends on tokio tasks. Without
1551+
// BlockingStrategy, this deadlocks on constrained runtimes because
1552+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
1553+
#[derive(Debug, Clone)]
1554+
struct TokioSpawnSpanExporter {
1555+
exported_spans: Arc<Mutex<Vec<SpanData>>>,
1556+
}
1557+
1558+
impl TokioSpawnSpanExporter {
1559+
fn new() -> Self {
1560+
Self {
1561+
exported_spans: Arc::new(Mutex::new(Vec::new())),
1562+
}
1563+
}
1564+
}
1565+
1566+
impl SpanExporter for TokioSpawnSpanExporter {
1567+
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
1568+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
1569+
// Without tokio runtime context, this panics or deadlocks.
1570+
let count = batch.len();
1571+
let result = tokio::spawn(async move { count }).await.unwrap();
1572+
assert_eq!(result, batch.len());
1573+
self.exported_spans.lock().unwrap().extend(batch);
1574+
Ok(())
1575+
}
1576+
1577+
fn shutdown(&self) -> OTelSdkResult {
1578+
Ok(())
1579+
}
1580+
}
1581+
15491582
#[tokio::test(flavor = "current_thread")]
15501583
async fn test_batch_processor_current_thread_runtime() {
15511584
let exporter = MockSpanExporter::new();
@@ -1592,6 +1625,42 @@ mod tests {
15921625
assert_eq!(exported_spans.len(), 4);
15931626
}
15941627

1628+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
1629+
// Uses TokioSpawnSpanExporter which internally calls tokio::spawn(),
1630+
// simulating tonic/gRPC exporters where the export future depends on
1631+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
1632+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
1633+
// without the runtime context.
1634+
//
1635+
// Note: current_thread runtime is not tested here because it has a
1636+
// fundamental limitation — the single runtime thread is blocked by
1637+
// force_flush()'s recv(), so no thread is available to drive spawned
1638+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
1639+
// target of this fix.
1640+
1641+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1642+
async fn test_batch_processor_multi_thread_1_worker_with_tokio_spawn_exporter() {
1643+
let exporter = TokioSpawnSpanExporter::new();
1644+
let exporter_shared = exporter.exported_spans.clone();
1645+
1646+
let config = BatchConfigBuilder::default()
1647+
.with_max_queue_size(5)
1648+
.with_max_export_batch_size(3)
1649+
.build();
1650+
1651+
let processor = BatchSpanProcessor::new(exporter, config);
1652+
1653+
for _ in 0..4 {
1654+
let span = new_test_export_span_data();
1655+
processor.on_end(span);
1656+
}
1657+
1658+
processor.force_flush().unwrap();
1659+
1660+
let exported_spans = exporter_shared.lock().unwrap();
1661+
assert_eq!(exported_spans.len(), 4);
1662+
}
1663+
15951664
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
15961665
async fn test_batch_processor_multi_thread() {
15971666
let exporter = MockSpanExporter::new();

0 commit comments

Comments
 (0)