Skip to content

Commit ccfa6f5

Browse files
committed
test(sdk): add regression tests for tokio runtime deadlock fix
Add tests with TokioSpawn*Exporter mocks that call tokio::spawn() inside export(), simulating tonic/gRPC exporters. These prove that BlockingStrategy correctly provides tokio runtime context on the processor's dedicated OS thread, preventing deadlocks on constrained multi_thread(1) runtimes (open-telemetry#2802, open-telemetry#3356).
1 parent 575ce93 commit ccfa6f5

3 files changed

Lines changed: 198 additions & 0 deletions

File tree

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,66 @@ mod tests {
10311031
processor.shutdown().unwrap();
10321032
}
10331033

1034+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
1035+
// exporters where the export future depends on tokio tasks. Without
1036+
// BlockingStrategy, this deadlocks on constrained runtimes because
1037+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
1038+
#[derive(Debug, Clone)]
1039+
struct TokioSpawnLogExporter {
1040+
exported_count: Arc<AtomicUsize>,
1041+
}
1042+
1043+
impl TokioSpawnLogExporter {
1044+
fn new() -> Self {
1045+
Self {
1046+
exported_count: Arc::new(AtomicUsize::new(0)),
1047+
}
1048+
}
1049+
}
1050+
1051+
impl LogExporter for TokioSpawnLogExporter {
1052+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
1053+
let count = batch.len();
1054+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
1055+
let result = tokio::spawn(async move { count }).await.unwrap();
1056+
assert_eq!(result, count);
1057+
self.exported_count.fetch_add(count, Ordering::Relaxed);
1058+
Ok(())
1059+
}
1060+
1061+
fn shutdown(&self) -> OTelSdkResult {
1062+
Ok(())
1063+
}
1064+
}
1065+
1066+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
1067+
// Uses TokioSpawnLogExporter which internally calls tokio::spawn(),
1068+
// simulating tonic/gRPC exporters where the export future depends on
1069+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
1070+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
1071+
// without the runtime context.
1072+
//
1073+
// Note: current_thread runtime is not tested here because it has a
1074+
// fundamental limitation — the single runtime thread is blocked by
1075+
// force_flush()'s recv(), so no thread is available to drive spawned
1076+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
1077+
// target of this fix.
1078+
1079+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1080+
async fn test_batch_log_processor_multi_thread_1_worker_with_tokio_spawn_exporter() {
1081+
let exporter = TokioSpawnLogExporter::new();
1082+
let exported_count = exporter.exported_count.clone();
1083+
let processor = BatchLogProcessor::new(exporter, BatchConfig::default());
1084+
1085+
let mut record = SdkLogRecord::new();
1086+
let instrumentation = InstrumentationScope::default();
1087+
processor.emit(&mut record, &instrumentation);
1088+
1089+
processor.force_flush().unwrap();
1090+
1091+
assert_eq!(exported_count.load(Ordering::Relaxed), 1);
1092+
}
1093+
10341094
#[tokio::test(flavor = "multi_thread")]
10351095
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
10361096
let exporter = InMemoryLogExporterBuilder::default().build();

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,75 @@ mod tests {
606606
}
607607
}
608608

609+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
610+
// exporters where the export future depends on tokio tasks. Without
611+
// BlockingStrategy, this deadlocks on constrained runtimes because
612+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
613+
#[derive(Debug, Clone, Default)]
614+
struct TokioSpawnMetricExporter {
615+
exported_count: Arc<AtomicUsize>,
616+
is_shutdown: Arc<AtomicBool>,
617+
}
618+
619+
impl PushMetricExporter for TokioSpawnMetricExporter {
620+
async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
621+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
622+
let result = tokio::spawn(async { 42 }).await.unwrap();
623+
assert_eq!(result, 42);
624+
self.exported_count.fetch_add(1, Ordering::Relaxed);
625+
Ok(())
626+
}
627+
628+
fn force_flush(&self) -> OTelSdkResult {
629+
Ok(())
630+
}
631+
632+
fn shutdown(&self) -> OTelSdkResult {
633+
self.is_shutdown.store(true, Ordering::Relaxed);
634+
Ok(())
635+
}
636+
637+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
638+
self.shutdown()
639+
}
640+
641+
fn temporality(&self) -> Temporality {
642+
Temporality::Cumulative
643+
}
644+
}
645+
646+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
647+
// Uses TokioSpawnMetricExporter which internally calls tokio::spawn(),
648+
// simulating tonic/gRPC exporters where the export future depends on
649+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
650+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
651+
// without the runtime context.
652+
//
653+
// Note: current_thread runtime is not tested here because it has a
654+
// fundamental limitation — the single runtime thread is blocked by
655+
// force_flush()'s recv(), so no thread is available to drive spawned
656+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
657+
// target of this fix.
658+
659+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
660+
async fn test_periodic_reader_multi_thread_1_worker_with_tokio_spawn_exporter() {
661+
let exporter = TokioSpawnMetricExporter::default();
662+
let exported_count = exporter.exported_count.clone();
663+
664+
let reader = PeriodicReader::builder(exporter)
665+
.with_interval(Duration::from_secs(120))
666+
.build();
667+
668+
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
669+
let meter = meter_provider.meter("test");
670+
let counter = meter.u64_counter("test.counter").build();
671+
counter.add(1, &[]);
672+
673+
meter_provider.force_flush().unwrap();
674+
675+
assert!(exported_count.load(Ordering::Relaxed) >= 1);
676+
}
677+
609678
#[test]
610679
fn collection_triggered_by_interval_multiple() {
611680
// Arrange

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,6 +1494,39 @@ mod tests {
14941494
);
14951495
}
14961496

1497+
// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
1498+
// exporters where the export future depends on tokio tasks. Without
1499+
// BlockingStrategy, this deadlocks on constrained runtimes because
1500+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
1501+
#[derive(Debug, Clone)]
1502+
struct TokioSpawnSpanExporter {
1503+
exported_spans: Arc<Mutex<Vec<SpanData>>>,
1504+
}
1505+
1506+
impl TokioSpawnSpanExporter {
1507+
fn new() -> Self {
1508+
Self {
1509+
exported_spans: Arc::new(Mutex::new(Vec::new())),
1510+
}
1511+
}
1512+
}
1513+
1514+
impl SpanExporter for TokioSpawnSpanExporter {
1515+
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
1516+
// Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
1517+
// Without tokio runtime context, this panics or deadlocks.
1518+
let count = batch.len();
1519+
let result = tokio::spawn(async move { count }).await.unwrap();
1520+
assert_eq!(result, batch.len());
1521+
self.exported_spans.lock().unwrap().extend(batch);
1522+
Ok(())
1523+
}
1524+
1525+
fn shutdown(&self) -> OTelSdkResult {
1526+
Ok(())
1527+
}
1528+
}
1529+
14971530
#[tokio::test(flavor = "current_thread")]
14981531
async fn test_batch_processor_current_thread_runtime() {
14991532
let exporter = MockSpanExporter::new();
@@ -1540,6 +1573,42 @@ mod tests {
15401573
assert_eq!(exported_spans.len(), 4);
15411574
}
15421575

1576+
// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
1577+
// Uses TokioSpawnSpanExporter which internally calls tokio::spawn(),
1578+
// simulating tonic/gRPC exporters where the export future depends on
1579+
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
1580+
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
1581+
// without the runtime context.
1582+
//
1583+
// Note: current_thread runtime is not tested here because it has a
1584+
// fundamental limitation — the single runtime thread is blocked by
1585+
// force_flush()'s recv(), so no thread is available to drive spawned
1586+
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
1587+
// target of this fix.
1588+
1589+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1590+
async fn test_batch_processor_multi_thread_1_worker_with_tokio_spawn_exporter() {
1591+
let exporter = TokioSpawnSpanExporter::new();
1592+
let exporter_shared = exporter.exported_spans.clone();
1593+
1594+
let config = BatchConfigBuilder::default()
1595+
.with_max_queue_size(5)
1596+
.with_max_export_batch_size(3)
1597+
.build();
1598+
1599+
let processor = BatchSpanProcessor::new(exporter, config);
1600+
1601+
for _ in 0..4 {
1602+
let span = new_test_export_span_data();
1603+
processor.on_end(span);
1604+
}
1605+
1606+
processor.force_flush().unwrap();
1607+
1608+
let exported_spans = exporter_shared.lock().unwrap();
1609+
assert_eq!(exported_spans.len(), 4);
1610+
}
1611+
15431612
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
15441613
async fn test_batch_processor_multi_thread() {
15451614
let exporter = MockSpanExporter::new();

0 commit comments

Comments
 (0)