Skip to content

Commit 0b70ef7

Browse files
committed
test(sdk): add shutdown regression and timeout tests for processor test suite
Add coverage for processor test suite gaps identified in #3381:

- Add shutdown() regression tests with TokioSpawn*Exporter mocks for BatchSpanProcessor, BatchLogProcessor, and PeriodicReader on a multi_thread(1) runtime, verifying that the same BlockingStrategy fix that resolved force_flush() deadlocks also works for shutdown().
- Add timeout behavior tests with HangingExporter mocks for BatchSpanProcessor and BatchLogProcessor, verifying that shutdown_with_timeout returns Err(Timeout) when exporters hang.
- Improve documentation on SimpleLogProcessor's ignored deadlock tests, explaining that they demonstrate inherent design limitations (not bugs) and linking to relevant issues (#2802, #3381).
- Document the current_thread runtime limitation across all processors and explain why the PeriodicReader timeout test is omitted (its hardcoded 5s timeout makes it too slow for the regular test suite).

Closes #3381
1 parent 7be8d1f commit 0b70ef7

4 files changed

Lines changed: 175 additions & 20 deletions

File tree

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,6 +1103,56 @@ mod tests {
11031103
processor.shutdown().unwrap();
11041104
}
11051105

1106+
// Regression test: shutdown() goes through the same export path as force_flush()
1107+
// (get_logs_and_export via BlockingStrategy) and then calls exporter.shutdown().
1108+
// Without BlockingStrategy, this would deadlock on multi_thread(1) just like
1109+
// force_flush() did (#2802, #3356).
1110+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1111+
async fn test_batch_log_processor_shutdown_with_tokio_spawn_exporter() {
1112+
let exporter = TokioSpawnLogExporter::new();
1113+
let exported_count = exporter.exported_count.clone();
1114+
let processor = BatchLogProcessor::new(exporter, BatchConfig::default());
1115+
1116+
let mut record = SdkLogRecord::new();
1117+
let instrumentation = InstrumentationScope::default();
1118+
processor.emit(&mut record, &instrumentation);
1119+
1120+
processor.shutdown().unwrap();
1121+
1122+
assert_eq!(exported_count.load(Ordering::Relaxed), 1);
1123+
}
1124+
1125+
// Test that shutdown_with_timeout() returns a timeout error when the exporter hangs.
1126+
// The BatchLogProcessor's shutdown_with_timeout uses recv_timeout (default 5s),
1127+
// so a hanging exporter should result in Err(Timeout).
1128+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1129+
async fn test_batch_log_processor_shutdown_timeout_with_hanging_exporter() {
1130+
#[derive(Debug)]
1131+
struct HangingLogExporter;
1132+
1133+
impl LogExporter for HangingLogExporter {
1134+
async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
1135+
// Block forever, simulating an exporter that cannot complete
1136+
futures_util::future::pending::<()>().await;
1137+
Ok(())
1138+
}
1139+
1140+
fn shutdown(&self) -> OTelSdkResult {
1141+
Ok(())
1142+
}
1143+
}
1144+
1145+
let processor = BatchLogProcessor::new(HangingLogExporter, BatchConfig::default());
1146+
1147+
let mut record = SdkLogRecord::new();
1148+
let instrumentation = InstrumentationScope::default();
1149+
processor.emit(&mut record, &instrumentation);
1150+
1151+
// Use a short timeout to avoid slow tests
1152+
let result = processor.shutdown_with_timeout(Duration::from_millis(500));
1153+
assert!(result.is_err(), "Expected timeout error from hanging exporter");
1154+
}
1155+
11061156
/// A slow exporter that counts the number of logs received.
11071157
/// Used for stress testing the BatchLogProcessor.
11081158
#[derive(Debug, Clone)]

opentelemetry-sdk/src/logs/simple_log_processor.rs

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -346,24 +346,25 @@ mod tests {
346346
);
347347
}
348348

349-
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
350-
#[ignore]
351-
// This test demonstrates a potential deadlock scenario in a multi-threaded Tokio runtime.
352-
// It spawns Tokio tasks equal to the number of runtime worker threads (4) to emit log events.
353-
// Each task attempts to acquire a mutex on the exporter in `SimpleLogProcessor::emit`.
354-
// Only one task obtains the lock, while the others are blocked, waiting for its release.
349+
// KNOWN DEADLOCK: This test is intentionally #[ignore]d because it demonstrates
350+
// an inherent limitation of SimpleLogProcessor with async exporters.
355351
//
356-
// The task holding the lock invokes the LogExporterThatRequiresTokio, which performs an
357-
// asynchronous operation (e.g., network I/O simulated by `tokio::sleep`). This operation
358-
// requires yielding control back to the Tokio runtime to make progress.
352+
// SimpleLogProcessor uses futures_executor::block_on() to synchronously execute
353+
// async export operations. When all tokio worker threads are occupied (one holding
354+
// the exporter mutex in block_on, the rest waiting for the mutex), no thread is
355+
// available to drive the tokio::spawn-ed tasks the exporter depends on.
359356
//
360-
// However, all worker threads are occupied:
361-
// - One thread is executing the async exporter operation
362-
// - Three threads are blocked waiting for the mutex
357+
// This is NOT a bug — it's a fundamental design trade-off of SimpleLogProcessor:
358+
// - SimpleLogProcessor is designed for debugging/testing, not production use
359+
// - For production with async exporters, use BatchLogProcessor which runs exports
360+
// on a dedicated background thread with BlockingStrategy
361+
// - Unlike BatchLogProcessor's deadlock (#2802) which was fixable with BlockingStrategy,
362+
// this deadlock cannot be fixed without changing SimpleLogProcessor's synchronous design
363363
//
364-
// This leads to a deadlock as there are no available threads to drive the async operation
365-
// to completion, preventing the mutex from being released. Consequently, neither the blocked
366-
// tasks nor the exporter can proceed.
364+
// See: https://github.com/open-telemetry/opentelemetry-rust/issues/3381
365+
// https://github.com/open-telemetry/opentelemetry-rust/issues/2802
366+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
367+
#[ignore]
367368
async fn test_simple_processor_async_exporter_with_all_runtime_worker_threads_blocked() {
368369
let exporter = LogExporterThatRequiresTokio::new();
369370
let processor = Arc::new(SimpleLogProcessor::new(exporter.clone()));
@@ -426,13 +427,22 @@ mod tests {
426427
assert_eq!(exporter.len(), 1);
427428
}
428429

430+
// KNOWN DEADLOCK: This test is intentionally #[ignore]d because it demonstrates
431+
// a fundamental limitation of SimpleLogProcessor on current_thread runtimes.
432+
//
433+
// On a current_thread runtime, SimpleLogProcessor's futures_executor::block_on()
434+
// blocks the only available thread. Any exporter that depends on tokio (e.g.,
435+
// tokio::spawn, tokio::time::sleep, tonic gRPC) cannot make progress because
436+
// there is no thread available to drive the tokio runtime.
437+
//
438+
// This applies to ALL processors on current_thread runtimes with tokio-dependent
439+
// exporters — it's a fundamental limitation, not a bug. The multi_thread(1) scenario
440+
// (common in 1-vCPU k8s pods) IS supported via BlockingStrategy in batch processors.
441+
//
442+
// See: https://github.com/open-telemetry/opentelemetry-rust/issues/3381
443+
// https://github.com/open-telemetry/opentelemetry-rust/issues/2802
429444
#[tokio::test(flavor = "current_thread")]
430445
#[ignore]
431-
// This test uses a current-thread runtime, where all operations run on the main thread.
432-
// The processor emits a log event while the runtime is blocked using `futures::block_on`
433-
// to complete the export operation. The exporter, which performs an async operation and
434-
// requires the runtime, cannot progress because the main thread is already blocked.
435-
// This results in a deadlock, as the runtime cannot move forward.
436446
async fn test_simple_processor_async_exporter_with_current_thread_runtime() {
437447
let exporter = LogExporterThatRequiresTokio::new();
438448

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,39 @@ mod tests {
675675
assert!(exported_count.load(Ordering::Relaxed) >= 1);
676676
}
677677

678+
// Regression test: shutdown() goes through the same export path as force_flush()
679+
// (collect_and_export via BlockingStrategy) and then calls exporter.shutdown().
680+
// Without BlockingStrategy, this would deadlock on multi_thread(1) just like
681+
// force_flush() did (#2802, #3356).
682+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
683+
async fn test_periodic_reader_shutdown_with_tokio_spawn_exporter() {
684+
let exporter = TokioSpawnMetricExporter::default();
685+
let exported_count = exporter.exported_count.clone();
686+
let is_shutdown = exporter.is_shutdown.clone();
687+
688+
let reader = PeriodicReader::builder(exporter)
689+
.with_interval(Duration::from_secs(120))
690+
.build();
691+
692+
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
693+
let meter = meter_provider.meter("test");
694+
let counter = meter.u64_counter("test.counter").build();
695+
counter.add(1, &[]);
696+
697+
meter_provider.shutdown().unwrap();
698+
699+
assert!(exported_count.load(Ordering::Relaxed) >= 1);
700+
assert!(is_shutdown.load(Ordering::Relaxed));
701+
}
702+
703+
// Note: A timeout test for PeriodicReader with a hanging exporter is not
704+
// included here because PeriodicReader::shutdown() has a hardcoded 5-second
705+
// timeout (see TODO in shutdown()) that is not configurable, making the test
706+
// too slow for the regular test suite. The timeout behavior is architecturally
707+
// identical to BatchSpanProcessor and BatchLogProcessor (recv_timeout on the
708+
// response channel), so those tests provide sufficient coverage.
709+
// See: https://github.com/open-telemetry/opentelemetry-rust/issues/3381
710+
678711
#[test]
679712
fn collection_triggered_by_interval_multiple() {
680713
// Arrange

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1661,6 +1661,68 @@ mod tests {
16611661
assert_eq!(exported_spans.len(), 4);
16621662
}
16631663

1664+
// Regression test: shutdown() goes through the same export path as force_flush()
1665+
// (get_spans_and_export via BlockingStrategy) and then calls exporter.shutdown().
1666+
// Without BlockingStrategy, this would deadlock on multi_thread(1) just like
1667+
// force_flush() did (#2802, #3356).
1668+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1669+
async fn test_batch_processor_shutdown_with_tokio_spawn_exporter() {
1670+
let exporter = TokioSpawnSpanExporter::new();
1671+
let exporter_shared = exporter.exported_spans.clone();
1672+
1673+
let config = BatchConfigBuilder::default()
1674+
.with_max_queue_size(5)
1675+
.with_max_export_batch_size(3)
1676+
.build();
1677+
1678+
let processor = BatchSpanProcessor::new(exporter, config);
1679+
1680+
for _ in 0..4 {
1681+
let span = new_test_export_span_data();
1682+
processor.on_end(span);
1683+
}
1684+
1685+
processor.shutdown().unwrap();
1686+
1687+
let exported_spans = exporter_shared.lock().unwrap();
1688+
assert_eq!(exported_spans.len(), 4);
1689+
}
1690+
1691+
// Test that shutdown_with_timeout() returns a timeout error when the exporter hangs.
1692+
// The BatchSpanProcessor's shutdown_with_timeout uses recv_timeout (default 5s),
1693+
// so a hanging exporter should result in Err(Timeout).
1694+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1695+
async fn test_batch_processor_shutdown_timeout_with_hanging_exporter() {
1696+
#[derive(Debug)]
1697+
struct HangingSpanExporter;
1698+
1699+
impl SpanExporter for HangingSpanExporter {
1700+
async fn export(&self, _batch: Vec<SpanData>) -> OTelSdkResult {
1701+
// Block forever, simulating an exporter that cannot complete
1702+
futures_util::future::pending::<()>().await;
1703+
Ok(())
1704+
}
1705+
1706+
fn shutdown(&self) -> OTelSdkResult {
1707+
Ok(())
1708+
}
1709+
}
1710+
1711+
let config = BatchConfigBuilder::default()
1712+
.with_max_queue_size(5)
1713+
.with_max_export_batch_size(3)
1714+
.build();
1715+
1716+
let processor = BatchSpanProcessor::new(HangingSpanExporter, config);
1717+
1718+
let span = new_test_export_span_data();
1719+
processor.on_end(span);
1720+
1721+
// Use a short timeout to avoid slow tests
1722+
let result = processor.shutdown_with_timeout(Duration::from_millis(500));
1723+
assert!(result.is_err(), "Expected timeout error from hanging exporter");
1724+
}
1725+
16641726
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
16651727
async fn test_batch_processor_multi_thread() {
16661728
let exporter = MockSpanExporter::new();

0 commit comments

Comments
 (0)