diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 1f2aaccde5..1062499e36 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -47,17 +47,13 @@ trace = ["opentelemetry/trace", "rand", "percent-encoding"] jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url", "experimental_async_runtime"] logs = ["opentelemetry/logs"] metrics = ["opentelemetry/metrics"] -testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"] +testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "tokio/macros", "tokio/rt-multi-thread"] experimental_async_runtime = [] rt-tokio = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"] -rt-tokio-current-thread = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"] internal-logs = ["opentelemetry/internal-logs"] -experimental_metrics_periodicreader_with_async_runtime = ["metrics", "experimental_async_runtime"] spec_unstable_metrics_views = ["metrics"] experimental_metrics_custom_reader = ["metrics"] -experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimental_async_runtime"] experimental_logs_concurrent_log_processor = ["logs"] -experimental_trace_batch_span_processor_with_async_runtime = ["tokio/sync", "trace", "experimental_async_runtime"] experimental_metrics_disable_name_validation = ["metrics"] bench_profiling = [] diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index ebf84c4164..66bb04fb24 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -89,8 +89,7 @@ //! metrics aggregation can be added via the following flags: //! //! * `experimental_async_runtime`: Enables the experimental `Runtime` trait and related functionality. -//! * `rt-tokio`: Spawn telemetry tasks using [tokio]'s multi-thread runtime. -//! 
* `rt-tokio-current-thread`: Spawn telemetry tasks on a separate runtime so that the main runtime won't be blocked. +//! * `rt-tokio`: Spawn telemetry tasks using [tokio]. Automatically detects the runtime flavor (multi-thread or current-thread). //! //! [tokio]: https://crates.io/crates/tokio #![warn( diff --git a/opentelemetry-sdk/src/logs/batch_log_processor.rs b/opentelemetry-sdk/src/logs/batch_log_processor.rs index 3092bc6825..33e4c5f7a3 100644 --- a/opentelemetry-sdk/src/logs/batch_log_processor.rs +++ b/opentelemetry-sdk/src/logs/batch_log_processor.rs @@ -17,6 +17,7 @@ use crate::error::{OTelSdkError, OTelSdkResult}; use crate::logs::log_processor::LogProcessor; +use crate::util::BlockingStrategy; use crate::{ logs::{LogBatch, LogExporter, SdkLogRecord}, Resource, @@ -40,12 +41,6 @@ use std::{ pub(crate) const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY"; /// Default delay interval between two consecutive exports. pub(crate) const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: Duration = Duration::from_millis(1_000); -/// Maximum allowed time to export data. -#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] -pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT"; -/// Default maximum allowed time to export data. -#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] -pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_millis(30_000); /// Maximum queue size. pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE"; /// Default maximum queue size. 
@@ -342,6 +337,7 @@ impl BatchLogProcessor { let max_export_batch_size = config.max_export_batch_size; let current_batch_size = Arc::new(AtomicUsize::new(0)); let current_batch_size_for_thread = current_batch_size.clone(); + let blocking_strategy = BlockingStrategy::new(); let handle = thread::Builder::new() .name("OpenTelemetry.Logs.BatchProcessor".to_string()) @@ -368,6 +364,7 @@ impl BatchLogProcessor { last_export_time: &mut Instant, current_batch_size: &AtomicUsize, max_export_size: usize, + blocking_strategy: &BlockingStrategy, ) -> OTelSdkResult where E: LogExporter + Send + Sync + 'static, @@ -388,7 +385,8 @@ impl BatchLogProcessor { let count_of_logs = logs.len(); // Count of logs that will be exported total_exported_logs += count_of_logs; - result = export_batch_sync(exporter, logs, last_export_time); // This method clears the logs vec after exporting + result = + export_batch_sync(exporter, logs, last_export_time, blocking_strategy); // This method clears the logs vec after exporting current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed); } @@ -417,6 +415,7 @@ impl BatchLogProcessor { &mut last_export_time, ¤t_batch_size, max_export_batch_size, + &blocking_strategy, ); } Ok(BatchMessage::ForceFlush(sender)) => { @@ -428,6 +427,7 @@ impl BatchLogProcessor { &mut last_export_time, ¤t_batch_size, max_export_batch_size, + &blocking_strategy, ); let _ = sender.send(result); } @@ -440,6 +440,7 @@ impl BatchLogProcessor { &mut last_export_time, ¤t_batch_size, max_export_batch_size, + &blocking_strategy, ); let _ = exporter.shutdown(); let _ = sender.send(result); @@ -468,6 +469,7 @@ impl BatchLogProcessor { &mut last_export_time, ¤t_batch_size, max_export_batch_size, + &blocking_strategy, ); } Err(RecvTimeoutError::Disconnected) => { @@ -518,6 +520,7 @@ fn export_batch_sync( exporter: &E, batch: &mut Vec>, last_export_time: &mut Instant, + blocking_strategy: &BlockingStrategy, ) -> OTelSdkResult where E: LogExporter + ?Sized, @@ -529,7 +532,7 @@ where } 
let export = exporter.export(LogBatch::new_with_owned_data(batch.as_slice())); - let export_result = futures_executor::block_on(export); + let export_result = blocking_strategy.block_on(export); // Clear the batch vec after exporting batch.clear(); @@ -588,10 +591,6 @@ pub struct BatchConfig { /// of logs one batch after the other without any delay. The default value /// is 512. pub(crate) max_export_batch_size: usize, - - /// The maximum duration to export a batch of data. - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - pub(crate) max_export_timeout: Duration, } impl Default for BatchConfig { @@ -606,8 +605,6 @@ pub struct BatchConfigBuilder { max_queue_size: usize, scheduled_delay: Duration, max_export_batch_size: usize, - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - max_export_timeout: Duration, } impl Default for BatchConfigBuilder { @@ -625,8 +622,6 @@ impl Default for BatchConfigBuilder { max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, } .init_from_env_vars() } @@ -658,19 +653,6 @@ impl BatchConfigBuilder { self } - /// Set max_export_timeout for [`BatchConfigBuilder`]. - /// It's the maximum duration to export a batch of data. - /// The default value is 30000 milliseconds. - /// - /// Corresponding environment variable: `OTEL_BLRP_EXPORT_TIMEOUT`. - /// - /// Note: Programmatically setting this will override any value set via the environment variable. - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self { - self.max_export_timeout = max_export_timeout; - self - } - /// Set max_export_batch_size for [`BatchConfigBuilder`]. 
/// It's the maximum number of logs to process in a single batch. If there are /// more than one batch worth of logs then it processes multiple batches @@ -695,8 +677,6 @@ impl BatchConfigBuilder { BatchConfig { max_queue_size: self.max_queue_size, scheduled_delay: self.scheduled_delay, - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - max_export_timeout: self.max_export_timeout, max_export_batch_size, } } @@ -723,14 +703,6 @@ impl BatchConfigBuilder { self.scheduled_delay = Duration::from_millis(scheduled_delay); } - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT) - .ok() - .and_then(|s| u64::from_str(&s).ok()) - { - self.max_export_timeout = Duration::from_millis(max_export_timeout); - } - self } } @@ -743,8 +715,6 @@ mod tests { OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, }; - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - use super::{OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT}; use crate::error::OTelSdkResult; use crate::logs::log_processor::tests::MockLogExporter; use crate::logs::SdkLogRecord; @@ -764,10 +734,6 @@ mod tests { fn test_default_const_values() { assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY"); assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT.as_millis(), 1_000); - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT"); - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30_000); assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE"); assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048); assert_eq!( @@ -782,8 +748,6 @@ mod tests { // The following environment variables are expected to be unset so that their default values are 
used. let env_vars = vec![ OTEL_BLRP_SCHEDULE_DELAY, - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, ]; @@ -791,8 +755,6 @@ mod tests { let config = temp_env::with_vars_unset(env_vars, BatchConfig::default); assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT); - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT); assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT); assert_eq!( config.max_export_batch_size, @@ -825,8 +787,6 @@ mod tests { fn test_batch_config_configurable_by_env_vars() { let env_vars = vec![ (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")), - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")), (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")), (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")), ]; @@ -834,8 +794,6 @@ mod tests { let config = temp_env::with_vars(env_vars, BatchConfig::default); assert_eq!(config.scheduled_delay, Duration::from_millis(2000)); - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - assert_eq!(config.max_export_timeout, Duration::from_millis(60000)); assert_eq!(config.max_queue_size, 4096); assert_eq!(config.max_export_batch_size, 1024); } @@ -852,25 +810,18 @@ mod tests { assert_eq!(config.max_queue_size, 256); assert_eq!(config.max_export_batch_size, 256); assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT); - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT); } #[test] fn test_batch_config_with_fields() { - let batch_builder = BatchConfigBuilder::default() + let batch = BatchConfigBuilder::default() .with_max_export_batch_size(1) .with_scheduled_delay(Duration::from_millis(2)) - 
.with_max_queue_size(4); - - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - let batch_builder = batch_builder.with_max_export_timeout(Duration::from_millis(3)); - let batch = batch_builder.build(); + .with_max_queue_size(4) + .build(); assert_eq!(batch.max_export_batch_size, 1); assert_eq!(batch.scheduled_delay, Duration::from_millis(2)); - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - assert_eq!(batch.max_export_timeout, Duration::from_millis(3)); assert_eq!(batch.max_queue_size, 4); } @@ -879,8 +830,6 @@ mod tests { let mut env_vars = vec![ (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")), (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")), - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")), ]; temp_env::with_vars(env_vars.clone(), || { let builder = BatchLogProcessor::builder(InMemoryLogExporter::default()); @@ -894,12 +843,6 @@ mod tests { builder.config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT ); - - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] - assert_eq!( - builder.config.max_export_timeout, - Duration::from_millis(2046) - ); }); env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120"))); diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs deleted file mode 100644 index bd725bec9a..0000000000 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ /dev/null @@ -1,859 +0,0 @@ -use crate::error::{OTelSdkError, OTelSdkResult}; -use crate::{ - logs::{LogBatch, LogExporter, SdkLogRecord}, - Resource, -}; - -use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; - -use std::{ - fmt::{self, Debug, Formatter}, - sync::Arc, -}; -use std::{ - sync::atomic::{AtomicUsize, Ordering}, - time::Duration, -}; - -use super::{BatchConfig, LogProcessor}; -#[cfg(feature = 
"experimental_async_runtime")] -use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend}; -use futures_channel::oneshot; -use futures_util::{ - future::{self, Either}, - {pin_mut, stream, StreamExt as _}, -}; - -#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -enum BatchMessage { - /// Export logs, usually called when the log is emitted. - ExportLog((SdkLogRecord, InstrumentationScope)), - /// Flush the current buffer to the backend, it can be triggered by - /// pre configured interval or a call to `force_push` function. - Flush(Option>), - /// Shut down the worker thread, push all logs in buffer to the backend. - Shutdown(oneshot::Sender), - /// Set the resource for the exporter. - SetResource(Arc), -} - -/// A [`LogProcessor`] that asynchronously buffers log records and reports -/// them at a pre-configured interval. -pub struct BatchLogProcessor { - message_sender: R::Sender, - - // Track dropped logs - we'll log this at shutdown - dropped_logs_count: AtomicUsize, - - // Track the maximum queue size that was configured for this processor - max_queue_size: usize, -} - -impl Debug for BatchLogProcessor { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("BatchLogProcessor") - .field("message_sender", &self.message_sender) - .finish() - } -} - -impl LogProcessor for BatchLogProcessor { - fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) { - let result = self.message_sender.try_send(BatchMessage::ExportLog(( - record.clone(), - instrumentation.clone(), - ))); - - // TODO - Implement throttling to prevent error flooding when the queue is full or closed. - if result.is_err() { - // Increment dropped logs count. The first time we have to drop a log, - // emit a warning. - if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 { - otel_warn!(name: "BatchLogProcessor.LogDroppingStarted", - message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. 
No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped."); - } - } - } - - fn force_flush(&self) -> OTelSdkResult { - let (res_sender, res_receiver) = oneshot::channel(); - self.message_sender - .try_send(BatchMessage::Flush(Some(res_sender))) - .map_err(|err| OTelSdkError::InternalFailure(format!("{err:?}")))?; - - futures_executor::block_on(res_receiver) - .map_err(|err| OTelSdkError::InternalFailure(format!("{err:?}"))) - .and_then(std::convert::identity) - } - - fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { - let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); - let max_queue_size = self.max_queue_size; - if dropped_logs > 0 { - otel_warn!( - name: "BatchLogProcessor.LogsDropped", - dropped_logs_count = dropped_logs, - max_queue_size = max_queue_size, - message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals." 
- ); - } - let (res_sender, res_receiver) = oneshot::channel(); - self.message_sender - .try_send(BatchMessage::Shutdown(res_sender)) - .map_err(|err| OTelSdkError::InternalFailure(format!("{err:?}")))?; - - futures_executor::block_on(res_receiver) - .map_err(|err| OTelSdkError::InternalFailure(format!("{err:?}"))) - .and_then(std::convert::identity) - } - - fn set_resource(&mut self, resource: &Resource) { - let resource = Arc::new(resource.clone()); - let _ = self - .message_sender - .try_send(BatchMessage::SetResource(resource)); - } -} - -impl BatchLogProcessor { - pub(crate) fn new(mut exporter: E, config: BatchConfig, runtime: R) -> Self - where - E: LogExporter + Send + Sync + 'static, - { - let (message_sender, message_receiver) = - runtime.batch_message_channel(config.max_queue_size); - let inner_runtime = runtime.clone(); - - // Spawn worker process via user-defined spawn function. - runtime.spawn(async move { - // Timer will take a reference to the current runtime, so its important we do this within the - // runtime.spawn() - let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay) - .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. - .map(|_| BatchMessage::Flush(None)); - - let timeout_runtime = inner_runtime.clone(); - let mut logs = Vec::new(); - let mut messages = Box::pin(stream::select(message_receiver, ticker)); - - while let Some(message) = messages.next().await { - match message { - // Log has finished, add to buffer of pending logs. 
- BatchMessage::ExportLog(log) => { - logs.push(log); - if logs.len() == config.max_export_batch_size { - let result = export_with_timeout( - config.max_export_timeout, - &mut exporter, - &timeout_runtime, - logs.split_off(0), - ) - .await; - - if let Err(err) = result { - otel_error!( - name: "BatchLogProcessor.Export.Error", - error = format!("{}", err) - ); - } - } - } - // Log batch interval time reached or a force flush has been invoked, export current logs. - BatchMessage::Flush(res_channel) => { - let result = export_with_timeout( - config.max_export_timeout, - &mut exporter, - &timeout_runtime, - logs.split_off(0), - ) - .await; - - if let Some(channel) = res_channel { - if let Err(send_error) = channel.send(result) { - otel_debug!( - name: "BatchLogProcessor.Flush.SendResultError", - error = format!("{:?}", send_error), - ); - } - } - } - // Stream has terminated or processor is shutdown, return to finish execution. - BatchMessage::Shutdown(ch) => { - let result = export_with_timeout( - config.max_export_timeout, - &mut exporter, - &timeout_runtime, - logs.split_off(0), - ) - .await; - - let _ = exporter.shutdown(); //TODO - handle error - - if let Err(send_error) = ch.send(result) { - otel_debug!( - name: "BatchLogProcessor.Shutdown.SendResultError", - error = format!("{:?}", send_error), - ); - } - break; - } - // propagate the resource - BatchMessage::SetResource(resource) => { - exporter.set_resource(&resource); - } - } - } - }); - // Return batch processor with link to worker - BatchLogProcessor { - message_sender, - dropped_logs_count: AtomicUsize::new(0), - max_queue_size: config.max_queue_size, - } - } - - /// Create a new batch processor builder - pub fn builder(exporter: E, runtime: R) -> BatchLogProcessorBuilder - where - E: LogExporter, - { - BatchLogProcessorBuilder { - exporter, - config: Default::default(), - runtime, - } - } -} - -async fn export_with_timeout( - time_out: Duration, - exporter: &mut E, - runtime: &R, - batch: 
Vec<(SdkLogRecord, InstrumentationScope)>, -) -> OTelSdkResult -where - R: RuntimeChannel, - E: LogExporter + ?Sized, -{ - if batch.is_empty() { - return Ok(()); - } - - // TBD - Can we avoid this conversion as it involves heap allocation with new vector? - let log_vec: Vec<(&SdkLogRecord, &InstrumentationScope)> = batch - .iter() - .map(|log_data| (&log_data.0, &log_data.1)) - .collect(); - let export = exporter.export(LogBatch::new(log_vec.as_slice())); - let timeout = runtime.delay(time_out); - pin_mut!(export); - pin_mut!(timeout); - match future::select(export, timeout).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => OTelSdkResult::Err(OTelSdkError::Timeout(time_out)), - } -} - -/// A builder for creating [`BatchLogProcessor`] instances. -/// -#[derive(Debug)] -pub struct BatchLogProcessorBuilder { - exporter: E, - config: BatchConfig, - runtime: R, -} - -impl BatchLogProcessorBuilder -where - E: LogExporter + 'static, - R: RuntimeChannel, -{ - /// Set the BatchConfig for [`BatchLogProcessorBuilder`] - pub fn with_batch_config(self, config: BatchConfig) -> Self { - BatchLogProcessorBuilder { config, ..self } - } - - /// Build a batch processor - pub fn build(self) -> BatchLogProcessor { - BatchLogProcessor::new(self.exporter, self.config, self.runtime) - } -} - -#[cfg(all(test, feature = "testing", feature = "logs"))] -mod tests { - use crate::error::OTelSdkResult; - use crate::logs::batch_log_processor::{ - OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, - OTEL_BLRP_SCHEDULE_DELAY, - }; - use crate::logs::log_processor_with_async_runtime::BatchLogProcessor; - use crate::logs::InMemoryLogExporterBuilder; - use crate::logs::SdkLogRecord; - use crate::logs::{LogBatch, LogExporter}; - use crate::runtime; - use crate::{ - logs::{ - batch_log_processor::{ - OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, - OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 
OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, - }, - BatchConfig, BatchConfigBuilder, InMemoryLogExporter, LogProcessor, SdkLoggerProvider, - SimpleLogProcessor, - }, - Resource, - }; - use opentelemetry::logs::AnyValue; - use opentelemetry::logs::LogRecord; - use opentelemetry::logs::{Logger, LoggerProvider}; - use opentelemetry::KeyValue; - use opentelemetry::{InstrumentationScope, Key}; - use std::sync::{Arc, Mutex}; - use std::time::Duration; - - #[derive(Debug, Clone)] - struct MockLogExporter { - resource: Arc>>, - } - - impl LogExporter for MockLogExporter { - async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult { - Ok(()) - } - - fn set_resource(&mut self, resource: &Resource) { - self.resource - .lock() - .map(|mut res_opt| { - res_opt.replace(resource.clone()); - }) - .expect("mock log exporter shouldn't error when setting resource"); - } - } - - // Implementation specific to the MockLogExporter, not part of the LogExporter trait - impl MockLogExporter { - fn get_resource(&self) -> Option { - (*self.resource).lock().unwrap().clone() - } - } - - #[test] - fn test_default_const_values() { - assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY"); - assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT.as_millis(), 1_000); - assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT"); - assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30_000); - assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE"); - assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048); - assert_eq!( - OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, - "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE" - ); - assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512); - } - - #[test] - fn test_default_batch_config_adheres_to_specification() { - // The following environment variables are expected to be unset so that their default values are used. 
- let env_vars = vec![ - OTEL_BLRP_SCHEDULE_DELAY, - OTEL_BLRP_EXPORT_TIMEOUT, - OTEL_BLRP_MAX_QUEUE_SIZE, - OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, - ]; - - let config = temp_env::with_vars_unset(env_vars, BatchConfig::default); - - assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT); - assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT); - assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT); - assert_eq!( - config.max_export_batch_size, - OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT - ); - } - - #[test] - fn test_batch_config_configurable_by_env_vars() { - let env_vars = vec![ - (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")), - (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")), - (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")), - (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")), - ]; - - let config = temp_env::with_vars(env_vars, BatchConfig::default); - - assert_eq!(config.scheduled_delay, Duration::from_millis(2000)); - assert_eq!(config.max_export_timeout, Duration::from_millis(60000)); - assert_eq!(config.max_queue_size, 4096); - assert_eq!(config.max_export_batch_size, 1024); - } - - #[test] - fn test_batch_config_max_export_batch_size_validation() { - let env_vars = vec![ - (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")), - (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")), - ]; - - let config = temp_env::with_vars(env_vars, BatchConfig::default); - - assert_eq!(config.max_queue_size, 256); - assert_eq!(config.max_export_batch_size, 256); - assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT); - assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT); - } - - #[test] - fn test_batch_config_with_fields() { - let batch = BatchConfigBuilder::default() - .with_max_export_batch_size(1) - .with_scheduled_delay(Duration::from_millis(2)) - .with_max_export_timeout(Duration::from_millis(3)) - .with_max_queue_size(4) - .build(); - - assert_eq!(batch.max_export_batch_size, 1); - assert_eq!(batch.scheduled_delay, 
Duration::from_millis(2)); - assert_eq!(batch.max_export_timeout, Duration::from_millis(3)); - assert_eq!(batch.max_queue_size, 4); - } - - #[test] - fn test_build_batch_log_processor_builder() { - let mut env_vars = vec![ - (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")), - (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")), - (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")), - ]; - temp_env::with_vars(env_vars.clone(), || { - let builder = - BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio); - - assert_eq!(builder.config.max_export_batch_size, 500); - assert_eq!( - builder.config.scheduled_delay, - OTEL_BLRP_SCHEDULE_DELAY_DEFAULT - ); - assert_eq!( - builder.config.max_queue_size, - OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT - ); - assert_eq!( - builder.config.max_export_timeout, - Duration::from_millis(2046) - ); - }); - - env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120"))); - - temp_env::with_vars(env_vars, || { - let builder = - BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio); - assert_eq!(builder.config.max_export_batch_size, 120); - assert_eq!(builder.config.max_queue_size, 120); - }); - } - - #[test] - fn test_build_batch_log_processor_builder_with_custom_config() { - let expected = BatchConfigBuilder::default() - .with_max_export_batch_size(1) - .with_scheduled_delay(Duration::from_millis(2)) - .with_max_export_timeout(Duration::from_millis(3)) - .with_max_queue_size(4) - .build(); - - let builder = BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio) - .with_batch_config(expected); - - let actual = &builder.config; - assert_eq!(actual.max_export_batch_size, 1); - assert_eq!(actual.scheduled_delay, Duration::from_millis(2)); - assert_eq!(actual.max_export_timeout, Duration::from_millis(3)); - assert_eq!(actual.max_queue_size, 4); - } - - #[test] - fn test_set_resource_simple_processor() { - let exporter = MockLogExporter { - resource: Arc::new(Mutex::new(None)), - }; - let processor = 
SimpleLogProcessor::new(exporter.clone()); - let _ = SdkLoggerProvider::builder() - .with_log_processor(processor) - .with_resource( - Resource::builder_empty() - .with_attributes([ - KeyValue::new("k1", "v1"), - KeyValue::new("k2", "v3"), - KeyValue::new("k3", "v3"), - KeyValue::new("k4", "v4"), - KeyValue::new("k5", "v5"), - ]) - .build(), - ) - .build(); - assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn test_set_resource_batch_processor() { - let exporter = MockLogExporter { - resource: Arc::new(Mutex::new(None)), - }; - let processor = - BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - let provider = SdkLoggerProvider::builder() - .with_log_processor(processor) - .with_resource( - Resource::builder_empty() - .with_attributes([ - KeyValue::new("k1", "v1"), - KeyValue::new("k2", "v3"), - KeyValue::new("k3", "v3"), - KeyValue::new("k4", "v4"), - KeyValue::new("k5", "v5"), - ]) - .build(), - ) - .build(); - - provider.force_flush().unwrap(); - - assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5); - let _ = provider.shutdown(); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_batch_shutdown() { - // assert we will receive an error - // setup - let exporter = InMemoryLogExporterBuilder::default() - .keep_records_on_shutdown() - .build(); - let processor = - BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - - let mut record = SdkLogRecord::new(); - let instrumentation = InstrumentationScope::default(); - - processor.emit(&mut record, &instrumentation); - processor.force_flush().unwrap(); - processor.shutdown().unwrap(); - // todo: expect to see errors here. How should we assert this? 
- processor.emit(&mut record, &instrumentation); - assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) - } - - #[tokio::test(flavor = "current_thread")] - async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() { - let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - exporter.clone(), - BatchConfig::default(), - runtime::TokioCurrentThread, - ); - - processor.shutdown().unwrap(); - } - - #[tokio::test(flavor = "current_thread")] - #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"] - async fn test_batch_log_processor_with_async_runtime_shutdown_under_async_runtime_current_flavor_multi_thread( - ) { - let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - exporter.clone(), - BatchConfig::default(), - runtime::TokioCurrentThread, - ); - - // - // deadlock happens in shutdown with tokio current_thread runtime - // - processor.shutdown().unwrap(); - } - - #[tokio::test(flavor = "current_thread")] - async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() { - let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - exporter.clone(), - BatchConfig::default(), - runtime::TokioCurrentThread, - ); - processor.shutdown().unwrap(); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() { - let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = - BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - processor.shutdown().unwrap(); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() { - let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = - 
BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - processor.shutdown().unwrap(); - } - - #[derive(Debug)] - struct FirstProcessor { - pub(crate) logs: Arc>>, - } - - impl LogProcessor for FirstProcessor { - fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) { - // add attribute - record.add_attribute( - Key::from_static_str("processed_by"), - AnyValue::String("FirstProcessor".into()), - ); - // update body - record.body = Some("Updated by FirstProcessor".into()); - - self.logs - .lock() - .unwrap() - .push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data. - } - - fn force_flush(&self) -> OTelSdkResult { - Ok(()) - } - - fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { - Ok(()) - } - } - - #[derive(Debug)] - struct SecondProcessor { - pub(crate) logs: Arc>>, - } - - impl LogProcessor for SecondProcessor { - fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) { - assert!(record.attributes_contains( - &Key::from_static_str("processed_by"), - &AnyValue::String("FirstProcessor".into()) - )); - assert!( - record.body.clone().unwrap() - == AnyValue::String("Updated by FirstProcessor".into()) - ); - self.logs - .lock() - .unwrap() - .push((record.clone(), instrumentation.clone())); - } - - fn force_flush(&self) -> OTelSdkResult { - Ok(()) - } - - fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { - Ok(()) - } - } - #[test] - fn test_log_data_modification_by_multiple_processors() { - let first_processor_logs = Arc::new(Mutex::new(Vec::new())); - let second_processor_logs = Arc::new(Mutex::new(Vec::new())); - - let first_processor = FirstProcessor { - logs: Arc::clone(&first_processor_logs), - }; - let second_processor = SecondProcessor { - logs: Arc::clone(&second_processor_logs), - }; - - let logger_provider = SdkLoggerProvider::builder() - 
.with_log_processor(first_processor) - .with_log_processor(second_processor) - .build(); - - let logger = logger_provider.logger("test-logger"); - let mut log_record = logger.create_log_record(); - log_record.body = Some(AnyValue::String("Test log".into())); - - logger.emit(log_record); - - assert_eq!(first_processor_logs.lock().unwrap().len(), 1); - assert_eq!(second_processor_logs.lock().unwrap().len(), 1); - - let first_log = &first_processor_logs.lock().unwrap()[0]; - let second_log = &second_processor_logs.lock().unwrap()[0]; - - assert!(first_log.0.attributes_contains( - &Key::from_static_str("processed_by"), - &AnyValue::String("FirstProcessor".into()) - )); - assert!(second_log.0.attributes_contains( - &Key::from_static_str("processed_by"), - &AnyValue::String("FirstProcessor".into()) - )); - - assert!( - first_log.0.body.clone().unwrap() - == AnyValue::String("Updated by FirstProcessor".into()) - ); - assert!( - second_log.0.body.clone().unwrap() - == AnyValue::String("Updated by FirstProcessor".into()) - ); - } - - #[test] - fn test_build_batch_log_processor_builder_rt() { - let mut env_vars = vec![ - (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")), - (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")), - (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")), - ]; - temp_env::with_vars(env_vars.clone(), || { - let builder = - BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio); - - assert_eq!(builder.config.max_export_batch_size, 500); - assert_eq!( - builder.config.scheduled_delay, - OTEL_BLRP_SCHEDULE_DELAY_DEFAULT - ); - assert_eq!( - builder.config.max_queue_size, - OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT - ); - assert_eq!( - builder.config.max_export_timeout, - Duration::from_millis(2046) - ); - }); - - env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120"))); - - temp_env::with_vars(env_vars, || { - let builder = - BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio); - assert_eq!(builder.config.max_export_batch_size, 120); - 
assert_eq!(builder.config.max_queue_size, 120); - }); - } - - #[test] - fn test_build_batch_log_processor_builder_rt_with_custom_config() { - let expected = BatchConfigBuilder::default() - .with_max_export_batch_size(1) - .with_scheduled_delay(Duration::from_millis(2)) - .with_max_export_timeout(Duration::from_millis(3)) - .with_max_queue_size(4) - .build(); - - let builder = BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio) - .with_batch_config(expected); - - let actual = &builder.config; - assert_eq!(actual.max_export_batch_size, 1); - assert_eq!(actual.scheduled_delay, Duration::from_millis(2)); - assert_eq!(actual.max_export_timeout, Duration::from_millis(3)); - assert_eq!(actual.max_queue_size, 4); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn test_set_resource_batch_processor_rt() { - let exporter = MockLogExporter { - resource: Arc::new(Mutex::new(None)), - }; - let processor = - BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - let provider = SdkLoggerProvider::builder() - .with_log_processor(processor) - .with_resource(Resource::new(vec![ - KeyValue::new("k1", "v1"), - KeyValue::new("k2", "v3"), - KeyValue::new("k3", "v3"), - KeyValue::new("k4", "v4"), - KeyValue::new("k5", "v5"), - ])) - .build(); - provider.force_flush().unwrap(); - assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5); - let _ = provider.shutdown(); - } - - #[tokio::test(flavor = "multi_thread")] - #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"] - async fn test_batch_shutdown_rt() { - // assert we will receive an error - // setup - let exporter = InMemoryLogExporterBuilder::default() - .keep_records_on_shutdown() - .build(); - let processor = - BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - - let mut record = SdkLogRecord::new(); - let instrumentation = InstrumentationScope::default(); - - 
processor.emit(&mut record, &instrumentation); - processor.force_flush().unwrap(); - processor.shutdown().unwrap(); - // todo: expect to see errors here. How should we assert this? - processor.emit(&mut record, &instrumentation); - assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) - } - - #[tokio::test(flavor = "current_thread")] - #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"] - async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread() { - let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = - BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - - // - // deadlock happens in shutdown with tokio current_thread runtime - // - processor.shutdown().unwrap(); - } - - #[tokio::test(flavor = "current_thread")] - async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread() - { - let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - exporter.clone(), - BatchConfig::default(), - runtime::TokioCurrentThread, - ); - - processor.shutdown().unwrap(); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread() { - let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = - BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - - processor.shutdown().unwrap(); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() { - let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - exporter.clone(), - BatchConfig::default(), - runtime::TokioCurrentThread, - ); - - processor.shutdown().unwrap(); - } -} diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 
84674aefea..01f55a8294 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -29,10 +29,6 @@ pub use simple_log_processor::SimpleLogProcessor; /// Module for ConcurrentLogProcessor. pub mod concurrent_log_processor; -#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] -/// Module for BatchLogProcessor with async runtime. -pub mod log_processor_with_async_runtime; - #[cfg(all(test, feature = "testing"))] mod tests { use super::*; diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 2d358be73b..3a5389a7b2 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -53,9 +53,6 @@ pub(crate) mod meter; mod meter_provider; pub(crate) mod noop; pub(crate) mod periodic_reader; -#[cfg(feature = "experimental_metrics_periodicreader_with_async_runtime")] -/// Module for periodic reader with async runtime. -pub mod periodic_reader_with_async_runtime; pub(crate) mod pipeline; #[cfg(feature = "experimental_metrics_custom_reader")] pub mod reader; diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 70e78f9253..120e7b00dc 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -13,6 +13,7 @@ use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context}; use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{exporter::PushMetricExporter, reader::SdkProducer}, + util::BlockingStrategy, Resource, }; @@ -152,6 +153,7 @@ impl PeriodicReader { message_sender, producer: Mutex::new(None), exporter: exporter_arc.clone(), + blocking_strategy: BlockingStrategy::new(), }), }; let cloned_reader = reader.clone(); @@ -351,6 +353,7 @@ struct PeriodicReaderInner { exporter: Arc, message_sender: mpsc::Sender, producer: Mutex>>, + blocking_strategy: BlockingStrategy, } impl PeriodicReaderInner { @@ -407,9 +410,8 @@ 
impl PeriodicReaderInner { }); otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis()); - // Relying on futures executor to execute async call. // TODO: Pass timeout to exporter - futures_executor::block_on(self.exporter.export(rm)) + self.blocking_strategy.block_on(self.exporter.export(rm)) } fn force_flush(&self) -> OTelSdkResult { diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs deleted file mode 100644 index 066db5b340..0000000000 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ /dev/null @@ -1,537 +0,0 @@ -use std::{ - env, fmt, mem, - sync::{Arc, Mutex, Weak}, - time::Duration, -}; - -use futures_channel::{mpsc, oneshot}; -use futures_util::{ - future::{self, Either}, - pin_mut, - stream::{self, FusedStream}, - StreamExt, -}; -use opentelemetry::{otel_debug, otel_error}; - -use crate::runtime::{to_interval_stream, Runtime}; -use crate::{ - error::{OTelSdkError, OTelSdkResult}, - metrics::{exporter::PushMetricExporter, reader::SdkProducer}, - Resource, -}; - -use super::{ - data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader, -}; - -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); -const DEFAULT_INTERVAL: Duration = Duration::from_secs(60); - -const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL"; -const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT"; - -/// Configuration options for [PeriodicReader]. -/// -/// A periodic reader is a [MetricReader] that collects and exports metric data -/// to the exporter at a defined interval. -/// -/// By default, the returned [MetricReader] will collect and export data every -/// 60 seconds, and will cancel export attempts that exceed 30 seconds. The -/// export time is not counted towards the interval between attempts. 
-/// -/// The [collect] method of the returned [MetricReader] continues to gather and -/// return metric data to the user. It will not automatically send that data to -/// the exporter outside of the predefined interval. -/// -/// [collect]: MetricReader::collect -#[derive(Debug)] -pub struct PeriodicReaderBuilder { - interval: Duration, - timeout: Duration, - exporter: E, - runtime: RT, -} - -impl PeriodicReaderBuilder -where - E: PushMetricExporter, - RT: Runtime, -{ - fn new(exporter: E, runtime: RT) -> Self { - let interval = env::var(METRIC_EXPORT_INTERVAL_NAME) - .ok() - .and_then(|v| v.parse().map(Duration::from_millis).ok()) - .unwrap_or(DEFAULT_INTERVAL); - let timeout = env::var(METRIC_EXPORT_TIMEOUT_NAME) - .ok() - .and_then(|v| v.parse().map(Duration::from_millis).ok()) - .unwrap_or(DEFAULT_TIMEOUT); - - PeriodicReaderBuilder { - interval, - timeout, - exporter, - runtime, - } - } - - /// Configures the intervening time between exports for a [PeriodicReader]. - /// - /// This option overrides any value set for the `OTEL_METRIC_EXPORT_INTERVAL` - /// environment variable. - /// - /// If this option is not used or `interval` is equal to zero, 60 seconds is - /// used as the default. - pub fn with_interval(mut self, interval: Duration) -> Self { - if !interval.is_zero() { - self.interval = interval; - } - self - } - - /// Configures the time a [PeriodicReader] waits for an export to complete - /// before canceling it. - /// - /// This option overrides any value set for the `OTEL_METRIC_EXPORT_TIMEOUT` - /// environment variable. - /// - /// If this option is not used or `timeout` is equal to zero, 30 seconds is used - /// as the default. - pub fn with_timeout(mut self, timeout: Duration) -> Self { - if !timeout.is_zero() { - self.timeout = timeout; - } - self - } - - /// Create a [PeriodicReader] with the given config. 
- pub fn build(self) -> PeriodicReader { - let (message_sender, message_receiver) = mpsc::channel(256); - - let worker = move |reader: &PeriodicReader| { - let runtime = self.runtime.clone(); - let reader = reader.clone(); - self.runtime.spawn(async move { - let ticker = to_interval_stream(runtime.clone(), self.interval) - .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. - .map(|_| Message::Export); - let messages = Box::pin(stream::select(message_receiver, ticker)); - PeriodicReaderWorker { - reader, - timeout: self.timeout, - runtime, - rm: ResourceMetrics { - resource: Resource::empty(), - scope_metrics: Vec::new(), - }, - } - .run(messages) - .await - }); - }; - - otel_debug!( - name: "PeriodicReader.BuildCompleted", - message = "Periodic reader built.", - interval_in_secs = self.interval.as_secs(), - temporality = format!("{:?}", self.exporter.temporality()), - ); - - PeriodicReader { - exporter: Arc::new(self.exporter), - inner: Arc::new(Mutex::new(PeriodicReaderInner { - message_sender, - is_shutdown: false, - sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)), - })), - } - } -} - -/// A [MetricReader] that continuously collects and exports metric data at a set -/// interval. -/// -/// By default it will collect and export data every 60 seconds, and will cancel -/// export attempts that exceed 30 seconds. The export time is not counted -/// towards the interval between attempts. -/// -/// The [collect] method of the returned continues to gather and -/// return metric data to the user. It will not automatically send that data to -/// the exporter outside of the predefined interval. -/// -/// The [runtime] can be selected based on feature flags set for this crate. -/// -/// The exporter can be any exporter that implements [PushMetricExporter] such -/// as [opentelemetry-otlp]. 
-/// -/// [collect]: MetricReader::collect -/// [runtime]: crate::runtime -/// [opentelemetry-otlp]: https://docs.rs/opentelemetry-otlp/latest/opentelemetry_otlp/ -/// -/// # Example -/// -/// ```no_run -/// use opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader; -/// # fn example(get_exporter: impl Fn() -> E, get_runtime: impl Fn() -> R) -/// # where -/// # E: opentelemetry_sdk::metrics::exporter::PushMetricExporter, -/// # R: opentelemetry_sdk::runtime::Runtime, -/// # { -/// -/// let exporter = get_exporter(); // set up a push exporter like OTLP -/// let runtime = get_runtime(); // select runtime: e.g. opentelemetry_sdk:runtime::Tokio -/// -/// let reader = PeriodicReader::builder(exporter, runtime).build(); -/// # drop(reader); -/// # } -/// ``` -pub struct PeriodicReader { - exporter: Arc, - inner: Arc>>, -} - -impl Clone for PeriodicReader { - fn clone(&self) -> Self { - Self { - exporter: Arc::clone(&self.exporter), - inner: Arc::clone(&self.inner), - } - } -} - -impl PeriodicReader { - /// Configuration options for a periodic reader - pub fn builder(exporter: E, runtime: RT) -> PeriodicReaderBuilder - where - RT: Runtime, - { - PeriodicReaderBuilder::new(exporter, runtime) - } -} - -impl fmt::Debug for PeriodicReader { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PeriodicReader").finish() - } -} - -struct PeriodicReaderInner { - message_sender: mpsc::Sender, - is_shutdown: bool, - sdk_producer_or_worker: ProducerOrWorker, -} - -#[derive(Debug)] -enum Message { - Export, - Flush(oneshot::Sender), - Shutdown(oneshot::Sender), -} - -enum ProducerOrWorker { - Producer(Weak), - #[allow(clippy::type_complexity)] - Worker(Box) + Send + Sync>), -} - -struct PeriodicReaderWorker { - reader: PeriodicReader, - timeout: Duration, - runtime: RT, - rm: ResourceMetrics, -} - -impl PeriodicReaderWorker { - async fn collect_and_export(&mut self) -> OTelSdkResult { - self.reader - .collect(&mut self.rm) - 
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; - if self.rm.scope_metrics.is_empty() { - otel_debug!( - name: "PeriodicReaderWorker.NoMetricsToExport", - ); - // No metrics to export. - return Ok(()); - } - - otel_debug!( - name: "PeriodicReaderWorker.InvokeExporter", - message = "Calling exporter's export method with collected metrics.", - count = self.rm.scope_metrics.len(), - ); - let export = self.reader.exporter.export(&self.rm); - let timeout = self.runtime.delay(self.timeout); - pin_mut!(export); - pin_mut!(timeout); - - match future::select(export, timeout).await { - Either::Left((res, _)) => { - res // return the status of export. - } - Either::Right(_) => Err(OTelSdkError::Timeout(self.timeout)), - } - } - - async fn process_message(&mut self, message: Message) -> bool { - match message { - Message::Export => { - otel_debug!( - name: "PeriodicReader.ExportTriggered", - message = "Export message received.", - ); - if let Err(err) = self.collect_and_export().await { - otel_error!( - name: "PeriodicReader.ExportFailed", - message = "Failed to export metrics", - reason = format!("{}", err)); - } - } - Message::Flush(ch) => { - otel_debug!( - name: "PeriodicReader.ForceFlushCalled", - message = "Flush message received.", - ); - let res = self.collect_and_export().await; - if let Err(send_error) = ch.send(res) { - otel_debug!( - name: "PeriodicReader.Flush.SendResultError", - message = "Failed to send flush result.", - reason = format!("{:?}", send_error), - ); - } - } - Message::Shutdown(ch) => { - otel_debug!( - name: "PeriodicReader.ShutdownCalled", - message = "Shutdown message received", - ); - let res = self.collect_and_export().await; - let _ = self.reader.exporter.shutdown(); - if let Err(send_error) = - ch.send(res.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))) - { - otel_debug!( - name: "PeriodicReader.Shutdown.SendResultError", - message = "Failed to send shutdown result", - reason = format!("{:?}", send_error), - ); - } - 
return false; - } - } - - true - } - - async fn run(mut self, mut messages: impl FusedStream + Unpin) { - while let Some(message) = messages.next().await { - if !self.process_message(message).await { - break; - } - } - } -} - -impl MetricReader for PeriodicReader { - fn register_pipeline(&self, pipeline: Weak) { - let mut inner = match self.inner.lock() { - Ok(guard) => guard, - Err(_) => return, - }; - - let worker = match &mut inner.sdk_producer_or_worker { - ProducerOrWorker::Producer(_) => { - // Only register once. If producer is already set, do nothing. - otel_debug!(name: "PeriodicReader.DuplicateRegistration", - message = "duplicate registration found, did not register periodic reader."); - return; - } - ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})), - }; - - inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline); - worker(self); - } - - fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult { - let inner = self - .inner - .lock() - .map_err(|_| OTelSdkError::InternalFailure("Failed to lock pipeline".into()))?; - - if inner.is_shutdown { - return Err(OTelSdkError::AlreadyShutdown); - } - - if let Some(producer) = match &inner.sdk_producer_or_worker { - ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(), - ProducerOrWorker::Worker(_) => None, - } { - producer.produce(rm)?; - } else { - return Err(OTelSdkError::InternalFailure( - "reader is not registered".into(), - )); - } - - Ok(()) - } - - fn force_flush(&self) -> OTelSdkResult { - let mut inner = self - .inner - .lock() - .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; - if inner.is_shutdown { - return Err(OTelSdkError::AlreadyShutdown); - } - let (sender, receiver) = oneshot::channel(); - inner - .message_sender - .try_send(Message::Flush(sender)) - .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; - - drop(inner); // don't hold lock when blocking on future - - futures_executor::block_on(receiver) - .map_err(|err| 
OTelSdkError::InternalFailure(err.to_string())) - .and_then(|res| res) - } - - fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { - let mut inner = self - .inner - .lock() - .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; - if inner.is_shutdown { - return Err(OTelSdkError::AlreadyShutdown); - } - - let (sender, receiver) = oneshot::channel(); - inner - .message_sender - .try_send(Message::Shutdown(sender)) - .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; - drop(inner); // don't hold lock when blocking on future - - let shutdown_result = futures_executor::block_on(receiver) - .map_err(|err| OTelSdkError::InternalFailure(err.to_string()))?; - - // Acquire the lock again to set the shutdown flag - let mut inner = self - .inner - .lock() - .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; - inner.is_shutdown = true; - - shutdown_result - } - - /// To construct a [MetricReader][metric-reader] when setting up an SDK, - /// The output temporality (optional), a function of instrument kind. - /// This function SHOULD be obtained from the exporter. - /// - /// If not configured, the Cumulative temporality SHOULD be used. 
- /// - /// [metric-reader]: https://github.com/open-telemetry/opentelemetry-specification/blob/0a78571045ca1dca48621c9648ec3c832c3c541c/specification/metrics/sdk.md#metricreader - fn temporality(&self, kind: InstrumentKind) -> super::Temporality { - kind.temporality_preference(self.exporter.temporality()) - } -} - -#[cfg(all(test, feature = "testing"))] -mod tests { - use super::PeriodicReader; - use crate::error::OTelSdkError; - use crate::metrics::reader::MetricReader; - use crate::{ - metrics::data::ResourceMetrics, metrics::InMemoryMetricExporter, metrics::SdkMeterProvider, - runtime, Resource, - }; - use opentelemetry::metrics::MeterProvider; - use std::sync::mpsc; - - #[test] - fn collection_triggered_by_interval_tokio_current() { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() { - collection_triggered_by_interval_helper(runtime::Tokio); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() { - collection_triggered_by_interval_helper(runtime::Tokio); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current() - { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current() - { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); - } - - #[tokio::test(flavor = "current_thread")] - #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"] - async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() { - 
collection_triggered_by_interval_helper(runtime::Tokio); - } - - #[tokio::test(flavor = "current_thread")] - async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); - } - - #[test] - fn unregistered_collect() { - // Arrange - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); - let mut rm = ResourceMetrics { - resource: Resource::empty(), - scope_metrics: Vec::new(), - }; - - // Act - let result = reader.collect(&mut rm); - - // Assert - assert!( - matches!(result.unwrap_err(), OTelSdkError::InternalFailure(err) if err == "reader is not registered") - ); - } - - fn collection_triggered_by_interval_helper(runtime: RT) - where - RT: crate::runtime::Runtime, - { - let interval = std::time::Duration::from_millis(1); - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime) - .with_interval(interval) - .build(); - let (sender, receiver) = mpsc::channel(); - - // Act - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); - let meter = meter_provider.meter("test"); - let _counter = meter - .u64_observable_counter("testcounter") - .with_callback(move |_| { - sender.send(()).expect("channel should still be open"); - }) - .build(); - - // Assert - receiver - .recv() - .expect("message should be available in channel, indicating a collection occurred"); - } -} diff --git a/opentelemetry-sdk/src/runtime.rs b/opentelemetry-sdk/src/runtime.rs index 8ae9c61df7..ba58ac314b 100644 --- a/opentelemetry-sdk/src/runtime.rs +++ b/opentelemetry-sdk/src/runtime.rs @@ -57,7 +57,15 @@ pub(crate) fn to_interval_stream( }) } -/// Runtime implementation, which works with Tokio's multi thread runtime. +/// Runtime implementation that works with Tokio. 
+/// +/// Automatically detects the current runtime flavor and handles both +/// multi-threaded and current-thread runtimes: +/// - **Multi-threaded runtime**: Spawns tasks directly on the tokio runtime. +/// - **Current-thread runtime**: Spawns a separate OS thread with its own +/// tokio current-thread runtime to avoid deadlocks. +/// - **No runtime available**: Spawns a separate OS thread with a new +/// tokio current-thread runtime. #[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))] #[cfg_attr( docsrs, @@ -76,60 +84,35 @@ impl Runtime for Tokio { where F: Future + Send + 'static, { - #[allow(clippy::let_underscore_future)] - // we don't have to await on the returned future to execute - let _ = tokio::spawn(future); - } - - fn delay(&self, duration: Duration) -> impl Future + Send + 'static { - tokio::time::sleep(duration) - } -} - -/// Runtime implementation, which works with Tokio's current thread runtime. -#[cfg(all( - feature = "experimental_async_runtime", - feature = "rt-tokio-current-thread" -))] -#[cfg_attr( - docsrs, - doc(cfg(all( - feature = "experimental_async_runtime", - feature = "rt-tokio-current-thread" - ))) -)] -#[derive(Debug, Clone)] -pub struct TokioCurrentThread; - -#[cfg(all( - feature = "experimental_async_runtime", - feature = "rt-tokio-current-thread" -))] -#[cfg_attr( - docsrs, - doc(cfg(all( - feature = "experimental_async_runtime", - feature = "rt-tokio-current-thread" - ))) -)] -impl Runtime for TokioCurrentThread { - fn spawn(&self, future: F) - where - F: Future + Send + 'static, - { - // We cannot force push tracing in current thread tokio scheduler because we rely on - // BatchSpanProcessor to export spans in a background task, meanwhile we need to block the - // shutdown function so that the runtime will not finish the blocked task and kill any - // remaining tasks. But there is only one thread to run task, so it's a deadlock - // - // Thus, we spawn the background task in a separate thread. 
- std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("failed to create Tokio current thead runtime for OpenTelemetry batch processing"); - rt.block_on(future); - }); + match tokio::runtime::Handle::try_current() { + Ok(handle) => { + match handle.runtime_flavor() { + tokio::runtime::RuntimeFlavor::CurrentThread => { + // Single-threaded: spawn separate OS thread with own runtime + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to create Tokio current thread runtime for OpenTelemetry"); + rt.block_on(future); + }); + } + _ => { + #[allow(clippy::let_underscore_future)] + let _ = tokio::spawn(future); + } + } + } + Err(_) => { + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to create Tokio runtime for OpenTelemetry"); + rt.block_on(future); + }); + } + } } fn delay(&self, duration: Duration) -> impl Future + Send + 'static { @@ -183,10 +166,7 @@ pub trait TrySend: Sync + Send { fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>; } -#[cfg(all( - feature = "experimental_async_runtime", - any(feature = "rt-tokio", feature = "rt-tokio-current-thread") -))] +#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))] impl TrySend for tokio::sync::mpsc::Sender { type Message = T; @@ -219,33 +199,6 @@ impl RuntimeChannel for Tokio { } } -#[cfg(all( - feature = "experimental_async_runtime", - feature = "rt-tokio-current-thread" -))] -#[cfg_attr( - docsrs, - doc(cfg(all( - feature = "experimental_async_runtime", - feature = "rt-tokio-current-thread" - ))) -)] -impl RuntimeChannel for TokioCurrentThread { - type Receiver = tokio_stream::wrappers::ReceiverStream; - type Sender = tokio::sync::mpsc::Sender; - - fn batch_message_channel( - &self, - capacity: usize, - ) -> (Self::Sender, Self::Receiver) { - let 
(sender, receiver) = tokio::sync::mpsc::channel(capacity); - ( - sender, - tokio_stream::wrappers::ReceiverStream::new(receiver), - ) - } -} - /// Runtime implementation for synchronous execution environments. /// /// This runtime can be used when executing in a non-async environment. diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index ed1ae6e7b8..35de079b97 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -88,7 +88,7 @@ impl Display for TestExportError { } } -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] +#[cfg(feature = "rt-tokio")] impl From> for TestExportError { fn from(err: tokio::sync::mpsc::error::SendError) -> Self { TestExportError(err.to_string()) diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index d259f24f82..24e5d1d17e 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -17,9 +17,6 @@ mod sampler; mod span; mod span_limit; mod span_processor; -#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] -/// Experimental feature to use async runtime with batch span processor. 
-pub mod span_processor_with_async_runtime; mod tracer; pub use config::Config; @@ -52,10 +49,6 @@ pub use tracer::SdkTracer as Tracer; // for back-compat else tracing-opentelemet #[cfg(feature = "jaeger_remote_sampler")] pub use sampler::{JaegerRemoteSampler, JaegerRemoteSamplerBuilder}; -#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] -#[cfg(test)] -mod runtime_tests; - #[cfg(all(test, feature = "testing"))] mod tests { use super::*; diff --git a/opentelemetry-sdk/src/trace/runtime_tests.rs b/opentelemetry-sdk/src/trace/runtime_tests.rs deleted file mode 100644 index e068a377fe..0000000000 --- a/opentelemetry-sdk/src/trace/runtime_tests.rs +++ /dev/null @@ -1,158 +0,0 @@ -// Note that all tests here should be marked as ignore so that it won't be picked up by default We -// need to run those tests one by one as the GlobalTracerProvider is a shared object between -// threads Use cargo test -- --ignored --test-threads=1 to run those tests. -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -use crate::runtime::RuntimeChannel; -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -use crate::trace::SpanExporter; -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -use crate::{error::OTelSdkResult, runtime}; -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -use opentelemetry::global::*; -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -use opentelemetry::trace::Tracer; -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -use std::fmt::Debug; -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -use std::sync::atomic::{AtomicUsize, Ordering}; -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -use std::sync::Arc; -#[derive(Debug)] -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -struct SpanCountExporter { - span_count: Arc, -} - -#[cfg(any(feature = 
"rt-tokio", feature = "rt-tokio-current-thread"))] -impl SpanExporter for SpanCountExporter { - async fn export(&self, batch: Vec) -> OTelSdkResult { - self.span_count.fetch_add(batch.len(), Ordering::SeqCst); - Ok(()) - } -} - -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -impl SpanCountExporter { - fn new() -> SpanCountExporter { - SpanCountExporter { - span_count: Arc::new(AtomicUsize::new(0)), - } - } -} - -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -fn build_batch_tracer_provider( - exporter: SpanCountExporter, - runtime: R, -) -> crate::trace::SdkTracerProvider { - use crate::trace::SdkTracerProvider; - let processor = crate::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder( - exporter, runtime, - ) - .build(); - SdkTracerProvider::builder() - .with_span_processor(processor) - .build() -} - -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -fn build_simple_tracer_provider(exporter: SpanCountExporter) -> crate::trace::SdkTracerProvider { - use crate::trace::SdkTracerProvider; - SdkTracerProvider::builder() - .with_simple_exporter(exporter) - .build() -} - -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -async fn test_set_provider_in_tokio( - runtime: R, -) -> (Arc, crate::trace::SdkTracerProvider) { - let exporter = SpanCountExporter::new(); - let span_count = exporter.span_count.clone(); - let tracer_provider = build_batch_tracer_provider(exporter, runtime); - set_tracer_provider(tracer_provider.clone()); - let tracer = tracer("opentelemetery"); - - tracer.in_span("test", |_cx| {}); - - (span_count, tracer_provider) -} - -// When using `tokio::spawn` to spawn the worker task in batch processor -// -// multiple -> no shut down -> not export -// multiple -> shut down -> export -// single -> no shutdown -> not export -// single -> shutdown -> hang forever - -// When using |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)) 
-// to spawn the worker task in batch processor -// -// multiple -> no shutdown -> hang forever -// multiple -> shut down -> export -// single -> shut down -> export -// single -> no shutdown -> hang forever - -// Test if the multiple thread tokio runtime could exit successfully when not force flushing spans -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -#[ignore = "requires --test-threads=1"] -#[cfg(feature = "rt-tokio")] -async fn test_set_provider_multiple_thread_tokio() { - let (span_count, _) = test_set_provider_in_tokio(runtime::Tokio).await; - assert_eq!(span_count.load(Ordering::SeqCst), 0); -} - -// Test if the multiple thread tokio runtime could exit successfully when force flushing spans -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -#[ignore = "requires --test-threads=1"] -#[cfg(feature = "rt-tokio")] -async fn test_set_provider_multiple_thread_tokio_shutdown() { - let (span_count, tracer_provider) = test_set_provider_in_tokio(runtime::Tokio).await; - tracer_provider - .shutdown() - .expect("TracerProvider should shutdown properly"); - assert!(span_count.load(Ordering::SeqCst) > 0); -} - -// Test use simple processor in single thread tokio runtime. 
-// Expected to see the spans being exported to buffer -#[tokio::test] -#[ignore = "requires --test-threads=1"] -#[cfg(feature = "rt-tokio")] -async fn test_set_provider_single_thread_tokio_with_simple_processor() { - let exporter = SpanCountExporter::new(); - let span_count = exporter.span_count.clone(); - let tracer_provider = build_simple_tracer_provider(exporter); - set_tracer_provider(tracer_provider.clone()); - let tracer = tracer("opentelemetry"); - - tracer.in_span("test", |_cx| {}); - - tracer_provider - .shutdown() - .expect("TracerProvider should shutdown properly"); - - assert!(span_count.load(Ordering::SeqCst) > 0); -} - -// Test if the single thread tokio runtime could exit successfully when not force flushing spans -#[tokio::test] -#[ignore = "requires --test-threads=1"] -#[cfg(feature = "rt-tokio-current-thread")] -async fn test_set_provider_single_thread_tokio() { - let (span_count, _) = test_set_provider_in_tokio(runtime::TokioCurrentThread).await; - assert_eq!(span_count.load(Ordering::SeqCst), 0) -} - -// Test if the single thread tokio runtime could exit successfully when force flushing spans. 
-#[tokio::test] -#[ignore = "requires --test-threads=1"] -#[cfg(feature = "rt-tokio-current-thread")] -async fn test_set_provider_single_thread_tokio_shutdown() { - let (span_count, tracer_provider) = - test_set_provider_in_tokio(runtime::TokioCurrentThread).await; - tracer_provider - .shutdown() - .expect("TracerProvider should shutdown properly"); - assert!(span_count.load(Ordering::SeqCst) > 0) -} diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 6d7a39901a..2b8ddfa03f 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -38,6 +38,7 @@ use crate::error::{OTelSdkError, OTelSdkResult}; use crate::resource::Resource; use crate::trace::Span; use crate::trace::{SpanData, SpanExporter}; +use crate::util::BlockingStrategy; use opentelemetry::Context; use opentelemetry::{otel_debug, otel_error, otel_warn}; use std::cmp::min; @@ -347,6 +348,7 @@ impl BatchSpanProcessor { let max_export_batch_size = config.max_export_batch_size; let current_batch_size = Arc::new(AtomicUsize::new(0)); let current_batch_size_for_thread = current_batch_size.clone(); + let blocking_strategy = BlockingStrategy::new(); let handle = thread::Builder::new() .name("OpenTelemetry.Traces.BatchProcessor".to_string()) @@ -361,6 +363,7 @@ impl BatchSpanProcessor { let mut spans = Vec::with_capacity(config.max_export_batch_size); let mut last_export_time = Instant::now(); let current_batch_size = current_batch_size_for_thread; + let blocking_strategy = blocking_strategy; loop { let remaining_time_option = config .scheduled_delay @@ -384,6 +387,7 @@ impl BatchSpanProcessor { &mut last_export_time, ¤t_batch_size, &config, + &blocking_strategy, ); } BatchMessage::ForceFlush(sender) => { @@ -395,6 +399,7 @@ impl BatchSpanProcessor { &mut last_export_time, ¤t_batch_size, &config, + &blocking_strategy, ); let _ = sender.send(result); } @@ -407,6 +412,7 @@ impl BatchSpanProcessor { &mut 
last_export_time, ¤t_batch_size, &config, + &blocking_strategy, ); let _ = exporter.shutdown(); let _ = sender.send(result); @@ -436,6 +442,7 @@ impl BatchSpanProcessor { &mut last_export_time, ¤t_batch_size, &config, + &blocking_strategy, ); } Err(RecvTimeoutError::Disconnected) => { @@ -490,6 +497,7 @@ impl BatchSpanProcessor { last_export_time: &mut Instant, current_batch_size: &AtomicUsize, config: &BatchConfig, + blocking_strategy: &BlockingStrategy, ) -> OTelSdkResult where E: SpanExporter + Send + Sync + 'static, @@ -510,7 +518,7 @@ impl BatchSpanProcessor { let count_of_spans = spans.len(); // Count of spans that will be exported total_exported_spans += count_of_spans; - result = Self::export_batch_sync(exporter, spans, last_export_time); // This method clears the spans vec after exporting + result = Self::export_batch_sync(exporter, spans, last_export_time, blocking_strategy); // This method clears the spans vec after exporting current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed); } @@ -522,6 +530,7 @@ impl BatchSpanProcessor { exporter: &E, batch: &mut Vec, last_export_time: &mut Instant, + blocking_strategy: &BlockingStrategy, ) -> OTelSdkResult where E: SpanExporter + ?Sized, @@ -539,7 +548,7 @@ impl BatchSpanProcessor { // every export. See if this can be optimized by // *not* requiring ownership in the exporter. let export = exporter.export(batch.split_off(0)); - let export_result = futures_executor::block_on(export); + let export_result = blocking_strategy.block_on(export); match export_result { Ok(_) => OTelSdkResult::Ok(()), @@ -861,23 +870,6 @@ impl BatchConfigBuilder { self } - #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] - /// Set max_concurrent_exports for [`BatchConfigBuilder`]. - /// It's the maximum number of concurrent exports. - /// Limits the number of spawned tasks for exports and thus memory consumed by an exporter. - /// The default value is 1. 
- /// If the max_concurrent_exports value is default value, it will cause exports to be performed - /// synchronously on the BatchSpanProcessor task. - /// The default value is 1. - /// - /// Corresponding environment variable: `OTEL_BSP_MAX_CONCURRENT_EXPORTS`. - /// - /// Note: Programmatically setting this will override any value set via the environment variable. - pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self { - self.max_concurrent_exports = max_concurrent_exports; - self - } - /// Set scheduled_delay_duration for [`BatchConfigBuilder`]. /// It's the delay interval in milliseconds between two consecutive processing of batches. /// The default value is 5000 milliseconds. @@ -890,19 +882,6 @@ impl BatchConfigBuilder { self } - /// Set max_export_timeout for [`BatchConfigBuilder`]. - /// It's the maximum duration to export a batch of data. - /// The The default value is 30000 milliseconds. - /// - /// Corresponding environment variable: `OTEL_BSP_EXPORT_TIMEOUT`. - /// - /// Note: Programmatically setting this will override any value set via the environment variable. - #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] - pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self { - self.max_export_timeout = max_export_timeout; - self - } - /// Builds a `BatchConfig` enforcing the following invariants: /// * `max_export_batch_size` must be less than or equal to `max_queue_size`. 
pub fn build(self) -> BatchConfig { @@ -1085,23 +1064,12 @@ mod tests { let config = BatchConfigBuilder::default() .with_max_export_batch_size(512) .with_max_queue_size(2048) - .with_scheduled_delay(Duration::from_millis(1000)); - #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] - let config = { - config - .with_max_concurrent_exports(10) - .with_max_export_timeout(Duration::from_millis(2000)) - }; - let config = config.build(); + .with_scheduled_delay(Duration::from_millis(1000)) + .build(); assert_eq!(config.max_export_batch_size, 512); assert_eq!(config.max_queue_size, 2048); assert_eq!(config.scheduled_delay, Duration::from_millis(1000)); - #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] - { - assert_eq!(config.max_concurrent_exports, 10); - assert_eq!(config.max_export_timeout, Duration::from_millis(2000)); - } }); } @@ -1142,22 +1110,11 @@ mod tests { let batch = BatchConfigBuilder::default() .with_max_export_batch_size(10) .with_scheduled_delay(Duration::from_millis(10)) - .with_max_queue_size(10); - #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] - let batch = { - batch - .with_max_concurrent_exports(10) - .with_max_export_timeout(Duration::from_millis(10)) - }; - let batch = batch.build(); + .with_max_queue_size(10) + .build(); assert_eq!(batch.max_export_batch_size, 10); assert_eq!(batch.scheduled_delay, Duration::from_millis(10)); assert_eq!(batch.max_queue_size, 10); - #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] - { - assert_eq!(batch.max_concurrent_exports, 10); - assert_eq!(batch.max_export_timeout, Duration::from_millis(10)); - } } // Helper function to create a default test span diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs deleted file mode 100644 index b294f74043..0000000000 --- 
a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ /dev/null @@ -1,706 +0,0 @@ -use crate::error::{OTelSdkError, OTelSdkResult}; -use crate::resource::Resource; -use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend}; -use crate::trace::BatchConfig; -use crate::trace::Span; -use crate::trace::SpanProcessor; -use crate::trace::{SpanData, SpanExporter}; -use futures_channel::oneshot; -use futures_util::{ - future::{self, BoxFuture, Either}, - pin_mut, select, - stream::{self, FusedStream, FuturesUnordered}, - StreamExt as _, -}; -use opentelemetry::Context; -use opentelemetry::{otel_debug, otel_error, otel_warn}; -use std::fmt; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use std::time::Duration; -use tokio::sync::RwLock; - -/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports -/// them at a preconfigured interval. -/// -/// Batch span processors need to run a background task to collect and send -/// spans. Different runtimes need different ways to handle the background task. -/// -/// Note: Configuring an opentelemetry `Runtime` that's not compatible with the -/// underlying runtime can cause deadlocks (see tokio section). -/// -/// ### Use with Tokio -/// -/// Tokio currently offers two different schedulers. One is -/// `current_thread_scheduler`, the other is `multiple_thread_scheduler`. Both -/// of them default to use batch span processors to install span exporters. -/// -/// Tokio's `current_thread_scheduler` can cause the program to hang forever if -/// blocking work is scheduled with other tasks in the same runtime. To avoid -/// this, be sure to enable the `rt-tokio-current-thread` feature in this crate -/// if you are using that runtime (e.g. users of actix-web), and blocking tasks -/// will then be scheduled on a different thread. 
-/// -/// # Examples -/// -/// This processor can be configured with an [`executor`] of your choice to -/// batch and upload spans asynchronously when they end. If you have added a -/// library like [`tokio`], you can pass in their respective -/// `spawn` and `interval` functions to have batching performed in those -/// contexts. -/// -/// ``` -/// # #[cfg(feature="tokio")] -/// # { -/// use opentelemetry::global; -/// use opentelemetry_sdk::{runtime, testing::trace::NoopSpanExporter, trace}; -/// use opentelemetry_sdk::trace::BatchConfigBuilder; -/// use std::time::Duration; -/// use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor; -/// -/// #[tokio::main] -/// async fn main() { -/// // Configure your preferred exporter -/// let exporter = NoopSpanExporter::new(); -/// -/// // Create a batch span processor using an exporter and a runtime -/// let batch = BatchSpanProcessor::builder(exporter, runtime::Tokio) -/// .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build()) -/// .build(); -/// -/// // Then use the `with_batch_exporter` method to have the provider export spans in batches. 
-/// let provider = trace::SdkTracerProvider::builder() -/// .with_span_processor(batch) -/// .build(); -/// -/// let _ = global::set_tracer_provider(provider); -/// } -/// # } -/// ``` -/// -/// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html -/// [`tokio`]: https://tokio.rs -pub struct BatchSpanProcessor { - message_sender: R::Sender, - - // Track dropped spans - dropped_spans_count: AtomicUsize, - - // Track the maximum queue size that was configured for this processor - max_queue_size: usize, -} - -impl fmt::Debug for BatchSpanProcessor { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BatchSpanProcessor") - .field("message_sender", &self.message_sender) - .finish() - } -} - -impl SpanProcessor for BatchSpanProcessor { - fn on_start(&self, _span: &mut Span, _cx: &Context) { - // Ignored - } - - fn on_end(&self, span: SpanData) { - if !span.span_context.is_sampled() { - return; - } - - let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); - - // If the queue is full, and we can't buffer a span - if result.is_err() { - // Increment the number of dropped spans. If this is the first time we've had to drop, - // emit a warning. - if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 { - otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted", - message = "Beginning to drop span messages due to full/internal errors. No further log will be emitted for further drops until Shutdown. 
During Shutdown time, a log will be emitted with exact count of total spans dropped."); - } - } - } - - fn force_flush(&self) -> OTelSdkResult { - let (res_sender, res_receiver) = oneshot::channel(); - self.message_sender - .try_send(BatchMessage::Flush(Some(res_sender))) - .map_err(|err| { - OTelSdkError::InternalFailure(format!("Failed to send flush message: {err}")) - })?; - - futures_executor::block_on(res_receiver).map_err(|err| { - OTelSdkError::InternalFailure(format!("Flush response channel error: {err}")) - })? - } - - fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { - let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed); - let max_queue_size = self.max_queue_size; - if dropped_spans > 0 { - otel_warn!( - name: "BatchSpanProcessor.Shutdown", - dropped_spans = dropped_spans, - max_queue_size = max_queue_size, - message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals." - ); - } - - let (res_sender, res_receiver) = oneshot::channel(); - self.message_sender - .try_send(BatchMessage::Shutdown(res_sender)) - .map_err(|err| { - OTelSdkError::InternalFailure(format!("Failed to send shutdown message: {err}")) - })?; - - futures_executor::block_on(res_receiver).map_err(|err| { - OTelSdkError::InternalFailure(format!("Shutdown response channel error: {err}")) - })? - } - - fn set_resource(&mut self, resource: &Resource) { - let resource = Arc::new(resource.clone()); - let _ = self - .message_sender - .try_send(BatchMessage::SetResource(resource)); - } -} - -/// Messages sent between application thread and batch span processor's work thread. -// In this enum the size difference is not a concern because: -// 1. If we wrap SpanData into a pointer, it will add overhead when processing. -// 2. Most of the messages will be ExportSpan. 
-#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -enum BatchMessage { - /// Export spans, usually called when span ends - ExportSpan(SpanData), - /// Flush the current buffer to the backend, it can be triggered by - /// pre configured interval or a call to `force_push` function. - Flush(Option>), - /// Shut down the worker thread, push all spans in buffer to the backend. - Shutdown(oneshot::Sender), - /// Set the resource for the exporter. - SetResource(Arc), -} - -struct BatchSpanProcessorInternal { - spans: Vec, - export_tasks: FuturesUnordered>, - runtime: R, - config: BatchConfig, - // TODO: Redesign the `SpanExporter` trait to use immutable references (`&self`) - // for all methods. This would allow us to remove the `RwLock` and just use `Arc`, - // similar to how `crate::logs::LogExporter` is implemented. - exporter: Arc>, -} - -impl BatchSpanProcessorInternal { - async fn flush(&mut self, res_channel: Option>) { - let export_result = Self::export( - self.spans.split_off(0), - self.exporter.clone(), - self.runtime.clone(), - self.config.max_export_timeout, - ) - .await; - let task = Box::pin(async move { - if let Some(channel) = res_channel { - // If a response channel is provided, attempt to send the export result through it. - if let Err(result) = channel.send(export_result) { - otel_debug!( - name: "BatchSpanProcessor.Flush.SendResultError", - reason = format!("{:?}", result) - ); - } - } else if let Err(err) = export_result { - // If no channel is provided and the export operation encountered an error, - // log the error directly here. - // TODO: Consider returning the status instead of logging it. 
- otel_error!( - name: "BatchSpanProcessor.Flush.ExportError", - reason = format!("{:?}", err), - message = "Failed during the export process" - ); - } - - Ok(()) - }); - - if self.config.max_concurrent_exports == 1 { - let _ = task.await; - } else { - self.export_tasks.push(task); - while self.export_tasks.next().await.is_some() {} - } - } - - /// Process a single message - /// - /// A return value of false indicates shutdown - async fn process_message(&mut self, message: BatchMessage) -> bool { - match message { - // Span has finished, add to buffer of pending spans. - BatchMessage::ExportSpan(span) => { - self.spans.push(span); - - if self.spans.len() == self.config.max_export_batch_size { - // If concurrent exports are saturated, wait for one to complete. - if !self.export_tasks.is_empty() - && self.export_tasks.len() == self.config.max_concurrent_exports - { - self.export_tasks.next().await; - } - - let batch = self.spans.split_off(0); - let exporter = self.exporter.clone(); - let runtime = self.runtime.clone(); - let max_export_timeout = self.config.max_export_timeout; - - let task = async move { - if let Err(err) = - Self::export(batch, exporter, runtime, max_export_timeout).await - { - otel_error!( - name: "BatchSpanProcessor.Export.Error", - reason = format!("{}", err) - ); - } - - Ok(()) - }; - - // Special case when not using concurrent exports - if self.config.max_concurrent_exports == 1 { - let _ = task.await; - } else { - self.export_tasks.push(Box::pin(task)); - } - } - } - // Span batch interval time reached or a force flush has been invoked, export - // current spans. - // - // This is a hint to ensure that any tasks associated with Spans for which the - // SpanProcessor had already received events prior to the call to ForceFlush - // SHOULD be completed as soon as possible, preferably before returning from - // this method. 
- // - // In particular, if any SpanProcessor has any associated exporter, it SHOULD - // try to call the exporter's Export with all spans for which this was not - // already done and then invoke ForceFlush on it. The built-in SpanProcessors - // MUST do so. If a timeout is specified (see below), the SpanProcessor MUST - // prioritize honoring the timeout over finishing all calls. It MAY skip or - // abort some or all Export or ForceFlush calls it has made to achieve this - // goal. - // - // NB: `force_flush` is not currently implemented on exporters; the equivalent - // would be waiting for exporter tasks to complete. In the case of - // channel-coupled exporters, they will need a `force_flush` implementation to - // properly block. - BatchMessage::Flush(res_channel) => { - self.flush(res_channel).await; - } - // Stream has terminated or processor is shutdown, return to finish execution. - BatchMessage::Shutdown(ch) => { - self.flush(Some(ch)).await; - let _ = self.exporter.write().await.shutdown(); - return false; - } - // propagate the resource - BatchMessage::SetResource(resource) => { - self.exporter.write().await.set_resource(&resource); - } - } - true - } - - async fn export( - batch: Vec, - exporter: Arc>, - runtime: R, - max_export_timeout: Duration, - ) -> OTelSdkResult { - // Batch size check for flush / shutdown. Those methods may be called - // when there's no work to do. - if batch.is_empty() { - return Ok(()); - } - - let exporter_guard = exporter.read().await; - let export = exporter_guard.export(batch); - let timeout = runtime.delay(max_export_timeout); - - pin_mut!(export); - pin_mut!(timeout); - - match future::select(export, timeout).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => Err(OTelSdkError::Timeout(max_export_timeout)), - } - } - - async fn run(mut self, mut messages: impl FusedStream + Unpin) { - loop { - select! 
{ - // FuturesUnordered implements Fuse intelligently such that it - // will become eligible again once new tasks are added to it. - _ = self.export_tasks.next() => { - // An export task completed; do we need to do anything with it? - }, - message = messages.next() => { - match message { - Some(message) => { - if !self.process_message(message).await { - break; - } - }, - None => break, - } - }, - } - } - } -} - -impl BatchSpanProcessor { - pub(crate) fn new(exporter: E, config: BatchConfig, runtime: R) -> Self - where - E: SpanExporter + Send + Sync + 'static, - { - let (message_sender, message_receiver) = - runtime.batch_message_channel(config.max_queue_size); - - let max_queue_size = config.max_queue_size; - - let inner_runtime = runtime.clone(); - // Spawn worker process via user-defined spawn function. - runtime.spawn(async move { - // Timer will take a reference to the current runtime, so its important we do this within the - // runtime.spawn() - let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay) - .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. 
- .map(|_| BatchMessage::Flush(None)); - let timeout_runtime = inner_runtime.clone(); - - let messages = Box::pin(stream::select(message_receiver, ticker)); - let processor = BatchSpanProcessorInternal { - spans: Vec::new(), - export_tasks: FuturesUnordered::new(), - runtime: timeout_runtime, - config, - exporter: Arc::new(RwLock::new(exporter)), - }; - - processor.run(messages).await - }); - - // Return batch processor with link to worker - BatchSpanProcessor { - message_sender, - dropped_spans_count: AtomicUsize::new(0), - max_queue_size, - } - } - - /// Create a new batch processor builder - pub fn builder(exporter: E, runtime: R) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - { - BatchSpanProcessorBuilder { - exporter, - config: Default::default(), - runtime, - } - } -} - -/// A builder for creating [`BatchSpanProcessor`] instances. -/// -#[derive(Debug)] -pub struct BatchSpanProcessorBuilder { - exporter: E, - config: BatchConfig, - runtime: R, -} - -impl BatchSpanProcessorBuilder -where - E: SpanExporter + 'static, - R: RuntimeChannel, -{ - /// Set the BatchConfig for [BatchSpanProcessorBuilder] - pub fn with_batch_config(self, config: BatchConfig) -> Self { - BatchSpanProcessorBuilder { config, ..self } - } - - /// Build a batch processor - pub fn build(self) -> BatchSpanProcessor { - BatchSpanProcessor::new(self.exporter, self.config, self.runtime) - } -} - -#[cfg(all(test, feature = "testing", feature = "trace"))] -mod tests { - // cargo test trace::span_processor::tests:: --features=testing - use super::{BatchSpanProcessor, SpanProcessor}; - use crate::error::OTelSdkResult; - use crate::runtime; - use crate::testing::trace::{new_test_export_span_data, new_tokio_test_exporter}; - use crate::trace::span_processor::{ - OTEL_BSP_EXPORT_TIMEOUT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, - OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT, - }; - use crate::trace::{BatchConfig, 
BatchConfigBuilder, InMemorySpanExporterBuilder}; - use crate::trace::{SpanData, SpanExporter}; - use futures_util::Future; - use std::fmt::Debug; - use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; - use std::sync::Arc; - use std::time::Duration; - - struct BlockingExporter { - delay_for: Duration, - delay_fn: D, - } - - impl Debug for BlockingExporter - where - D: Fn(Duration) -> DS + 'static + Send + Sync, - DS: Future + Send + Sync + 'static, - { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("blocking exporter for testing") - } - } - - impl SpanExporter for BlockingExporter - where - D: Fn(Duration) -> DS + 'static + Send + Sync, - DS: Future + Send + Sync + 'static, - { - async fn export(&self, _batch: Vec) -> OTelSdkResult { - (self.delay_fn)(self.delay_for).await; - Ok(()) - } - } - - /// Exporter that records whether two exports overlap in time. - struct TrackingExporter { - /// Artificial delay to keep each export alive for a while. - delay: Duration, - /// Current number of in-flight exports. - active: Arc, - /// Set to true the first time we see overlap. - concurrent_seen: Arc, - } - - impl Debug for TrackingExporter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("tracking exporter") - } - } - - impl SpanExporter for TrackingExporter { - async fn export(&self, _batch: Vec) -> crate::error::OTelSdkResult { - // Increment in-flight counter and note any overlap. - let inflight = self.active.fetch_add(1, Ordering::SeqCst) + 1; - if inflight > 1 { - self.concurrent_seen.store(true, Ordering::SeqCst); - } - - // Keep the export "busy" for a bit. - tokio::time::sleep(self.delay).await; - - // Decrement counter. 
- self.active.fetch_sub(1, Ordering::SeqCst); - Ok(()) - } - } - - #[test] - fn test_build_batch_span_processor_builder() { - let mut env_vars = vec![ - (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("500")), - (OTEL_BSP_SCHEDULE_DELAY, Some("I am not number")), - (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")), - ]; - temp_env::with_vars(env_vars.clone(), || { - let builder = BatchSpanProcessor::builder( - InMemorySpanExporterBuilder::new().build(), - runtime::Tokio, - ); - // export batch size cannot exceed max queue size - assert_eq!(builder.config.max_export_batch_size, 500); - assert_eq!( - builder.config.scheduled_delay, - OTEL_BSP_SCHEDULE_DELAY_DEFAULT - ); - assert_eq!( - builder.config.max_queue_size, - OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT - ); - assert_eq!( - builder.config.max_export_timeout, - Duration::from_millis(2046) - ); - }); - - env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120"))); - - temp_env::with_vars(env_vars, || { - let builder = BatchSpanProcessor::builder( - InMemorySpanExporterBuilder::new().build(), - runtime::Tokio, - ); - assert_eq!(builder.config.max_export_batch_size, 120); - assert_eq!(builder.config.max_queue_size, 120); - }); - } - - #[tokio::test] - async fn test_batch_span_processor() { - let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter(); - let config = BatchConfigBuilder::default() - .with_scheduled_delay(Duration::from_secs(60 * 60 * 24)) // set the tick to 24 hours so we know the span must be exported via force_flush - .build(); - let processor = BatchSpanProcessor::new(exporter, config, runtime::TokioCurrentThread); - let handle = tokio::spawn(async move { - loop { - if let Some(span) = export_receiver.recv().await { - assert_eq!(span.span_context, new_test_export_span_data().span_context); - break; - } - } - }); - tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); - let flush_res = processor.force_flush(); - assert!(flush_res.is_ok()); - let 
_shutdown_result = processor.shutdown(); - - assert!( - tokio::time::timeout(Duration::from_secs(5), handle) - .await - .is_ok(), - "timed out in 5 seconds. force_flush may not export any data when called" - ); - } - - // If `time_out` is `true`, then the export should fail with a timeout. - // Else, the exporter should be able to export within the timeout duration. - async fn timeout_test_tokio(time_out: bool) { - let config = BatchConfig { - max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), - scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush, - ..Default::default() - }; - let exporter = BlockingExporter { - delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }), - delay_fn: tokio::time::sleep, - }; - let processor = BatchSpanProcessor::new(exporter, config, runtime::TokioCurrentThread); - tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); - let flush_res = processor.force_flush(); - if time_out { - assert!(flush_res.is_err()); - } else { - assert!(flush_res.is_ok()); - } - let shutdown_res = processor.shutdown(); - assert!(shutdown_res.is_ok()); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_timeout_tokio_timeout() { - // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s. - // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s. - // Either way, the test should be finished within 5s. - timeout_test_tokio(true).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_timeout_tokio_not_timeout() { - timeout_test_tokio(false).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_concurrent_exports_expected() { - // Shared state for the exporter. 
- let active = Arc::new(AtomicUsize::new(0)); - let concurrent_seen = Arc::new(AtomicBool::new(false)); - - let exporter = TrackingExporter { - delay: Duration::from_millis(50), - active: active.clone(), - concurrent_seen: concurrent_seen.clone(), - }; - - // Intentionally tiny batch-size so every span forces an export. - let config = BatchConfig { - max_export_batch_size: 1, - max_queue_size: 16, - scheduled_delay: Duration::from_secs(3600), // effectively disabled - max_export_timeout: Duration::from_secs(5), - max_concurrent_exports: 2, // what we want to verify - }; - - // Spawn the processor. - let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio); - - // Finish three spans in rapid succession. - processor.on_end(new_test_export_span_data()); - processor.on_end(new_test_export_span_data()); - processor.on_end(new_test_export_span_data()); - - // Wait until everything has been exported. - processor.force_flush().expect("force flush failed"); - processor.shutdown().expect("shutdown failed"); - - // Expect at least one period with >1 export in flight. - assert!( - concurrent_seen.load(Ordering::SeqCst), - "exports never overlapped, processor is still serialising them" - ); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_exports_serial_when_max_concurrent_exports_1() { - let active = Arc::new(AtomicUsize::new(0)); - let concurrent_seen = Arc::new(AtomicBool::new(false)); - - let exporter = TrackingExporter { - delay: Duration::from_millis(50), - active: active.clone(), - concurrent_seen: concurrent_seen.clone(), - }; - - let config = BatchConfig { - max_export_batch_size: 1, - max_queue_size: 16, - scheduled_delay: Duration::from_secs(3600), - max_export_timeout: Duration::from_secs(5), - max_concurrent_exports: 1, // what we want to verify - }; - - let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio); - - // Finish several spans quickly. 
- processor.on_end(new_test_export_span_data()); - processor.on_end(new_test_export_span_data()); - processor.on_end(new_test_export_span_data()); - - processor.force_flush().expect("force flush failed"); - processor.shutdown().expect("shutdown failed"); - - // There must never have been more than one export in flight. - assert!( - !concurrent_seen.load(Ordering::SeqCst), - "exports overlapped even though max_concurrent_exports was 1" - ); - } -} diff --git a/opentelemetry-sdk/src/util.rs b/opentelemetry-sdk/src/util.rs index adad9cc6d5..68b0d3eb08 100644 --- a/opentelemetry-sdk/src/util.rs +++ b/opentelemetry-sdk/src/util.rs @@ -1,9 +1,50 @@ //! Internal utilities /// Helper which wraps `tokio::time::interval` and makes it return a stream -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] +#[cfg(feature = "rt-tokio")] pub fn tokio_interval_stream( period: std::time::Duration, ) -> tokio_stream::wrappers::IntervalStream { tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(period)) } + +/// Strategy for blocking on async futures from synchronous contexts. +/// +/// When constructed within a tokio runtime, captures the runtime handle +/// and enters the runtime context via [`tokio::runtime::Handle::enter()`] +/// before blocking with [`futures_executor::block_on()`]. This makes tokio +/// types (spawn, timers, IO resources) available on dedicated background +/// threads without taking ownership of the reactor — IO continues to be +/// driven by the runtime's own threads. +/// +/// Falls back to plain [`futures_executor::block_on()`] when no tokio runtime +/// is available (e.g., non-tokio environments). 
+#[cfg(any(feature = "trace", feature = "logs", feature = "metrics"))]
+#[derive(Clone, Debug)]
+pub(crate) enum BlockingStrategy {
+    #[cfg(feature = "rt-tokio")]
+    TokioHandle(tokio::runtime::Handle),
+    FuturesExecutor,
+}
+
+#[cfg(any(feature = "trace", feature = "logs", feature = "metrics"))]
+impl BlockingStrategy {
+    pub(crate) fn new() -> Self {
+        #[cfg(feature = "rt-tokio")]
+        if let Ok(handle) = tokio::runtime::Handle::try_current() {
+            return Self::TokioHandle(handle);
+        }
+        Self::FuturesExecutor
+    }
+
+    pub(crate) fn block_on<F: std::future::Future>(&self, future: F) -> F::Output {
+        match self {
+            #[cfg(feature = "rt-tokio")]
+            Self::TokioHandle(handle) => {
+                let _guard = handle.enter();
+                futures_executor::block_on(future)
+            }
+            Self::FuturesExecutor => futures_executor::block_on(future),
+        }
+    }
+}