Skip to content

Commit ec0c6c7

Browse files
feat(opentelemetry-sdk): add HistogramAggregation resolution
1 parent bb02477 commit ec0c6c7

7 files changed

Lines changed: 106 additions & 18 deletions

File tree

opentelemetry-sdk/src/metrics/exporter.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::time::Duration;
55

66
use crate::metrics::data::ResourceMetrics;
77

8-
use super::Temporality;
8+
use super::{HistogramAggregation, Temporality};
99

1010
/// Exporter handles the delivery of metric data to external receivers.
1111
///
@@ -37,4 +37,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
3737

3838
/// Access the [Temporality] of the MetricExporter.
3939
fn temporality(&self) -> Temporality;
40+
41+
/// The default aggregation to use for histogram instruments.
42+
fn default_histogram_aggregation(&self) -> HistogramAggregation;
4043
}

opentelemetry-sdk/src/metrics/in_memory_exporter.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::metrics::data::{
33
ExponentialHistogram, Gauge, Histogram, MetricData, ResourceMetrics, Sum,
44
};
55
use crate::metrics::exporter::PushMetricExporter;
6-
use crate::metrics::Temporality;
6+
use crate::metrics::{HistogramAggregation, Temporality};
77
use crate::InMemoryExporterError;
88
use std::collections::VecDeque;
99
use std::fmt;
@@ -261,4 +261,8 @@ impl PushMetricExporter for InMemoryMetricExporter {
261261
fn temporality(&self) -> Temporality {
262262
self.temporality
263263
}
264+
265+
fn default_histogram_aggregation(&self) -> HistogramAggregation {
266+
HistogramAggregation::ExplicitBucketHistogram
267+
}
264268
}

opentelemetry-sdk/src/metrics/mod.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,35 @@ impl FromStr for Temporality {
120120
}
121121
}
122122

123+
/// The default histogram aggregation selection for a [`exporter::PushMetricExporter`].
124+
///
125+
/// This controls which aggregation type is used for histogram instruments
126+
/// when no explicit aggregation is configured via a view.
127+
///
128+
/// Corresponds to the `OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION`
129+
/// environment variable.
130+
#[derive(Debug, Clone, Copy, PartialEq, Default)]
131+
pub enum HistogramAggregation {
132+
/// Use explicit bucket histogram aggregation with default boundaries.
133+
#[default]
134+
ExplicitBucketHistogram,
135+
136+
/// Use base2 exponential bucket histogram aggregation.
137+
Base2ExponentialBucketHistogram,
138+
}
139+
140+
impl FromStr for HistogramAggregation {
141+
type Err = ();
142+
143+
fn from_str(s: &str) -> Result<Self, Self::Err> {
144+
match s.to_lowercase().as_str() {
145+
"explicit_bucket_histogram" => Ok(Self::ExplicitBucketHistogram),
146+
"base2_exponential_bucket_histogram" => Ok(Self::Base2ExponentialBucketHistogram),
147+
_ => Err(()),
148+
}
149+
}
150+
}
151+
123152
#[cfg(all(test, feature = "testing"))]
124153
mod tests {
125154
use self::data::{HistogramDataPoint, MetricData, ScopeMetrics, SumDataPoint};

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::{
1818

1919
use super::{
2020
data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader,
21-
Temporality,
21+
HistogramAggregation, Temporality,
2222
};
2323

2424
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);
@@ -363,6 +363,10 @@ impl<E: PushMetricExporter> PeriodicReaderInner<E> {
363363
self.exporter.temporality()
364364
}
365365

366+
fn default_histogram_aggregation(&self) -> HistogramAggregation {
367+
self.exporter.default_histogram_aggregation()
368+
}
369+
366370
fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
367371
let producer = self.producer.lock().expect("lock poisoned");
368372
if let Some(p) = producer.as_ref() {
@@ -506,6 +510,10 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
506510
fn temporality(&self, kind: InstrumentKind) -> Temporality {
507511
kind.temporality_preference(self.inner.temporality(kind))
508512
}
513+
514+
fn default_histogram_aggregation(&self) -> HistogramAggregation {
515+
self.inner.default_histogram_aggregation()
516+
}
509517
}
510518

511519
#[cfg(all(test, feature = "testing"))]
@@ -515,7 +523,7 @@ mod tests {
515523
error::{OTelSdkError, OTelSdkResult},
516524
metrics::{
517525
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
518-
InMemoryMetricExporter, SdkMeterProvider, Temporality,
526+
HistogramAggregation, InMemoryMetricExporter, SdkMeterProvider, Temporality,
519527
},
520528
Resource,
521529
};
@@ -574,6 +582,10 @@ mod tests {
574582
fn temporality(&self) -> Temporality {
575583
Temporality::Cumulative
576584
}
585+
586+
fn default_histogram_aggregation(&self) -> HistogramAggregation {
587+
HistogramAggregation::ExplicitBucketHistogram
588+
}
577589
}
578590

579591
#[derive(Debug, Clone, Default)]
@@ -602,6 +614,10 @@ mod tests {
602614
fn temporality(&self) -> Temporality {
603615
Temporality::Cumulative
604616
}
617+
618+
fn default_histogram_aggregation(&self) -> HistogramAggregation {
619+
HistogramAggregation::ExplicitBucketHistogram
620+
}
605621
}
606622

607623
#[test]

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::{
2222

2323
use super::{
2424
data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader,
25+
HistogramAggregation,
2526
};
2627

2728
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
@@ -437,6 +438,10 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
437438
fn temporality(&self, kind: InstrumentKind) -> super::Temporality {
438439
kind.temporality_preference(self.exporter.temporality())
439440
}
441+
442+
fn default_histogram_aggregation(&self) -> HistogramAggregation {
443+
self.exporter.default_histogram_aggregation()
444+
}
440445
}
441446

442447
#[cfg(all(test, feature = "testing"))]

opentelemetry-sdk/src/metrics/pipeline.rs

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323

2424
use self::internal::AggregateFns;
2525

26-
use super::{aggregation::Aggregation, Temporality};
26+
use super::{aggregation::Aggregation, HistogramAggregation, Temporality};
2727

2828
/// Connects all of the instruments created by a meter provider to a [MetricReader].
2929
///
@@ -378,14 +378,15 @@ where
378378
// TODO: Create a separate pub (crate) Stream struct for the pipeline,
379379
// as Stream will not have any optional fields as None at this point and
380380
// new struct can better reflect this.
381+
let histogram_agg = self.pipeline.reader.default_histogram_aggregation();
381382
let mut agg = stream
382383
.aggregation
383384
.take()
384-
.unwrap_or_else(|| default_aggregation_selector(kind));
385+
.unwrap_or_else(|| default_aggregation_selector(kind, histogram_agg));
385386

386387
// Apply default if stream or reader aggregation returns default
387388
if matches!(agg, aggregation::Aggregation::Default) {
388-
agg = default_aggregation_selector(kind);
389+
agg = default_aggregation_selector(kind, histogram_agg);
389390
}
390391

391392
if let Err(err) = is_aggregator_compatible(&kind, &agg) {
@@ -421,7 +422,8 @@ where
421422
filter,
422423
cardinality_limit,
423424
);
424-
let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
425+
let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind, histogram_agg)
426+
{
425427
Ok(Some(inst)) => inst,
426428
other => return other.map(|fs| fs.map(|inst| inst.measure)), // Drop aggregator or error
427429
};
@@ -497,23 +499,35 @@ where
497499
/// * Observable UpDownCounter ⇨ Sum
498500
/// * Gauge ⇨ LastValue
499501
/// * Observable Gauge ⇨ LastValue
500-
/// * Histogram ⇨ ExplicitBucketHistogram
502+
/// * Histogram ⇨ determined by `histogram_agg`
501503
///
502504
/// [the spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.19.0/specification/metrics/sdk.md#default-aggregation
503-
fn default_aggregation_selector(kind: InstrumentKind) -> Aggregation {
505+
fn default_aggregation_selector(
506+
kind: InstrumentKind,
507+
histogram_agg: HistogramAggregation,
508+
) -> Aggregation {
504509
match kind {
505510
InstrumentKind::Counter
506511
| InstrumentKind::UpDownCounter
507512
| InstrumentKind::ObservableCounter
508513
| InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
509514
InstrumentKind::Gauge => Aggregation::LastValue,
510515
InstrumentKind::ObservableGauge => Aggregation::LastValue,
511-
InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
512-
boundaries: vec![
513-
0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0,
514-
5000.0, 7500.0, 10000.0,
515-
],
516-
record_min_max: true,
516+
InstrumentKind::Histogram => match histogram_agg {
517+
HistogramAggregation::ExplicitBucketHistogram => Aggregation::ExplicitBucketHistogram {
518+
boundaries: vec![
519+
0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0,
520+
5000.0, 7500.0, 10000.0,
521+
],
522+
record_min_max: true,
523+
},
524+
HistogramAggregation::Base2ExponentialBucketHistogram => {
525+
Aggregation::Base2ExponentialHistogram {
526+
max_size: 160,
527+
max_scale: 20,
528+
record_min_max: true,
529+
}
530+
}
517531
},
518532
}
519533
}
@@ -525,9 +539,15 @@ fn aggregate_fn<T: Number>(
525539
b: AggregateBuilder<T>,
526540
agg: &aggregation::Aggregation,
527541
kind: InstrumentKind,
542+
histogram_agg: HistogramAggregation,
528543
) -> MetricResult<Option<AggregateFns<T>>> {
529544
match agg {
530-
Aggregation::Default => aggregate_fn(b, &default_aggregation_selector(kind), kind),
545+
Aggregation::Default => aggregate_fn(
546+
b,
547+
&default_aggregation_selector(kind, histogram_agg),
548+
kind,
549+
histogram_agg,
550+
),
531551
Aggregation::Drop => Ok(None),
532552
Aggregation::LastValue => {
533553
match kind {

opentelemetry-sdk/src/metrics/reader.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use crate::error::OTelSdkResult;
33
use std::time::Duration;
44
use std::{fmt, sync::Weak};
55

6-
use super::{data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, Temporality};
6+
use super::{
7+
data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, HistogramAggregation,
8+
Temporality,
9+
};
710

811
/// The interface used between the SDK and an exporter.
912
///
@@ -58,6 +61,14 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static {
5861
///
5962
/// If not configured, the Cumulative temporality SHOULD be used.
6063
fn temporality(&self, kind: InstrumentKind) -> Temporality;
64+
65+
/// The default histogram aggregation.
66+
/// This SHOULD be obtained from the exporter.
67+
///
68+
/// If not configured, [`HistogramAggregation::ExplicitBucketHistogram`] is used.
69+
fn default_histogram_aggregation(&self) -> HistogramAggregation {
70+
HistogramAggregation::default()
71+
}
6172
}
6273

6374
/// Produces metrics for a [MetricReader].

0 commit comments

Comments
 (0)