Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ trace = ["opentelemetry/trace", "rand", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url", "experimental_async_runtime"]
logs = ["opentelemetry/logs"]
metrics = ["opentelemetry/metrics"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "tokio/sync"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "tokio/macros", "tokio/rt-multi-thread"]
experimental_async_runtime = []
rt-tokio = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
rt-tokio-current-thread = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
internal-logs = ["opentelemetry/internal-logs"]
experimental_metrics_periodicreader_with_async_runtime = ["metrics", "experimental_async_runtime"]
spec_unstable_metrics_views = ["metrics"]
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@
//! metrics aggregation can be added via the following flags:
//!
//! * `experimental_async_runtime`: Enables the experimental `Runtime` trait and related functionality.
//! * `rt-tokio`: Spawn telemetry tasks using [tokio]'s multi-thread runtime.
//! * `rt-tokio-current-thread`: Spawn telemetry tasks on a separate runtime so that the main runtime won't be blocked.
//! * `rt-tokio`: Spawn telemetry tasks using [tokio]'s runtime. Automatically detects the runtime
//! flavor (multi-threaded or current-thread) and uses the appropriate spawning strategy to avoid
//! deadlocks.
//!
//! [tokio]: https://crates.io/crates/tokio
#![warn(
Expand Down
53 changes: 14 additions & 39 deletions opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum BatchMessage {
/// them at a pre-configured interval.
pub struct BatchLogProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,
runtime: R,

// Track dropped logs - we'll log this at shutdown
dropped_logs_count: AtomicUsize,
Expand Down Expand Up @@ -82,7 +83,8 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
.try_send(BatchMessage::Flush(Some(res_sender)))
.map_err(|err| OTelSdkError::InternalFailure(format!("{err:?}")))?;

futures_executor::block_on(res_receiver)
self.runtime
.block_on(res_receiver)
.map_err(|err| OTelSdkError::InternalFailure(format!("{err:?}")))
.and_then(std::convert::identity)
}
Expand All @@ -103,7 +105,8 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
.try_send(BatchMessage::Shutdown(res_sender))
.map_err(|err| OTelSdkError::InternalFailure(format!("{err:?}")))?;

futures_executor::block_on(res_receiver)
self.runtime
.block_on(res_receiver)
.map_err(|err| OTelSdkError::InternalFailure(format!("{err:?}")))
.and_then(std::convert::identity)
}
Expand Down Expand Up @@ -208,6 +211,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
// Return batch processor with link to worker
BatchLogProcessor {
message_sender,
runtime,
dropped_logs_count: AtomicUsize::new(0),
max_queue_size: config.max_queue_size,
}
Expand Down Expand Up @@ -551,40 +555,17 @@ mod tests {
#[tokio::test(flavor = "current_thread")]
async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
exporter.clone(),
BatchConfig::default(),
runtime::TokioCurrentThread,
);
let processor =
BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);

processor.shutdown().unwrap();
}

#[tokio::test(flavor = "current_thread")]
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
async fn test_batch_log_processor_with_async_runtime_shutdown_under_async_runtime_current_flavor_multi_thread(
) {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
exporter.clone(),
BatchConfig::default(),
runtime::TokioCurrentThread,
);

//
// deadlock happens in shutdown with tokio current_thread runtime
//
processor.shutdown().unwrap();
}

#[tokio::test(flavor = "current_thread")]
async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
exporter.clone(),
BatchConfig::default(),
runtime::TokioCurrentThread,
);
let processor =
BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
processor.shutdown().unwrap();
}

Expand Down Expand Up @@ -827,11 +808,8 @@ mod tests {
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread()
{
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
exporter.clone(),
BatchConfig::default(),
runtime::TokioCurrentThread,
);
let processor =
BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);

processor.shutdown().unwrap();
}
Expand All @@ -848,11 +826,8 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
exporter.clone(),
BatchConfig::default(),
runtime::TokioCurrentThread,
);
let processor =
BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);

processor.shutdown().unwrap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ where
}

/// Create a [PeriodicReader] with the given config.
pub fn build(self) -> PeriodicReader<E> {
pub fn build(self) -> PeriodicReader<E, RT> {
let (message_sender, message_receiver) = mpsc::channel(256);
let runtime = self.runtime.clone();

let worker = move |reader: &PeriodicReader<E>| {
let worker = move |reader: &PeriodicReader<E, RT>| {
let runtime = self.runtime.clone();
let reader = reader.clone();
self.runtime.spawn(async move {
Expand Down Expand Up @@ -144,6 +145,7 @@ where
is_shutdown: false,
sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)),
})),
runtime,
}
}
}
Expand Down Expand Up @@ -185,40 +187,39 @@ where
/// # drop(reader);
/// # }
/// ```
pub struct PeriodicReader<E: PushMetricExporter> {
pub struct PeriodicReader<E: PushMetricExporter, R: Runtime> {
exporter: Arc<E>,
inner: Arc<Mutex<PeriodicReaderInner<E>>>,
inner: Arc<Mutex<PeriodicReaderInner<E, R>>>,
runtime: R,
}

impl<E: PushMetricExporter> Clone for PeriodicReader<E> {
impl<E: PushMetricExporter, R: Runtime> Clone for PeriodicReader<E, R> {
fn clone(&self) -> Self {
Self {
exporter: Arc::clone(&self.exporter),
inner: Arc::clone(&self.inner),
runtime: self.runtime.clone(),
}
}
}

impl<E: PushMetricExporter> PeriodicReader<E> {
impl<E: PushMetricExporter, R: Runtime> PeriodicReader<E, R> {
/// Configuration options for a periodic reader
pub fn builder<RT>(exporter: E, runtime: RT) -> PeriodicReaderBuilder<E, RT>
where
RT: Runtime,
{
pub fn builder(exporter: E, runtime: R) -> PeriodicReaderBuilder<E, R> {
PeriodicReaderBuilder::new(exporter, runtime)
}
}

impl<E: PushMetricExporter> fmt::Debug for PeriodicReader<E> {
impl<E: PushMetricExporter, R: Runtime> fmt::Debug for PeriodicReader<E, R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeriodicReader").finish()
}
}

struct PeriodicReaderInner<E: PushMetricExporter> {
struct PeriodicReaderInner<E: PushMetricExporter, R: Runtime> {
message_sender: mpsc::Sender<Message>,
is_shutdown: bool,
sdk_producer_or_worker: ProducerOrWorker<E>,
sdk_producer_or_worker: ProducerOrWorker<E, R>,
}

#[derive(Debug)]
Expand All @@ -228,14 +229,14 @@ enum Message {
Shutdown(oneshot::Sender<OTelSdkResult>),
}

enum ProducerOrWorker<E: PushMetricExporter> {
enum ProducerOrWorker<E: PushMetricExporter, R: Runtime> {
Producer(Weak<dyn SdkProducer>),
#[allow(clippy::type_complexity)]
Worker(Box<dyn FnOnce(&PeriodicReader<E>) + Send + Sync>),
Worker(Box<dyn FnOnce(&PeriodicReader<E, R>) + Send + Sync>),
}

struct PeriodicReaderWorker<E: PushMetricExporter, RT: Runtime> {
reader: PeriodicReader<E>,
reader: PeriodicReader<E, RT>,
timeout: Duration,
runtime: RT,
rm: ResourceMetrics,
Expand Down Expand Up @@ -332,7 +333,7 @@ impl<E: PushMetricExporter, RT: Runtime> PeriodicReaderWorker<E, RT> {
}
}

impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
impl<E: PushMetricExporter, R: Runtime> MetricReader for PeriodicReader<E, R> {
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
let mut inner = match self.inner.lock() {
Ok(guard) => guard,
Expand Down Expand Up @@ -393,7 +394,8 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {

drop(inner); // don't hold lock when blocking on future

futures_executor::block_on(receiver)
self.runtime
.block_on(receiver)
.map_err(|err| OTelSdkError::InternalFailure(err.to_string()))
.and_then(|res| res)
}
Expand All @@ -414,7 +416,9 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
drop(inner); // don't hold lock when blocking on future

let shutdown_result = futures_executor::block_on(receiver)
let shutdown_result = self
.runtime
.block_on(receiver)
.map_err(|err| OTelSdkError::InternalFailure(err.to_string()))?;

// Acquire the lock again to set the shutdown flag
Expand Down Expand Up @@ -452,43 +456,25 @@ mod tests {
use std::sync::mpsc;

#[test]
fn collection_triggered_by_interval_tokio_current() {
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
fn collection_triggered_by_interval_tokio() {
collection_triggered_by_interval_helper(runtime::Tokio);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() {
async fn collection_triggered_by_interval_from_tokio_multi_one_thread() {
collection_triggered_by_interval_helper(runtime::Tokio);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() {
async fn collection_triggered_by_interval_from_tokio_multi_two_thread() {
collection_triggered_by_interval_helper(runtime::Tokio);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current()
{
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current()
{
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
}

#[tokio::test(flavor = "current_thread")]
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"]
async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() {
async fn collection_triggered_by_interval_from_tokio_current_thread() {
collection_triggered_by_interval_helper(runtime::Tokio);
}

#[tokio::test(flavor = "current_thread")]
async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() {
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
}

#[test]
fn unregistered_collect() {
// Arrange
Expand Down Expand Up @@ -534,4 +520,15 @@ mod tests {
.recv()
.expect("message should be available in channel, indicating a collection occurred");
}

/// Regression test for https://github.com/open-telemetry/opentelemetry-rust/issues/2802
///
/// Builds a `PeriodicReader` on `runtime::Tokio`, registers it with an
/// `SdkMeterProvider`, and checks that a synchronous `shutdown()` call made
/// from inside the test's async context returns `Ok`.
// `#[tokio::test]` with no `flavor` argument runs the test on tokio's
// current-thread runtime, which is the configuration named in this test's
// identifier.
#[tokio::test]
async fn shutdown_does_not_deadlock_on_current_thread_tokio_runtime() {
let exporter = InMemoryMetricExporter::default();
// NOTE(review): the 10 s interval presumably keeps the periodic export from
// firing during the test so that only the shutdown path is exercised — confirm.
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio)
.with_interval(std::time::Duration::from_secs(10))
.build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
// `shutdown()` is called without `.await`; the test passes only if the call
// returns `Ok` instead of hanging or reporting an error.
meter_provider.shutdown().expect("shutdown should succeed");
}
}
Loading
Loading