Skip to content

Commit ed5ff32

Browse files
committed
test(sdk): add regression tests for tokio runtime deadlock fix
Add tests with TokioSpawn*Exporter mocks that call tokio::spawn() inside export(), simulating tonic/gRPC exporters. These prove that BlockingStrategy correctly provides tokio runtime context on the processor's dedicated OS thread, preventing deadlocks on constrained multi_thread(1) runtimes (#2802, #3356).
1 parent 401a112 commit ed5ff32

3 files changed

Lines changed: 198 additions & 0 deletions

File tree

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,6 +1008,66 @@ mod tests {
10081008
processor.shutdown().unwrap();
10091009
}
10101010

1011+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
1012+
// exporters where the export future depends on tokio tasks. Without
1013+
// BlockingStrategy, this deadlocks on constrained runtimes because
1014+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
1015+
#[derive(Debug, Clone)]
1016+
struct TokioSpawnLogExporter {
1017+
exported_count: Arc<AtomicUsize>,
1018+
}
1019+
1020+
impl TokioSpawnLogExporter {
1021+
fn new() -> Self {
1022+
Self {
1023+
exported_count: Arc::new(AtomicUsize::new(0)),
1024+
}
1025+
}
1026+
}
1027+
1028+
impl LogExporter for TokioSpawnLogExporter {
1029+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
1030+
let count = batch.len();
1031+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
1032+
let result = tokio::spawn(async move { count }).await.unwrap();
1033+
assert_eq!(result, count);
1034+
self.exported_count.fetch_add(count, Ordering::Relaxed);
1035+
Ok(())
1036+
}
1037+
1038+
fn shutdown(&self) -> OTelSdkResult {
1039+
Ok(())
1040+
}
1041+
}
1042+
1043+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
1044+
// Uses TokioSpawnLogExporter which internally calls tokio::spawn(),
1045+
// simulating tonic/gRPC exporters where the export future depends on
1046+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
1047+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
1048+
// without the runtime context.
1049+
//
1050+
// Note: current_thread runtime is not tested here because it has a
1051+
// fundamental limitation — the single runtime thread is blocked by
1052+
// force_flush()'s recv(), so no thread is available to drive spawned
1053+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
1054+
// target of this fix.
1055+
1056+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1057+
async fn test_batch_log_processor_multi_thread_1_worker_with_tokio_spawn_exporter() {
1058+
let exporter = TokioSpawnLogExporter::new();
1059+
let exported_count = exporter.exported_count.clone();
1060+
let processor = BatchLogProcessor::new(exporter, BatchConfig::default());
1061+
1062+
let mut record = SdkLogRecord::new();
1063+
let instrumentation = InstrumentationScope::default();
1064+
processor.emit(&mut record, &instrumentation);
1065+
1066+
processor.force_flush().unwrap();
1067+
1068+
assert_eq!(exported_count.load(Ordering::Relaxed), 1);
1069+
}
1070+
10111071
#[tokio::test(flavor = "multi_thread")]
10121072
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
10131073
let exporter = InMemoryLogExporterBuilder::default().build();

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,75 @@ mod tests {
606606
}
607607
}
608608

609+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
610+
// exporters where the export future depends on tokio tasks. Without
611+
// BlockingStrategy, this deadlocks on constrained runtimes because
612+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
613+
#[derive(Debug, Clone, Default)]
614+
struct TokioSpawnMetricExporter {
615+
exported_count: Arc<AtomicUsize>,
616+
is_shutdown: Arc<AtomicBool>,
617+
}
618+
619+
impl PushMetricExporter for TokioSpawnMetricExporter {
620+
async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
621+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
622+
let result = tokio::spawn(async { 42 }).await.unwrap();
623+
assert_eq!(result, 42);
624+
self.exported_count.fetch_add(1, Ordering::Relaxed);
625+
Ok(())
626+
}
627+
628+
fn force_flush(&self) -> OTelSdkResult {
629+
Ok(())
630+
}
631+
632+
fn shutdown(&self) -> OTelSdkResult {
633+
self.is_shutdown.store(true, Ordering::Relaxed);
634+
Ok(())
635+
}
636+
637+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
638+
self.shutdown()
639+
}
640+
641+
fn temporality(&self) -> Temporality {
642+
Temporality::Cumulative
643+
}
644+
}
645+
646+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
647+
// Uses TokioSpawnMetricExporter which internally calls tokio::spawn(),
648+
// simulating tonic/gRPC exporters where the export future depends on
649+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
650+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
651+
// without the runtime context.
652+
//
653+
// Note: current_thread runtime is not tested here because it has a
654+
// fundamental limitation — the single runtime thread is blocked by
655+
// force_flush()'s recv(), so no thread is available to drive spawned
656+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
657+
// target of this fix.
658+
659+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
660+
async fn test_periodic_reader_multi_thread_1_worker_with_tokio_spawn_exporter() {
661+
let exporter = TokioSpawnMetricExporter::default();
662+
let exported_count = exporter.exported_count.clone();
663+
664+
let reader = PeriodicReader::builder(exporter)
665+
.with_interval(Duration::from_secs(120))
666+
.build();
667+
668+
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
669+
let meter = meter_provider.meter("test");
670+
let counter = meter.u64_counter("test.counter").build();
671+
counter.add(1, &[]);
672+
673+
meter_provider.force_flush().unwrap();
674+
675+
assert!(exported_count.load(Ordering::Relaxed) >= 1);
676+
}
677+
609678
#[test]
610679
fn collection_triggered_by_interval_multiple() {
611680
// Arrange

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,6 +1492,39 @@ mod tests {
14921492
);
14931493
}
14941494

1495+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
1496+
// exporters where the export future depends on tokio tasks. Without
1497+
// BlockingStrategy, this deadlocks on constrained runtimes because
1498+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
1499+
#[derive(Debug, Clone)]
1500+
struct TokioSpawnSpanExporter {
1501+
exported_spans: Arc<Mutex<Vec<SpanData>>>,
1502+
}
1503+
1504+
impl TokioSpawnSpanExporter {
1505+
fn new() -> Self {
1506+
Self {
1507+
exported_spans: Arc::new(Mutex::new(Vec::new())),
1508+
}
1509+
}
1510+
}
1511+
1512+
impl SpanExporter for TokioSpawnSpanExporter {
1513+
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
1514+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
1515+
// Without tokio runtime context, this panics or deadlocks.
1516+
let count = batch.len();
1517+
let result = tokio::spawn(async move { count }).await.unwrap();
1518+
assert_eq!(result, batch.len());
1519+
self.exported_spans.lock().unwrap().extend(batch);
1520+
Ok(())
1521+
}
1522+
1523+
fn shutdown(&self) -> OTelSdkResult {
1524+
Ok(())
1525+
}
1526+
}
1527+
14951528
#[tokio::test(flavor = "current_thread")]
14961529
async fn test_batch_processor_current_thread_runtime() {
14971530
let exporter = MockSpanExporter::new();
@@ -1538,6 +1571,42 @@ mod tests {
15381571
assert_eq!(exported_spans.len(), 4);
15391572
}
15401573

1574+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
1575+
// Uses TokioSpawnSpanExporter which internally calls tokio::spawn(),
1576+
// simulating tonic/gRPC exporters where the export future depends on
1577+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
1578+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
1579+
// without the runtime context.
1580+
//
1581+
// Note: current_thread runtime is not tested here because it has a
1582+
// fundamental limitation — the single runtime thread is blocked by
1583+
// force_flush()'s recv(), so no thread is available to drive spawned
1584+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
1585+
// target of this fix.
1586+
1587+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1588+
async fn test_batch_processor_multi_thread_1_worker_with_tokio_spawn_exporter() {
1589+
let exporter = TokioSpawnSpanExporter::new();
1590+
let exporter_shared = exporter.exported_spans.clone();
1591+
1592+
let config = BatchConfigBuilder::default()
1593+
.with_max_queue_size(5)
1594+
.with_max_export_batch_size(3)
1595+
.build();
1596+
1597+
let processor = BatchSpanProcessor::new(exporter, config);
1598+
1599+
for _ in 0..4 {
1600+
let span = new_test_export_span_data();
1601+
processor.on_end(span);
1602+
}
1603+
1604+
processor.force_flush().unwrap();
1605+
1606+
let exported_spans = exporter_shared.lock().unwrap();
1607+
assert_eq!(exported_spans.len(), 4);
1608+
}
1609+
15411610
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
15421611
async fn test_batch_processor_multi_thread() {
15431612
let exporter = MockSpanExporter::new();

0 commit comments

Comments
 (0)