Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,13 @@ trace = ["opentelemetry/trace", "rand", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url", "experimental_async_runtime"]
logs = ["opentelemetry/logs"]
metrics = ["opentelemetry/metrics"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "tokio/macros", "tokio/rt-multi-thread"]
experimental_async_runtime = []
rt-tokio = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
rt-tokio-current-thread = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
internal-logs = ["opentelemetry/internal-logs"]
experimental_metrics_periodicreader_with_async_runtime = ["metrics", "experimental_async_runtime"]
spec_unstable_metrics_views = ["metrics"]
experimental_metrics_custom_reader = ["metrics"]
experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimental_async_runtime"]
experimental_logs_concurrent_log_processor = ["logs"]
experimental_trace_batch_span_processor_with_async_runtime = ["tokio/sync", "trace", "experimental_async_runtime"]
experimental_metrics_disable_name_validation = ["metrics"]
bench_profiling = []

Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@
//! metrics aggregation can be added via the following flags:
//!
//! * `experimental_async_runtime`: Enables the experimental `Runtime` trait and related functionality.
//! * `rt-tokio`: Spawn telemetry tasks using [tokio]'s multi-thread runtime.
//! * `rt-tokio-current-thread`: Spawn telemetry tasks on a separate runtime so that the main runtime won't be blocked.
//! * `rt-tokio`: Spawn telemetry tasks using [tokio]. Automatically detects the runtime flavor (multi-thread or current-thread).
//!
//! [tokio]: https://crates.io/crates/tokio
#![warn(
Expand Down
85 changes: 14 additions & 71 deletions opentelemetry-sdk/src/logs/batch_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use crate::error::{OTelSdkError, OTelSdkResult};
use crate::logs::log_processor::LogProcessor;
use crate::util::BlockingStrategy;
use crate::{
logs::{LogBatch, LogExporter, SdkLogRecord},
Resource,
Expand All @@ -40,12 +41,6 @@ use std::{
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports.
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: Duration = Duration::from_millis(1_000);
/// Maximum allowed time to export data.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
/// Default maximum allowed time to export data.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_millis(30_000);
/// Maximum queue size.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
/// Default maximum queue size.
Expand Down Expand Up @@ -342,6 +337,7 @@ impl BatchLogProcessor {
let max_export_batch_size = config.max_export_batch_size;
let current_batch_size = Arc::new(AtomicUsize::new(0));
let current_batch_size_for_thread = current_batch_size.clone();
let blocking_strategy = BlockingStrategy::new();

let handle = thread::Builder::new()
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
Expand All @@ -368,6 +364,7 @@ impl BatchLogProcessor {
last_export_time: &mut Instant,
current_batch_size: &AtomicUsize,
max_export_size: usize,
blocking_strategy: &BlockingStrategy,
) -> OTelSdkResult
where
E: LogExporter + Send + Sync + 'static,
Expand All @@ -388,7 +385,8 @@ impl BatchLogProcessor {
let count_of_logs = logs.len(); // Count of logs that will be exported
total_exported_logs += count_of_logs;

result = export_batch_sync(exporter, logs, last_export_time); // This method clears the logs vec after exporting
result =
export_batch_sync(exporter, logs, last_export_time, blocking_strategy); // This method clears the logs vec after exporting

current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
}
Expand Down Expand Up @@ -417,6 +415,7 @@ impl BatchLogProcessor {
&mut last_export_time,
&current_batch_size,
max_export_batch_size,
&blocking_strategy,
);
}
Ok(BatchMessage::ForceFlush(sender)) => {
Expand All @@ -428,6 +427,7 @@ impl BatchLogProcessor {
&mut last_export_time,
&current_batch_size,
max_export_batch_size,
&blocking_strategy,
);
let _ = sender.send(result);
}
Expand All @@ -440,6 +440,7 @@ impl BatchLogProcessor {
&mut last_export_time,
&current_batch_size,
max_export_batch_size,
&blocking_strategy,
);
let _ = exporter.shutdown();
let _ = sender.send(result);
Expand Down Expand Up @@ -468,6 +469,7 @@ impl BatchLogProcessor {
&mut last_export_time,
&current_batch_size,
max_export_batch_size,
&blocking_strategy,
);
}
Err(RecvTimeoutError::Disconnected) => {
Expand Down Expand Up @@ -518,6 +520,7 @@ fn export_batch_sync<E>(
exporter: &E,
batch: &mut Vec<Box<(SdkLogRecord, InstrumentationScope)>>,
last_export_time: &mut Instant,
blocking_strategy: &BlockingStrategy,
) -> OTelSdkResult
where
E: LogExporter + ?Sized,
Expand All @@ -529,7 +532,7 @@ where
}

let export = exporter.export(LogBatch::new_with_owned_data(batch.as_slice()));
let export_result = futures_executor::block_on(export);
let export_result = blocking_strategy.block_on(export);

// Clear the batch vec after exporting
batch.clear();
Expand Down Expand Up @@ -588,10 +591,6 @@ pub struct BatchConfig {
/// of logs one batch after the other without any delay. The default value
/// is 512.
pub(crate) max_export_batch_size: usize,

/// The maximum duration to export a batch of data.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) max_export_timeout: Duration,
}

impl Default for BatchConfig {
Expand All @@ -606,8 +605,6 @@ pub struct BatchConfigBuilder {
max_queue_size: usize,
scheduled_delay: Duration,
max_export_batch_size: usize,
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
max_export_timeout: Duration,
}

impl Default for BatchConfigBuilder {
Expand All @@ -625,8 +622,6 @@ impl Default for BatchConfigBuilder {
max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT,
}
.init_from_env_vars()
}
Expand Down Expand Up @@ -658,19 +653,6 @@ impl BatchConfigBuilder {
self
}

/// Set max_export_timeout for [`BatchConfigBuilder`].
/// It's the maximum duration to export a batch of data.
/// The default value is 30000 milliseconds.
///
/// Corresponding environment variable: `OTEL_BLRP_EXPORT_TIMEOUT`.
///
/// Note: Programmatically setting this will override any value set via the environment variable.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
self.max_export_timeout = max_export_timeout;
self
}

/// Set max_export_batch_size for [`BatchConfigBuilder`].
/// It's the maximum number of logs to process in a single batch. If there are
/// more than one batch worth of logs then it processes multiple batches
Expand All @@ -695,8 +677,6 @@ impl BatchConfigBuilder {
BatchConfig {
max_queue_size: self.max_queue_size,
scheduled_delay: self.scheduled_delay,
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
max_export_timeout: self.max_export_timeout,
max_export_batch_size,
}
}
Expand All @@ -723,14 +703,6 @@ impl BatchConfigBuilder {
self.scheduled_delay = Duration::from_millis(scheduled_delay);
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
.ok()
.and_then(|s| u64::from_str(&s).ok())
{
self.max_export_timeout = Duration::from_millis(max_export_timeout);
}

self
}
}
Expand All @@ -743,8 +715,6 @@ mod tests {
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY,
OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
};
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
use super::{OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT};
use crate::error::OTelSdkResult;
use crate::logs::log_processor::tests::MockLogExporter;
use crate::logs::SdkLogRecord;
Expand All @@ -764,10 +734,6 @@ mod tests {
fn test_default_const_values() {
assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT.as_millis(), 1_000);
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30_000);
assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
assert_eq!(
Expand All @@ -782,17 +748,13 @@ mod tests {
// The following environment variables are expected to be unset so that their default values are used.
let env_vars = vec![
OTEL_BLRP_SCHEDULE_DELAY,
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
OTEL_BLRP_EXPORT_TIMEOUT,
OTEL_BLRP_MAX_QUEUE_SIZE,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
];

let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);

assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT);
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT);
assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
assert_eq!(
config.max_export_batch_size,
Expand Down Expand Up @@ -825,17 +787,13 @@ mod tests {
fn test_batch_config_configurable_by_env_vars() {
let env_vars = vec![
(OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
(OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
];

let config = temp_env::with_vars(env_vars, BatchConfig::default);

assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
assert_eq!(config.max_queue_size, 4096);
assert_eq!(config.max_export_batch_size, 1024);
}
Expand All @@ -852,25 +810,18 @@ mod tests {
assert_eq!(config.max_queue_size, 256);
assert_eq!(config.max_export_batch_size, 256);
assert_eq!(config.scheduled_delay, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT);
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
assert_eq!(config.max_export_timeout, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT);
}

#[test]
fn test_batch_config_with_fields() {
let batch_builder = BatchConfigBuilder::default()
let batch = BatchConfigBuilder::default()
.with_max_export_batch_size(1)
.with_scheduled_delay(Duration::from_millis(2))
.with_max_queue_size(4);

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
let batch_builder = batch_builder.with_max_export_timeout(Duration::from_millis(3));
let batch = batch_builder.build();
.with_max_queue_size(4)
.build();

assert_eq!(batch.max_export_batch_size, 1);
assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
assert_eq!(batch.max_queue_size, 4);
}

Expand All @@ -879,8 +830,6 @@ mod tests {
let mut env_vars = vec![
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
(OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
(OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
Expand All @@ -894,12 +843,6 @@ mod tests {
builder.config.max_queue_size,
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
);

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
assert_eq!(
builder.config.max_export_timeout,
Duration::from_millis(2046)
);
});

env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
Expand Down
Loading