diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 749faf6cc2..b6624b7e7b 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -5,6 +5,22 @@ - Removed `SimpleConcurrentLogProcessor` and the `experimental_logs_concurrent_log_processor` feature flag. The use cases it was designed for (ETW/user_events exporters) are better served by modeling those exporters as processors directly. +- **Added** `Counter::bind()` and `Histogram::bind()` SDK implementations that + return pre-bound measurement handles (`BoundCounter`, `BoundHistogram`). + Bound instruments resolve the attribute-to-aggregator mapping once at bind time + and cache the result, eliminating per-call HashMap lookups. View attribute + filtering is applied at bind time so the hot path stays free of per-call + attribute processing. Bound and unbound recordings with the same (post-view) + attribute set always aggregate into the same data point, including the empty + attribute set. Bound entries are never evicted during delta collection while + a handle exists — idle cycles produce no export but the tracker persists. If + `bind()` is called at the cardinality limit, the handle binds directly to + the overflow tracker — its writes stay on the same direct (no-lookup) hot + path and consistently land in the `otel.metric.overflow=true` bucket for + the lifetime of the handle. To recover a bound handle after delta collection + frees space, drop the existing handle and call `bind()` again. Gated behind + the `experimental_metrics_bound_instruments` feature flag. Benchmarks show + ~28x speedup for counter operations and ~9x for histograms. - Delta metrics collection now uses in-place eviction instead of draining the HashMap on every collect cycle. Stale attribute sets that received no measurements since the last collection are evicted. 
Note: recovery from cardinality overflow diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 5b72161244..1a3fcfc316 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -59,6 +59,7 @@ experimental_metrics_custom_reader = ["metrics"] experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimental_async_runtime"] experimental_trace_batch_span_processor_with_async_runtime = ["tokio/sync", "trace", "experimental_async_runtime"] experimental_metrics_disable_name_validation = ["metrics"] +experimental_metrics_bound_instruments = ["metrics", "opentelemetry/experimental_metrics_bound_instruments"] bench_profiling = [] [[bench]] @@ -123,6 +124,11 @@ name = "log" harness = false required-features = ["logs"] +[[bench]] +name = "bound_instruments" +harness = false +required-features = ["metrics", "experimental_metrics_custom_reader", "experimental_metrics_bound_instruments", "spec_unstable_metrics_views"] + [lib] bench = false diff --git a/opentelemetry-sdk/benches/bound_instruments.rs b/opentelemetry-sdk/benches/bound_instruments.rs new file mode 100644 index 0000000000..d96dd7a631 --- /dev/null +++ b/opentelemetry-sdk/benches/bound_instruments.rs @@ -0,0 +1,207 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use opentelemetry::{metrics::MeterProvider as _, Key, KeyValue}; +use opentelemetry_sdk::metrics::{Instrument, ManualReader, SdkMeterProvider, Stream, Temporality}; + +// Run this benchmark with: +// cargo bench --bench bound_instruments --features metrics,experimental_metrics_custom_reader,experimental_metrics_bound_instruments,spec_unstable_metrics_views +// +// Apple M4 Max, 16 cores (12 performance + 4 efficiency), macOS 15.4 +// +// Results (3 attributes: method, status, path): +// Counter_Unbound_Delta time: [50.20 ns] +// Counter_Bound_Delta time: [ 1.80 ns] ~28x faster +// Counter_Bound_With_View_Delta time: [ 1.82 ns] view filter applied at bind, not on hot path +// 
Counter_Bound_AtOverflow_Delta time: [ 1.82 ns] bind() at cardinality limit binds directly to the overflow +// tracker — perf parity with a normal bind, no per-call resolution +// Histogram_Unbound_Delta time: [58.64 ns] +// Histogram_Bound_Delta time: [ 6.50 ns] ~9.0x faster +// Histogram_Bound_AtOverflow_Delta time: [ 6.58 ns] perf parity with a normal bind +// Counter_Bound_Multithread/2 time: [21.59 µs] (100 adds/thread) +// Counter_Bound_Multithread/4 time: [37.21 µs] (100 adds/thread) +// Counter_Bound_Multithread/8 time: [71.70 µs] (100 adds/thread) +// +// Note: criterion does not fail CI on regression by itself. These numbers are +// reference values for human review; use `cargo criterion --baseline` locally +// if you need automated comparison against a saved baseline. + +fn create_provider(temporality: Temporality) -> SdkMeterProvider { + let reader = ManualReader::builder() + .with_temporality(temporality) + .build(); + SdkMeterProvider::builder().with_reader(reader).build() +} + +fn bench_bound_instruments(c: &mut Criterion) { + let mut group = c.benchmark_group("BoundInstruments"); + group.sample_size(100); + + let attrs = [ + KeyValue::new("method", "GET"), + KeyValue::new("status", "200"), + KeyValue::new("path", "/api/v1/users"), + ]; + + // Counter: Unbound vs Bound (Delta) + { + let provider = create_provider(Temporality::Delta); + let meter = provider.meter("bench"); + let counter = meter.u64_counter("unbound").build(); + group.bench_function("Counter_Unbound_Delta", |b| { + b.iter(|| counter.add(1, &attrs)); + }); + } + + { + let provider = create_provider(Temporality::Delta); + let meter = provider.meter("bench"); + let counter = meter.u64_counter("bound").build(); + let bound = counter.bind(&attrs); + group.bench_function("Counter_Bound_Delta", |b| { + b.iter(|| bound.add(1)); + }); + } + + // Counter: Bound with a View filter — confirms the filter is applied at + // bind() time and the hot path stays free of attribute processing. 
+ { + let view = |i: &opentelemetry_sdk::metrics::Instrument| { + if i.name() == "bound_with_view" { + Stream::builder() + .with_allowed_attribute_keys(vec![ + Key::new("method"), + Key::new("status"), + Key::new("path"), + ]) + .build() + .ok() + } else { + None + } + }; + let reader = ManualReader::builder() + .with_temporality(Temporality::Delta) + .build(); + let provider = SdkMeterProvider::builder() + .with_reader(reader) + .with_view(view) + .build(); + let meter = provider.meter("bench"); + let counter = meter.u64_counter("bound_with_view").build(); + let bound = counter.bind(&attrs); + group.bench_function("Counter_Bound_With_View_Delta", |b| { + b.iter(|| bound.add(1)); + }); + } + + // Counter: Bound at overflow — confirms that binding when the cardinality + // limit is exhausted yields the same hot-path performance as a normal bind + // (writes go directly to the overflow tracker, no per-call resolution). + { + let cardinality_limit = 4; + let view = move |i: &Instrument| { + if i.name() == "bound_at_overflow" { + Stream::builder() + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let reader = ManualReader::builder() + .with_temporality(Temporality::Delta) + .build(); + let provider = SdkMeterProvider::builder() + .with_reader(reader) + .with_view(view) + .build(); + let meter = provider.meter("bench"); + let counter = meter.u64_counter("bound_at_overflow").build(); + // Saturate cardinality with unbound calls so bind() lands in overflow. 
+ for i in 0..cardinality_limit { + counter.add(1, &[KeyValue::new("filler", i as i64)]); + } + let bound = counter.bind(&attrs); + group.bench_function("Counter_Bound_AtOverflow_Delta", |b| { + b.iter(|| bound.add(1)); + }); + } + + // Histogram: Unbound vs Bound (Delta) + { + let provider = create_provider(Temporality::Delta); + let meter = provider.meter("bench"); + let histogram = meter.f64_histogram("unbound_hist").build(); + group.bench_function("Histogram_Unbound_Delta", |b| { + b.iter(|| histogram.record(1.5, &attrs)); + }); + } + + { + let provider = create_provider(Temporality::Delta); + let meter = provider.meter("bench"); + let histogram = meter.f64_histogram("bound_hist").build(); + let bound = histogram.bind(&attrs); + group.bench_function("Histogram_Bound_Delta", |b| { + b.iter(|| bound.record(1.5)); + }); + } + + // Histogram: Bound at overflow — same property as the counter version. + { + let cardinality_limit = 4; + let view = move |i: &Instrument| { + if i.name() == "bound_hist_at_overflow" { + Stream::builder() + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let reader = ManualReader::builder() + .with_temporality(Temporality::Delta) + .build(); + let provider = SdkMeterProvider::builder() + .with_reader(reader) + .with_view(view) + .build(); + let meter = provider.meter("bench"); + let histogram = meter.f64_histogram("bound_hist_at_overflow").build(); + for i in 0..cardinality_limit { + histogram.record(1.5, &[KeyValue::new("filler", i as i64)]); + } + let bound = histogram.bind(&attrs); + group.bench_function("Histogram_Bound_AtOverflow_Delta", |b| { + b.iter(|| bound.record(1.5)); + }); + } + + // Multi-threaded bound counter + for num_threads in [2, 4, 8] { + let provider = create_provider(Temporality::Delta); + let meter = provider.meter("bench"); + let counter = meter.u64_counter("mt_bound").build(); + let bound = counter.bind(&attrs); + + 
group.bench_function(format!("Counter_Bound_Multithread/{num_threads}"), |b| { + b.iter(|| { + std::thread::scope(|s| { + for _ in 0..num_threads { + s.spawn(|| { + for _ in 0..100 { + bound.add(1); + } + }); + } + }); + }); + }); + } + + group.finish(); +} + +criterion_group!(benches, bench_bound_instruments); +criterion_main!(benches); diff --git a/opentelemetry-sdk/src/metrics/instrument.rs b/opentelemetry-sdk/src/metrics/instrument.rs index 8e4a61b0d0..54a10722b3 100644 --- a/opentelemetry-sdk/src/metrics/instrument.rs +++ b/opentelemetry-sdk/src/metrics/instrument.rs @@ -1,10 +1,14 @@ use std::{borrow::Cow, collections::HashSet, error::Error, sync::Arc}; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use opentelemetry::metrics::BoundSyncInstrument; use opentelemetry::{ metrics::{AsyncInstrument, SyncInstrument}, InstrumentationScope, Key, KeyValue, }; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use crate::metrics::internal::BoundMeasure; use crate::metrics::{aggregation::Aggregation, internal::Measure}; use super::meter::{ @@ -388,6 +392,29 @@ impl SyncInstrument for ResolvedMeasures { measure.call(val, attrs) } } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn bind(&self, attrs: &[KeyValue]) -> Box + Send + Sync> { + let bound_measures: Vec>> = + self.measures.iter().map(|m| m.bind(attrs)).collect(); + Box::new(ResolvedBoundMeasures { + measures: bound_measures, + }) + } +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +pub(crate) struct ResolvedBoundMeasures { + measures: Vec>>, +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl BoundSyncInstrument for ResolvedBoundMeasures { + fn measure(&self, val: T) { + for measure in &self.measures { + measure.call(val); + } + } } #[derive(Clone)] diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index e8e5cd433c..1896168505 100644 --- 
a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -18,6 +18,39 @@ use super::{ /// Receives measurements to be aggregated. pub(crate) trait Measure: Send + Sync + 'static { fn call(&self, measurement: T, attrs: &[KeyValue]); + + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn bind(&self, attrs: &[KeyValue]) -> Box>; +} + +/// A pre-bound measurement handle that bypasses attribute lookup. +#[cfg(feature = "experimental_metrics_bound_instruments")] +pub(crate) trait BoundMeasure: Send + Sync + 'static { + fn call(&self, measurement: T); +} + +/// A bound handle that drops every measurement silently. Used when +/// `ValueMap::bind` returns `None` because the trackers `RwLock` is poisoned — +/// an extremely rare degenerate state in which the SDK can no longer aggregate +/// reliably. Returning a noop here mirrors `measure()`'s own poison handling +/// (silent drop) rather than panicking on the user's hot path. 
+#[cfg(feature = "experimental_metrics_bound_instruments")] +pub(crate) struct NoopBoundMeasure { + _marker: marker::PhantomData, +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl NoopBoundMeasure { + pub(crate) fn new() -> Self { + Self { + _marker: marker::PhantomData, + } + } +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl BoundMeasure for NoopBoundMeasure { + fn call(&self, _measurement: T) {} } /// Stores the aggregate of measurements into the aggregation and returns the number diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index 058a94d271..f1815e5842 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -1,3 +1,7 @@ +#[cfg(feature = "experimental_metrics_bound_instruments")] +use std::sync::atomic::Ordering; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use std::sync::Arc; use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex}; use opentelemetry::{otel_debug, KeyValue}; @@ -12,10 +16,42 @@ use super::{ aggregate::{AggregateTimeInitiator, AttributeSetFilter}, Aggregator, ComputeAggregation, Measure, Number, ValueMap, }; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use super::{BoundMeasure, NoopBoundMeasure, TrackerEntry}; pub(crate) const EXPO_MAX_SCALE: i8 = 20; pub(crate) const EXPO_MIN_SCALE: i8 = -10; +/// Pre-bound exponential-histogram handle: writes go directly to a fixed +/// `TrackerEntry` without per-call attribute lookup. Unlike `BoundHistogramHandle`, +/// no bucket precomputation happens at the call site — `update()` does scale +/// resolution and bin assignment inside the entry's Mutex. The NaN/inf filter +/// from the unbound `call()` path is preserved here. 
+#[cfg(feature = "experimental_metrics_bound_instruments")] +struct BoundExpoHistogramHandle { + tracker: Arc>>>, +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl BoundMeasure for BoundExpoHistogramHandle { + fn call(&self, measurement: T) { + // Mirror unbound ExpoHistogram::call: ignore NaN and infinity so that + // ExpoHistogramDataPoint::record's invariants are preserved. + if !measurement.into_float().is_finite() { + return; + } + self.tracker.aggregator.update(measurement); + self.tracker.has_been_updated.store(true, Ordering::Release); + } +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl Drop for BoundExpoHistogramHandle { + fn drop(&mut self) { + self.tracker.bound_count.fetch_sub(1, Ordering::Relaxed); + } +} + /// A single data point in an exponential histogram. #[derive(Debug, PartialEq)] struct ExpoHistogramDataPoint { @@ -526,6 +562,18 @@ where self.value_map.measure(measurement, filtered); }) } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn bind(&self, attrs: &[KeyValue]) -> Box> { + let mut bound_attrs = Vec::new(); + self.filter.apply(attrs, |filtered| { + bound_attrs = filtered.to_vec(); + }); + match self.value_map.bind(&bound_attrs) { + Some(tracker) => Box::new(BoundExpoHistogramHandle { tracker }), + None => Box::new(NoopBoundMeasure::new()), + } + } } impl ComputeAggregation for ExpoHistogram diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index 93d23b90dd..0a83181463 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -1,5 +1,9 @@ use std::mem::replace; use std::ops::DerefMut; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use std::sync::atomic::Ordering; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use std::sync::Arc; use std::sync::Mutex; use crate::metrics::data::{self, MetricData}; @@ -9,6 +13,8 @@ use 
opentelemetry::KeyValue; use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter}; use super::{Aggregator, ComputeAggregation, Measure, Number, ValueMap}; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use super::{BoundMeasure, NoopBoundMeasure, TrackerEntry}; impl Aggregator for Mutex> where @@ -66,6 +72,33 @@ impl Buckets { } } +/// Pre-bound histogram handle: writes go directly to a fixed `TrackerEntry` +/// without per-call attribute lookup. The `tracker` is either a dedicated entry +/// for the bound attribute set, or — if bind() hit the cardinality limit — the +/// shared overflow tracker. +#[cfg(feature = "experimental_metrics_bound_instruments")] +struct BoundHistogramHandle { + tracker: Arc>>>, + bounds: Vec, +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl BoundMeasure for BoundHistogramHandle { + fn call(&self, measurement: T) { + let f = measurement.into_float(); + let index = self.bounds.partition_point(|&x| x < f); + self.tracker.aggregator.update((measurement, index)); + self.tracker.has_been_updated.store(true, Ordering::Release); + } +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl Drop for BoundHistogramHandle { + fn drop(&mut self) { + self.tracker.bound_count.fetch_sub(1, Ordering::Relaxed); + } +} + /// Summarizes a set of measurements as a histogram with explicitly defined /// buckets. 
pub(crate) struct Histogram { @@ -233,6 +266,23 @@ where self.value_map.measure((measurement, index), filtered); }) } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn bind(&self, attrs: &[KeyValue]) -> Box> { + let mut bound_attrs = Vec::new(); + self.filter.apply(attrs, |filtered| { + bound_attrs = filtered.to_vec(); + }); + match self.value_map.bind(&bound_attrs) { + Some(tracker) => Box::new(BoundHistogramHandle { + tracker, + bounds: self.bounds.clone(), + }), + // Trackers RwLock is poisoned — return a noop handle so writes + // silently drop, mirroring `measure()`'s own poison handling. + None => Box::new(NoopBoundMeasure::new()), + } + } } impl ComputeAggregation for Histogram diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index 53423db451..4ab4a2d7e9 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -3,11 +3,41 @@ use crate::metrics::{ Temporality, }; use opentelemetry::KeyValue; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use std::sync::atomic::Ordering; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use std::sync::Arc; use super::{ aggregate::{AggregateTimeInitiator, AttributeSetFilter}, Aggregator, AtomicTracker, AtomicallyUpdate, ComputeAggregation, Measure, Number, ValueMap, }; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use super::{BoundMeasure, NoopBoundMeasure, TrackerEntry}; + +/// Pre-bound gauge/last-value handle: writes go directly to a fixed +/// `TrackerEntry` without per-call attribute lookup. The `tracker` is either +/// a dedicated entry for the bound attribute set, or — if bind() hit the +/// cardinality limit — the shared overflow tracker. 
+#[cfg(feature = "experimental_metrics_bound_instruments")] +struct BoundLastValueHandle { + tracker: Arc>>, +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl BoundMeasure for BoundLastValueHandle { + fn call(&self, measurement: T) { + self.tracker.aggregator.update(measurement); + self.tracker.has_been_updated.store(true, Ordering::Release); + } +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl Drop for BoundLastValueHandle { + fn drop(&mut self) { + self.tracker.bound_count.fetch_sub(1, Ordering::Relaxed); + } +} /// this is reused by PrecomputedSum pub(crate) struct Assign @@ -142,6 +172,18 @@ where self.value_map.measure(measurement, filtered); }) } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn bind(&self, attrs: &[KeyValue]) -> Box> { + let mut bound_attrs = Vec::new(); + self.filter.apply(attrs, |filtered| { + bound_attrs = filtered.to_vec(); + }); + match self.value_map.bind(&bound_attrs) { + Some(tracker) => Box::new(BoundLastValueHandle { tracker }), + None => Box::new(NoopBoundMeasure::new()), + } + } } impl ComputeAggregation for LastValue @@ -157,3 +199,75 @@ where (len, new.map(T::make_aggregated_metrics)) } } + +#[cfg(all(test, feature = "experimental_metrics_bound_instruments"))] +mod tests { + use super::*; + use crate::metrics::data::{AggregatedMetrics, Gauge, MetricData}; + + fn extract_gauge(agg: AggregatedMetrics) -> Gauge { + match agg { + AggregatedMetrics::U64(MetricData::Gauge(g)) => g, + _ => panic!("expected u64 Gauge"), + } + } + + /// Direct unit coverage for `LastValue::bind`. Sync `Gauge::bind()` is not yet + /// exposed in the public API, so the only callers of this code path today + /// are Views that remap an instrument to `Aggregation::LastValue`. This test + /// constructs the aggregator directly and exercises the bound handle through + /// the `Measure` / `BoundMeasure` traits to keep the impl honest. 
+ #[test] + fn bind_writes_through_bound_handle() { + let last_value = + LastValue::::new(Temporality::Cumulative, AttributeSetFilter::new(None), 100); + let attrs = [KeyValue::new("k", "v")]; + let bound = Measure::bind(&last_value, &attrs); + + bound.call(7); + bound.call(42); // overwrites previous value (LastValue semantics) + + let (count, agg) = ComputeAggregation::call(&last_value, None); + assert_eq!(count, 1); + let gauge = extract_gauge(agg.expect("aggregation produced")); + assert_eq!(gauge.data_points.len(), 1); + assert_eq!(gauge.data_points[0].value, 42); + assert_eq!(gauge.data_points[0].attributes, attrs.to_vec()); + } + + #[test] + fn bound_handle_drop_decrements_bound_count() { + let last_value = + LastValue::::new(Temporality::Delta, AttributeSetFilter::new(None), 100); + let attrs = [KeyValue::new("k", "v")]; + + let bound = Measure::bind(&last_value, &attrs); + bound.call(5); + + // While the handle exists, the entry's bound_count is 1. + let trackers = last_value.value_map.trackers.read().unwrap(); + let entry = trackers + .values() + .next() + .expect("entry should exist after bind+call"); + assert_eq!( + entry.bound_count.load(Ordering::Relaxed), + 1, + "bound_count should reflect a live handle" + ); + drop(trackers); + + drop(bound); + + let trackers = last_value.value_map.trackers.read().unwrap(); + let entry = trackers + .values() + .next() + .expect("entry should still exist post-drop"); + assert_eq!( + entry.bound_count.load(Ordering::Relaxed), + 0, + "bound_count should drop to 0 after handle drops" + ); + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 311169fd9c..5314bb36c1 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -17,7 +17,11 @@ use std::sync::atomic::{AtomicI64, AtomicU64}; use std::sync::{Arc, OnceLock, RwLock}; pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, 
Measure}; +#[cfg(feature = "experimental_metrics_bound_instruments")] +pub(crate) use aggregate::{BoundMeasure, NoopBoundMeasure}; pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use opentelemetry::otel_debug; use opentelemetry::{otel_warn, KeyValue}; use super::data::{AggregatedMetrics, MetricData}; @@ -51,14 +55,19 @@ pub(crate) trait Aggregator { fn clone_and_reset(&self, init: &Self::InitConfig) -> Self; } -/// Wraps an aggregator with status tracking for delta collection. +/// Wraps an aggregator with status tracking for delta collection and bound instruments. /// /// `has_been_updated` tracks whether the aggregator received measurements since the last /// collection cycle. This enables in-place delta collection: only updated entries are exported, -/// and stale entries are evicted to prevent unbounded memory growth. +/// and stale unbound entries are evicted to prevent unbounded memory growth. +/// +/// `bound_count` tracks how many bound instrument handles reference this entry. Entries with +/// bound_count > 0 are never evicted from the map, even if they had no updates in a cycle +/// (they simply produce no export). This ensures bound handles always point to a live tracker. pub(crate) struct TrackerEntry { pub(crate) aggregator: A, pub(crate) has_been_updated: AtomicBool, + pub(crate) bound_count: AtomicUsize, } impl TrackerEntry { @@ -66,6 +75,7 @@ impl TrackerEntry { TrackerEntry { aggregator: A::create(config), has_been_updated: AtomicBool::new(false), + bound_count: AtomicUsize::new(0), } } } @@ -87,7 +97,7 @@ where /// Number of different attribute set stored in the `trackers` map. count: AtomicUsize, /// Tracker for values with no attributes attached. 
- no_attribute_tracker: TrackerEntry, + no_attribute_tracker: Arc>, /// Configuration for an Aggregator config: A::InitConfig, cardinality_limit: usize, @@ -106,7 +116,7 @@ where trackers: RwLock::new(HashMap::with_capacity( 1 + min(DEFAULT_CARDINALITY_LIMIT, cardinality_limit), )), - no_attribute_tracker: TrackerEntry::new(&config), + no_attribute_tracker: Arc::new(TrackerEntry::new(&config)), count: AtomicUsize::new(0), config, cardinality_limit, @@ -184,6 +194,96 @@ where } } + /// Resolves attributes and returns a cached Arc for bound instruments. + /// The caller can then call `tracker.aggregator.update()` directly, bypassing the + /// full lookup path on subsequent measurements. + /// + /// When the cardinality limit has been reached, the returned tracker is the + /// overflow tracker (the same one unbound `measure()` calls write to at + /// overflow), preserving the bind() perf contract — every subsequent + /// `bound.add()` call is a direct write, regardless of cardinality state. + /// The handle remains permanently bound to overflow for its lifetime; + /// to recover after space frees up, drop and re-bind. + /// + /// Returns `None` only if the trackers RwLock is poisoned, in which case + /// the caller should produce a noop bound handle so measurements are + /// silently dropped rather than panicking on the user's hot path. + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn bind(&self, attributes: &[KeyValue]) -> Option>> { + if attributes.is_empty() { + self.no_attribute_tracker + .bound_count + .fetch_add(1, Ordering::Relaxed); + return Some(Arc::clone(&self.no_attribute_tracker)); + } + + let sorted_attrs = sort_and_dedup(attributes); + self.bind_attrs(attributes, sorted_attrs) + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn bind_attrs( + &self, + original: &[KeyValue], + sorted_attrs: Vec, + ) -> Option>> { + // Fast path: read lock lookup using the canonical (sorted) key. 
+ if let Ok(trackers) = self.trackers.read() { + if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + tracker.bound_count.fetch_add(1, Ordering::Relaxed); + return Some(Arc::clone(tracker)); + } + } + + // Slow path: write lock, insert if missing. + let Ok(mut trackers) = self.trackers.write() else { + // Lock poisoned — caller will produce a noop bound handle. + return None; + }; + + // Recheck after acquiring write lock. + if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + tracker.bound_count.fetch_add(1, Ordering::Relaxed); + return Some(Arc::clone(tracker)); + } + + if self.is_under_cardinality_limit() { + let new_tracker = Arc::new(TrackerEntry::::new(&self.config)); + new_tracker.bound_count.fetch_add(1, Ordering::Relaxed); + // Insert with both the original and sorted orderings so subsequent + // unbound measure() calls hit the fast path regardless of attr order. + // Mirrors `measure()`'s insert pattern. + if original != sorted_attrs.as_slice() { + trackers.insert(original.to_vec(), new_tracker.clone()); + } + trackers.insert(sorted_attrs, new_tracker.clone()); + self.count.fetch_add(1, Ordering::SeqCst); + Some(new_tracker) + } else { + // Over cardinality limit — bind directly to the overflow tracker so + // the bound handle keeps its perf contract (no per-call lookup) and + // its writes land predictably in the overflow bucket. This matches + // the spec SHOULD that the SDK pre-resolve aggregator state at bind + // time, and the spec MUST that bound recordings behave identically + // to unbound recordings (which themselves route to overflow once + // cardinality is exhausted). See open-telemetry/opentelemetry-specification#5050. + // + // The overflow tracker is created lazily here if it doesn't exist + // yet — mirrors the lazy creation in `measure()` (line above where + // overflow is inserted on first overflowing measurement). 
+ let overflow_tracker = trackers + .entry(stream_overflow_attributes().clone()) + .or_insert_with(|| Arc::new(TrackerEntry::::new(&self.config))) + .clone(); + overflow_tracker.bound_count.fetch_add(1, Ordering::Relaxed); + otel_debug!( + name: "BoundInstrument.CardinalityOverflow", + message = "bind() called at cardinality limit, attributing to overflow bucket" + ); + Some(overflow_tracker) + } + } + /// Iterate through all attribute sets and populate `DataPoints` in readonly mode. /// This is used for synchronous instruments (Counter, Histogram, etc.) in Cumulative temporality mode, /// where attribute sets persist across collection cycles and [`ValueMap`] is not cleared. @@ -214,7 +314,9 @@ where /// Iterate through all attribute sets in-place, populate `DataPoints` and reset. /// Only entries updated since the last collection (tracked via `has_been_updated`) - /// are exported. Stale entries are evicted to prevent unbounded memory growth. + /// are exported. Stale unbound entries are evicted to prevent unbounded memory growth. + /// Bound entries (bound_count > 0) are never evicted — they persist until explicitly + /// unbound, but produce no export when they have no updates. /// /// Used for synchronous instruments (Counter, Histogram, etc.) in Delta temporality mode. 
pub(crate) fn collect_and_reset(&self, dest: &mut Vec, mut map_fn: MapFn) @@ -243,8 +345,10 @@ where if seen.insert(Arc::as_ptr(tracker)) { if tracker.has_been_updated.swap(false, Ordering::Acquire) { dest.push(map_fn(attrs.clone(), &tracker.aggregator)); - } else if attrs.as_slice() != overflow_attrs.as_slice() { - // Stale — candidate for eviction + } else if attrs.as_slice() != overflow_attrs.as_slice() + && tracker.bound_count.load(Ordering::Relaxed) == 0 + { + // Stale and not bound — candidate for eviction stale_entries.push(Arc::clone(tracker)); } } @@ -254,10 +358,13 @@ where if !stale_entries.is_empty() { if let Ok(mut trackers) = self.trackers.write() { - // Re-check under write lock to avoid TOCTOU race: a measure() call between - // dropping the read lock and acquiring the write lock could have updated - // an entry we marked as stale. - stale_entries.retain(|entry| !entry.has_been_updated.load(Ordering::Acquire)); + // Re-check under write lock to avoid TOCTOU race: a measure() or bind() call + // between dropping the read lock and acquiring the write lock could have + // updated an entry or bound a handle to one we marked as stale. + stale_entries.retain(|entry| { + !entry.has_been_updated.load(Ordering::Acquire) + && entry.bound_count.load(Ordering::Acquire) == 0 + }); if !stale_entries.is_empty() { let stale_pointers: HashSet<*const TrackerEntry> = @@ -785,4 +892,43 @@ mod tests { "count should reach 0 after eviction" ); } + + /// When the trackers `RwLock` is poisoned, `bind()` cannot safely insert or + /// look up entries, so it returns `None` and the caller (Sum/Histogram/etc.) + /// hands back a `NoopBoundMeasure`. This is a defensive branch that fires + /// on degenerate states (a thread panicked while holding the write lock) + /// and is unreachable through normal traffic. The test induces poisoning + /// explicitly so the branch keeps coverage. 
+ #[cfg(feature = "experimental_metrics_bound_instruments")] + #[test] + fn bind_returns_none_when_trackers_lock_is_poisoned() { + let value_map = ValueMap::>::new((), 100); + + // Poison the trackers RwLock by panicking inside a write guard. + let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + let _guard = value_map.trackers.write().unwrap(); + panic!("intentional poison"); + })); + + assert!( + value_map.trackers.is_poisoned(), + "trackers lock must be poisoned for this test to be meaningful" + ); + + // Empty attrs use the no_attribute_tracker fast path and never touch + // the poisoned lock — they should still succeed. + assert!( + value_map.bind(&[]).is_some(), + "bind(&[]) must succeed even with poisoned lock; uses no_attribute_tracker" + ); + + // Non-empty attrs go through bind_attrs which needs the trackers lock. + // The read-lock try succeeds (only writes poison, but read on poisoned + // can also fail) — fall through to write lock which fails poisoned. + let result = value_map.bind(&[KeyValue::new("k", 1_i64)]); + assert!( + result.is_none(), + "bind() with non-empty attrs must return None on poisoned lock" + ); + } } diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index 4a549ff676..9f7935eda0 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -2,12 +2,46 @@ use opentelemetry::KeyValue; use crate::metrics::data::{self, AggregatedMetrics, MetricData, SumDataPoint}; use crate::metrics::Temporality; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use std::sync::atomic::Ordering; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use std::sync::Arc; use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter}; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use super::Aggregator; use super::{last_value::Assign, AtomicTracker, 
Number, ValueMap}; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use super::{BoundMeasure, NoopBoundMeasure, TrackerEntry}; use super::{ComputeAggregation, Measure}; use std::{collections::HashMap, sync::Mutex}; +/// Pre-bound precomputed-sum handle. Writes go directly to a fixed +/// `TrackerEntry`. PrecomputedSum is used by asynchronous instruments +/// (ObservableCounter / ObservableUpDownCounter), which do not expose `bind()` +/// to user code, so this impl exists to satisfy the `Measure` trait and is +/// not reachable via the public API today. The implementation mirrors +/// `BoundLastValueHandle` since both share the `Assign` aggregator. +#[cfg(feature = "experimental_metrics_bound_instruments")] +struct BoundPrecomputedSumHandle { + tracker: Arc>>, +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl BoundMeasure for BoundPrecomputedSumHandle { + fn call(&self, measurement: T) { + self.tracker.aggregator.update(measurement); + self.tracker.has_been_updated.store(true, Ordering::Release); + } +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl Drop for BoundPrecomputedSumHandle { + fn drop(&mut self) { + self.tracker.bound_count.fetch_sub(1, Ordering::Relaxed); + } +} + /// Summarizes a set of pre-computed sums as their arithmetic sum. 
pub(crate) struct PrecomputedSum { value_map: ValueMap>, @@ -138,6 +172,18 @@ where self.value_map.measure(measurement, filtered); }) } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn bind(&self, attrs: &[KeyValue]) -> Box> { + let mut bound_attrs = Vec::new(); + self.filter.apply(attrs, |filtered| { + bound_attrs = filtered.to_vec(); + }); + match self.value_map.bind(&bound_attrs) { + Some(tracker) => Box::new(BoundPrecomputedSumHandle { tracker }), + None => Box::new(NoopBoundMeasure::new()), + } + } } impl ComputeAggregation for PrecomputedSum @@ -153,3 +199,73 @@ where (len, new.map(T::make_aggregated_metrics)) } } + +#[cfg(all(test, feature = "experimental_metrics_bound_instruments"))] +mod tests { + use super::*; + use crate::metrics::data::{AggregatedMetrics, MetricData, Sum}; + use std::sync::atomic::Ordering; + + fn extract_sum(agg: AggregatedMetrics) -> Sum { + match agg { + AggregatedMetrics::U64(MetricData::Sum(s)) => s, + _ => panic!("expected u64 Sum"), + } + } + + /// PrecomputedSum is used by ObservableCounter / ObservableUpDownCounter, which + /// do not expose `bind()` to user code. The `Measure::bind` impl exists so the + /// trait is uniform across all aggregators (and future Observable bind() + /// extensions are mechanical). This test exercises the impl directly so the + /// otherwise-unreachable code path stays honest. 
+ #[test] + fn bind_writes_through_bound_handle() { + let pre_sum = PrecomputedSum::::new( + Temporality::Cumulative, + AttributeSetFilter::new(None), + true, + 100, + ); + let attrs = [KeyValue::new("k", "v")]; + let bound = Measure::bind(&pre_sum, &attrs); + + bound.call(99); // PrecomputedSum semantics: each call assigns the absolute value + + let (count, agg) = ComputeAggregation::call(&pre_sum, None); + assert_eq!(count, 1); + let sum = extract_sum(agg.expect("aggregation produced")); + assert_eq!(sum.data_points.len(), 1); + assert_eq!(sum.data_points[0].value, 99); + assert_eq!(sum.data_points[0].attributes, attrs.to_vec()); + } + + #[test] + fn bound_handle_drop_decrements_bound_count() { + let pre_sum = PrecomputedSum::::new( + Temporality::Delta, + AttributeSetFilter::new(None), + true, + 100, + ); + let attrs = [KeyValue::new("k", "v")]; + let bound = Measure::bind(&pre_sum, &attrs); + bound.call(5); + + let trackers = pre_sum.value_map.trackers.read().unwrap(); + let entry = trackers + .values() + .next() + .expect("entry should exist after bind+call"); + assert_eq!(entry.bound_count.load(Ordering::Relaxed), 1); + drop(trackers); + + drop(bound); + + let trackers = pre_sum.value_map.trackers.read().unwrap(); + let entry = trackers + .values() + .next() + .expect("entry should still exist post-drop"); + assert_eq!(entry.bound_count.load(Ordering::Relaxed), 0); + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 349a5645c5..dc435f6e34 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,10 +1,16 @@ use crate::metrics::data::{self, AggregatedMetrics, MetricData, SumDataPoint}; use crate::metrics::Temporality; use opentelemetry::KeyValue; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use std::sync::atomic::Ordering; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use std::sync::Arc; use 
super::aggregate::{AggregateTimeInitiator, AttributeSetFilter}; use super::{Aggregator, AtomicTracker, ComputeAggregation, Measure, Number}; use super::{AtomicallyUpdate, ValueMap}; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use super::{BoundMeasure, NoopBoundMeasure, TrackerEntry}; struct Increment where @@ -37,6 +43,31 @@ where } } +/// Pre-bound counter handle: writes go directly to a fixed `TrackerEntry` without +/// per-call attribute lookup. The `tracker` is either a dedicated entry for the +/// bound attribute set, or — if bind() hit the cardinality limit — the shared +/// overflow tracker. Either way, `call()` is a single atomic increment and a +/// release store; no map lookup, no lock acquisition. +#[cfg(feature = "experimental_metrics_bound_instruments")] +struct BoundSumHandle { + tracker: Arc>>, +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl BoundMeasure for BoundSumHandle { + fn call(&self, measurement: T) { + self.tracker.aggregator.update(measurement); + self.tracker.has_been_updated.store(true, Ordering::Release); + } +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl Drop for BoundSumHandle { + fn drop(&mut self) { + self.tracker.bound_count.fetch_sub(1, Ordering::Relaxed); + } +} + /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum { value_map: ValueMap>, @@ -153,6 +184,20 @@ where self.value_map.measure(measurement, filtered); }) } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn bind(&self, attrs: &[KeyValue]) -> Box> { + let mut bound_attrs = Vec::new(); + self.filter.apply(attrs, |filtered| { + bound_attrs = filtered.to_vec(); + }); + match self.value_map.bind(&bound_attrs) { + Some(tracker) => Box::new(BoundSumHandle { tracker }), + // Trackers RwLock is poisoned — return a noop handle so writes + // silently drop, mirroring `measure()`'s own poison handling. 
+ None => Box::new(NoopBoundMeasure::new()), + } + } } impl ComputeAggregation for Sum diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 46390a60f6..558515cb87 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -122,6 +122,8 @@ impl FromStr for Temporality { #[cfg(all(test, feature = "testing"))] mod tests { + #[cfg(feature = "experimental_metrics_bound_instruments")] + use self::data::ExponentialHistogramDataPoint; use self::data::{HistogramDataPoint, MetricData, ScopeMetrics, SumDataPoint}; use super::internal::Number; use super::*; @@ -4467,6 +4469,28 @@ mod tests { .find(|&datapoint| datapoint.attributes.is_empty()) } + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn find_overflow_histogram_datapoint( + data_points: &[HistogramDataPoint], + ) -> Option<&HistogramDataPoint> { + data_points.iter().find(|&datapoint| { + datapoint.attributes.iter().any(|kv| { + kv.key.as_str() == "otel.metric.overflow" && kv.value == Value::Bool(true) + }) + }) + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn find_overflow_exponential_histogram_datapoint( + data_points: &[ExponentialHistogramDataPoint], + ) -> Option<&ExponentialHistogramDataPoint> { + data_points.iter().find(|&datapoint| { + datapoint.attributes.iter().any(|kv| { + kv.key.as_str() == "otel.metric.overflow" && kv.value == Value::Bool(true) + }) + }) + } + fn find_scope_metric<'a>( metrics: &'a [ScopeMetrics], name: &'a str, @@ -4692,4 +4716,1350 @@ mod tests { assert!("".parse::().is_err()); assert!("cumulativ".parse::().is_err()); } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_cumulative() { + let mut test_context = TestContext::new(Temporality::Cumulative); + let counter = test_context.u64_counter("test", "my_counter", None); + let attrs = vec![KeyValue::new("key1", "bound_value")]; 
+ let bound = counter.bind(&attrs); + + bound.add(10); + bound.add(20); + bound.add(30); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + + assert_eq!(sum.data_points.len(), 1, "Expected one data point"); + assert!(sum.is_monotonic); + assert_eq!(sum.temporality, Temporality::Cumulative); + + let data_point = &sum.data_points[0]; + assert_eq!(data_point.value, 60); + assert_eq!( + data_point.attributes, + vec![KeyValue::new("key1", "bound_value")] + ); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_delta() { + let mut test_context = TestContext::new(Temporality::Delta); + let counter = test_context.u64_counter("test", "my_counter", None); + let attrs = vec![KeyValue::new("key1", "bound_value")]; + let bound = counter.bind(&attrs); + + bound.add(50); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + assert_eq!(sum.temporality, Temporality::Delta); + assert_eq!(sum.data_points.len(), 1); + assert_eq!(sum.data_points[0].value, 50); + + // After delta collect, add more and collect again + test_context.reset_metrics(); + bound.add(25); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + assert_eq!(sum.data_points.len(), 1); + assert_eq!( + sum.data_points[0].value, 25, + "Delta should reset between collections" + ); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_histogram_cumulative() { + let mut test_context = TestContext::new(Temporality::Cumulative); + let histogram = test_context + .meter() + .f64_histogram("my_histogram") + .with_boundaries(vec![5.0, 10.0, 25.0, 50.0]) + .build(); + 
let attrs = vec![KeyValue::new("key1", "bound_value")]; + let bound = histogram.bind(&attrs); + + bound.record(1.0); + bound.record(7.5); + bound.record(15.0); + bound.record(30.0); + test_context.flush_metrics(); + + let MetricData::Histogram(histogram_data) = + test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + + assert_eq!(histogram_data.data_points.len(), 1); + assert_eq!(histogram_data.temporality, Temporality::Cumulative); + + let dp = &histogram_data.data_points[0]; + assert_eq!(dp.count, 4); + assert_eq!(dp.sum, 53.5); + assert_eq!(dp.min.unwrap(), 1.0); + assert_eq!(dp.max.unwrap(), 30.0); + assert_eq!(dp.attributes, vec![KeyValue::new("key1", "bound_value")]); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_matches_unbound() { + let mut test_context = TestContext::new(Temporality::Cumulative); + let counter = test_context.u64_counter("test", "my_counter", None); + let attrs = vec![KeyValue::new("key1", "shared")]; + let bound = counter.bind(&attrs); + + // Mix bound and unbound additions to the same attribute set + counter.add(10, &attrs); + bound.add(20); + counter.add(30, &attrs); + bound.add(40); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + + assert_eq!( + sum.data_points.len(), + 1, + "Bound and unbound should share the same data point" + ); + assert_eq!(sum.data_points[0].value, 100); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_delta_no_update_no_export() { + let mut test_context = TestContext::new(Temporality::Delta); + let counter = test_context.u64_counter("test", "my_counter", None); + let attrs = vec![KeyValue::new("key1", "bound_value")]; + let bound = counter.bind(&attrs); + + // Cycle 1: add and 
collect + bound.add(10); + test_context.flush_metrics(); + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + assert_eq!(sum.data_points.len(), 1); + assert_eq!(sum.data_points[0].value, 10); + + // Cycle 2: no add, collect — should export nothing + test_context.reset_metrics(); + test_context.flush_metrics(); + let resource_metrics = test_context + .exporter + .get_finished_metrics() + .expect("metrics export should succeed"); + assert!( + resource_metrics.is_empty(), + "Bound handle with no updates should not export" + ); + + // Cycle 3: add again — handle is still alive, produces fresh delta + test_context.reset_metrics(); + bound.add(5); + test_context.flush_metrics(); + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + assert_eq!(sum.data_points.len(), 1); + assert_eq!( + sum.data_points[0].value, 5, + "Bound handle should produce fresh delta after quiet cycle" + ); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_at_overflow_attributes_to_overflow_bucket() { + let cardinality_limit = 3; + let view = move |i: &Instrument| { + if i.name() == "my_counter" { + Stream::builder() + .with_name("my_counter") + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Delta, view); + let counter = test_context.u64_counter("test", "my_counter", None); + + // Fill to cardinality limit with unbound calls + for v in 0..cardinality_limit { + counter.add(1, &[KeyValue::new("A", v.to_string())]); + } + + // bind() at overflow — handle binds directly to the overflow tracker + let overflow_attrs = vec![KeyValue::new("A", "overflow_bind")]; + let bound = counter.bind(&overflow_attrs); + bound.add(42); + + test_context.flush_metrics(); + let MetricData::Sum(sum) = 
test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + + // Expect: cardinality_limit unique + 1 overflow = cardinality_limit + 1 + assert_eq!( + sum.data_points.len(), + cardinality_limit + 1, + "Expected {} unique + 1 overflow data points", + cardinality_limit + ); + + // The bound handle's value should appear in the overflow bucket + let overflow_dp = + find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected"); + assert_eq!( + overflow_dp.value, 42, + "Bound-at-overflow data should go to overflow bucket" + ); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_overflow_recovery_after_delta_eviction() { + let cardinality_limit = 3; + let view = move |i: &Instrument| { + if i.name() == "my_counter" { + Stream::builder() + .with_name("my_counter") + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Delta, view); + let counter = test_context.u64_counter("test", "my_counter", None); + + // Fill to cardinality limit with unbound calls (these are one-shot, not bound) + for v in 0..cardinality_limit { + counter.add(1, &[KeyValue::new("A", v.to_string())]); + } + + // Collect cycle 1: exports the 3 unique entries, then evicts them (no new updates) + test_context.flush_metrics(); + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + assert_eq!(sum.data_points.len(), cardinality_limit); + + // Cycle 2: no unbound adds, so the stale entries get evicted. + // Space is now open. A new bind() should get a dedicated tracker. 
+ test_context.reset_metrics(); + test_context.flush_metrics(); // triggers eviction of stale entries + + let new_attrs = vec![KeyValue::new("A", "recovered")]; + let bound = counter.bind(&new_attrs); + bound.add(99); + + test_context.reset_metrics(); + test_context.flush_metrics(); + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + + // The bound handle should have a dedicated tracker, NOT overflow + assert_eq!( + sum.data_points.len(), + 1, + "Only the bound entry should be exported" + ); + let dp = find_sum_datapoint_with_key_value(&sum.data_points, "A", "recovered") + .expect("should find dedicated data point for recovered attrs"); + assert_eq!( + dp.value, 99, + "Bound handle after recovery should have dedicated tracker" + ); + assert!( + find_overflow_sum_datapoint(&sum.data_points).is_none(), + "Should not have overflow — bind() after eviction should get a dedicated tracker" + ); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_multiple_overflow_handles_share_overflow_bucket() { + let cardinality_limit = 2; + let view = move |i: &Instrument| { + if i.name() == "my_counter" { + Stream::builder() + .with_name("my_counter") + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Delta, view); + let counter = test_context.u64_counter("test", "my_counter", None); + + // Fill to limit + counter.add(1, &[KeyValue::new("A", "0")]); + counter.add(1, &[KeyValue::new("A", "1")]); + + // Bind two distinct attribute sets at overflow + let bound_a = counter.bind(&[KeyValue::new("A", "overflow_a")]); + let bound_b = counter.bind(&[KeyValue::new("A", "overflow_b")]); + + bound_a.add(10); + bound_b.add(20); + bound_a.add(5); + + test_context.flush_metrics(); + let MetricData::Sum(sum) = 
test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + + let overflow_dp = + find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected"); + assert_eq!( + overflow_dp.value, 35, + "All overflow-bound measurements should accumulate in overflow bucket" + ); + + // Cycle 2: bound handles still work after delta collect + test_context.reset_metrics(); + bound_a.add(7); + bound_b.add(3); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + + let overflow_dp = + find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected"); + assert_eq!( + overflow_dp.value, 10, + "Overflow-bound handles should continue working across delta cycles" + ); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_overflow_persists_across_eviction_cycles() { + // Once a bind() lands in overflow, the handle's writes must continue + // landing in overflow for the lifetime of the handle — even after + // delta eviction frees space. This is the predictability guarantee: + // a user inspecting their data should see the bound handle's + // attribution as a single, stable bucket. The recovery story is + // explicit (drop and re-bind), not implicit (silent self-healing). + let cardinality_limit = 3; + let view = move |i: &Instrument| { + if i.name() == "my_counter" { + Stream::builder() + .with_name("my_counter") + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Delta, view); + let counter = test_context.u64_counter("test", "my_counter", None); + + // Cycle 1: fill cardinality with unbound calls, then bind at overflow. 
+ for v in 0..cardinality_limit { + counter.add(1, &[KeyValue::new("A", v.to_string())]); + } + let stuck_attrs = vec![KeyValue::new("A", "stuck_in_overflow")]; + let bound = counter.bind(&stuck_attrs); + bound.add(10); + + test_context.flush_metrics(); + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + let overflow_dp = find_overflow_sum_datapoint(&sum.data_points) + .expect("cycle 1: bound write at overflow should land in overflow bucket"); + assert_eq!(overflow_dp.value, 10); + + // Cycle 2: no calls. The 3 unbound entries become stale and are evicted, + // freeing all of the cardinality budget. + test_context.reset_metrics(); + test_context.flush_metrics(); + + // Cycle 3: the SAME bound handle is used again. Even though space is + // available, its writes must still land in overflow — the handle is + // permanently bound to overflow, not silently re-resolved. + test_context.reset_metrics(); + bound.add(99); + test_context.flush_metrics(); + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + let overflow_dp = find_overflow_sum_datapoint(&sum.data_points) + .expect("cycle 3: bound write must still land in overflow even after space frees up"); + assert_eq!( + overflow_dp.value, 99, + "Bound-at-overflow handle must keep writing to overflow even after delta eviction" + ); + assert!( + find_sum_datapoint_with_key_value(&sum.data_points, "A", "stuck_in_overflow").is_none(), + "Bound-at-overflow handle must not silently self-heal to a dedicated tracker" + ); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_histogram_delta() { + let mut test_context = TestContext::new(Temporality::Delta); + let histogram = test_context + .meter() + .f64_histogram("my_histogram") + .with_boundaries(vec![5.0, 10.0, 25.0, 50.0]) + .build(); + let attrs = 
vec![KeyValue::new("key1", "bound_value")]; + let bound = histogram.bind(&attrs); + + // Cycle 1: record and collect + bound.record(3.0); + bound.record(12.0); + test_context.flush_metrics(); + + let MetricData::Histogram(hist) = test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + assert_eq!(hist.temporality, Temporality::Delta); + assert_eq!(hist.data_points.len(), 1); + assert_eq!(hist.data_points[0].count, 2); + assert_eq!(hist.data_points[0].sum, 15.0); + + // Cycle 2: delta resets, new values + test_context.reset_metrics(); + bound.record(40.0); + test_context.flush_metrics(); + + let MetricData::Histogram(hist) = test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + assert_eq!(hist.data_points.len(), 1); + assert_eq!( + hist.data_points[0].count, 1, + "Delta should reset count between collections" + ); + assert_eq!( + hist.data_points[0].sum, 40.0, + "Delta should reset sum between collections" + ); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_histogram_matches_unbound() { + let mut test_context = TestContext::new(Temporality::Cumulative); + let histogram = test_context + .meter() + .f64_histogram("my_histogram") + .with_boundaries(vec![10.0, 50.0]) + .build(); + let attrs = vec![KeyValue::new("key1", "shared")]; + let bound = histogram.bind(&attrs); + + // Mix bound and unbound recordings to the same attribute set + histogram.record(5.0, &attrs); + bound.record(15.0); + histogram.record(25.0, &attrs); + bound.record(35.0); + test_context.flush_metrics(); + + let MetricData::Histogram(hist) = test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + + assert_eq!( + hist.data_points.len(), + 1, + "Bound and unbound should share the same data point" + ); + assert_eq!(hist.data_points[0].count, 4); + assert_eq!(hist.data_points[0].sum, 80.0); + 
assert_eq!(hist.data_points[0].min.unwrap(), 5.0); + assert_eq!(hist.data_points[0].max.unwrap(), 35.0); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_histogram_delta_no_update_no_export() { + let mut test_context = TestContext::new(Temporality::Delta); + let histogram = test_context + .meter() + .f64_histogram("my_histogram") + .with_boundaries(vec![10.0]) + .build(); + let attrs = vec![KeyValue::new("key1", "bound_value")]; + let bound = histogram.bind(&attrs); + + // Cycle 1: record and collect + bound.record(5.0); + test_context.flush_metrics(); + let MetricData::Histogram(hist) = test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + assert_eq!(hist.data_points.len(), 1); + assert_eq!(hist.data_points[0].count, 1); + + // Cycle 2: no recordings — should export nothing + test_context.reset_metrics(); + test_context.flush_metrics(); + let resource_metrics = test_context + .exporter + .get_finished_metrics() + .expect("metrics export should succeed"); + assert!( + resource_metrics.is_empty(), + "Bound histogram with no updates should not export" + ); + + // Cycle 3: record again — handle still alive + test_context.reset_metrics(); + bound.record(20.0); + test_context.flush_metrics(); + let MetricData::Histogram(hist) = test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + assert_eq!(hist.data_points.len(), 1); + assert_eq!( + hist.data_points[0].count, 1, + "Fresh delta after quiet cycle" + ); + assert_eq!(hist.data_points[0].sum, 20.0); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_histogram_at_overflow_attributes_to_overflow_bucket() { + let cardinality_limit = 3; + let view = move |i: &Instrument| { + if i.name() == "my_histogram" { + Stream::builder() + .with_name("my_histogram") + 
.with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Delta, view); + let histogram = test_context + .meter() + .f64_histogram("my_histogram") + .with_boundaries(vec![10.0, 50.0]) + .build(); + + // Fill to cardinality limit with unbound calls + for v in 0..cardinality_limit { + histogram.record(1.0, &[KeyValue::new("A", v.to_string())]); + } + + // bind() at overflow — handle binds directly to the overflow tracker + let overflow_attrs = vec![KeyValue::new("A", "overflow_bind")]; + let bound = histogram.bind(&overflow_attrs); + bound.record(42.0); + + test_context.flush_metrics(); + let MetricData::Histogram(hist) = test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + + assert_eq!( + hist.data_points.len(), + cardinality_limit + 1, + "Expected {} unique + 1 overflow data points", + cardinality_limit + ); + + let overflow_dp = + find_overflow_histogram_datapoint(&hist.data_points).expect("overflow point expected"); + assert_eq!( + overflow_dp.sum, 42.0, + "Bound-at-overflow data should go to overflow bucket" + ); + assert_eq!(overflow_dp.count, 1); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_histogram_overflow_persists_across_eviction_cycles() { + // Mirror of bound_counter_overflow_persists_across_eviction_cycles for + // histograms: a bound-at-overflow handle must keep landing in overflow + // even after delta eviction frees space, for the lifetime of the handle. 
+ let cardinality_limit = 3; + let view = move |i: &Instrument| { + if i.name() == "my_histogram" { + Stream::builder() + .with_name("my_histogram") + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Delta, view); + let histogram = test_context + .meter() + .f64_histogram("my_histogram") + .with_boundaries(vec![10.0, 50.0]) + .build(); + + // Cycle 1: fill cardinality with unbound calls, then bind at overflow. + for v in 0..cardinality_limit { + histogram.record(1.0, &[KeyValue::new("A", v.to_string())]); + } + let stuck_attrs = vec![KeyValue::new("A", "stuck_in_overflow")]; + let bound = histogram.bind(&stuck_attrs); + bound.record(15.0); + + test_context.flush_metrics(); + let MetricData::Histogram(hist) = test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + let overflow_dp = find_overflow_histogram_datapoint(&hist.data_points) + .expect("cycle 1: bound write at overflow should land in overflow bucket"); + assert_eq!(overflow_dp.sum, 15.0); + + // Cycle 2: no calls. Stale unbound entries get evicted, freeing space. + test_context.reset_metrics(); + test_context.flush_metrics(); + + // Cycle 3: same bound handle. Even though space is free, writes must + // still land in overflow. 
+ test_context.reset_metrics(); + bound.record(99.0); + test_context.flush_metrics(); + let MetricData::Histogram(hist) = test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + let overflow_dp = find_overflow_histogram_datapoint(&hist.data_points) + .expect("cycle 3: bound write must still land in overflow even after space frees up"); + assert_eq!( + overflow_dp.sum, 99.0, + "Bound-at-overflow histogram must keep writing to overflow even after delta eviction" + ); + assert_eq!(overflow_dp.count, 1); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_exponential_histogram_delta() { + // Histogram configured with Base2ExponentialHistogram aggregation goes + // through ExpoHistogram internally. Verify bind() returns a handle + // whose direct writes accumulate correctly. + let view = |i: &Instrument| { + if i.name() == "my_histogram" { + Stream::builder() + .with_aggregation(Aggregation::Base2ExponentialHistogram { + max_size: 160, + max_scale: 20, + record_min_max: true, + }) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Delta, view); + let histogram = test_context.meter().f64_histogram("my_histogram").build(); + let attrs = vec![KeyValue::new("key1", "bound_value")]; + let bound = histogram.bind(&attrs); + + bound.record(2.0); + bound.record(4.0); + bound.record(8.0); + // NaN/inf must be filtered just like the unbound path. 
+ bound.record(f64::NAN); + bound.record(f64::INFINITY); + + test_context.flush_metrics(); + let MetricData::ExponentialHistogram(hist) = + test_context.get_aggregation::("my_histogram", None) + else { + panic!("expected ExponentialHistogram aggregation"); + }; + assert_eq!(hist.data_points.len(), 1); + let dp = &hist.data_points[0]; + assert_eq!(dp.count, 3, "NaN and infinity should be dropped"); + assert_eq!(dp.sum, 14.0); + assert_eq!(dp.min, Some(2.0)); + assert_eq!(dp.max, Some(8.0)); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_exponential_histogram_at_overflow_attributes_to_overflow_bucket() { + let cardinality_limit = 3; + let view = move |i: &Instrument| { + if i.name() == "my_histogram" { + Stream::builder() + .with_aggregation(Aggregation::Base2ExponentialHistogram { + max_size: 160, + max_scale: 20, + record_min_max: true, + }) + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Delta, view); + let histogram = test_context.meter().f64_histogram("my_histogram").build(); + + for v in 0..cardinality_limit { + histogram.record(1.0, &[KeyValue::new("A", v.to_string())]); + } + + // bind() at overflow — handle binds directly to the overflow tracker + let overflow_attrs = vec![KeyValue::new("A", "overflow_bind")]; + let bound = histogram.bind(&overflow_attrs); + bound.record(42.0); + + test_context.flush_metrics(); + let MetricData::ExponentialHistogram(hist) = + test_context.get_aggregation::("my_histogram", None) + else { + panic!("expected ExponentialHistogram aggregation"); + }; + let overflow_dp = find_overflow_exponential_histogram_datapoint(&hist.data_points) + .expect("overflow point expected"); + assert_eq!( + overflow_dp.sum, 42.0, + "Bound-at-overflow data should go to overflow bucket" + ); + assert_eq!(overflow_dp.count, 1); + } + + #[cfg(feature 
= "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_exponential_histogram_overflow_persists_across_eviction_cycles() { + // Same predictability invariant the counter/histogram tests assert, + // verified for the exponential histogram aggregator path. + let cardinality_limit = 3; + let view = move |i: &Instrument| { + if i.name() == "my_histogram" { + Stream::builder() + .with_aggregation(Aggregation::Base2ExponentialHistogram { + max_size: 160, + max_scale: 20, + record_min_max: true, + }) + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Delta, view); + let histogram = test_context.meter().f64_histogram("my_histogram").build(); + + for v in 0..cardinality_limit { + histogram.record(1.0, &[KeyValue::new("A", v.to_string())]); + } + let stuck_attrs = vec![KeyValue::new("A", "stuck_in_overflow")]; + let bound = histogram.bind(&stuck_attrs); + bound.record(15.0); + + test_context.flush_metrics(); + let MetricData::ExponentialHistogram(hist) = + test_context.get_aggregation::("my_histogram", None) + else { + panic!("expected ExponentialHistogram aggregation"); + }; + let overflow_dp = find_overflow_exponential_histogram_datapoint(&hist.data_points) + .expect("cycle 1: bound write at overflow should land in overflow bucket"); + assert_eq!(overflow_dp.sum, 15.0); + + // Cycle 2: evict stale entries, freeing space. + test_context.reset_metrics(); + test_context.flush_metrics(); + + // Cycle 3: bound handle keeps writing to overflow. 
+ test_context.reset_metrics(); + bound.record(99.0); + test_context.flush_metrics(); + let MetricData::ExponentialHistogram(hist) = + test_context.get_aggregation::("my_histogram", None) + else { + panic!("expected ExponentialHistogram aggregation"); + }; + let overflow_dp = find_overflow_exponential_histogram_datapoint(&hist.data_points) + .expect("cycle 3: bound write must still land in overflow even after space frees up"); + assert_eq!( + overflow_dp.sum, 99.0, + "Bound-at-overflow ExpoHistogram must keep writing to overflow even after delta eviction" + ); + assert_eq!(overflow_dp.count, 1); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_drop_enables_eviction() { + let mut test_context = TestContext::new(Temporality::Delta); + let counter = test_context.u64_counter("test", "my_counter", None); + let attrs = vec![KeyValue::new("key1", "ephemeral")]; + + { + let bound = counter.bind(&attrs); + bound.add(100); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) + else { + unreachable!() + }; + assert_eq!(sum.data_points.len(), 1); + assert_eq!(sum.data_points[0].value, 100); + // bound drops here + } + + // Cycle 2: no updates, bound handle dropped — entry becomes stale and evictable + test_context.reset_metrics(); + test_context.flush_metrics(); + + // Cycle 3: the stale entry should have been evicted, so a new unbound add + // should be the only data point + test_context.reset_metrics(); + counter.add(1, &[KeyValue::new("key1", "new_entry")]); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + + // Only the new entry should be present — the old "ephemeral" entry was evicted + assert_eq!(sum.data_points.len(), 1); + let dp = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "new_entry") + 
.expect("new_entry should be present"); + assert_eq!(dp.value, 1); + assert!( + find_sum_datapoint_with_key_value(&sum.data_points, "key1", "ephemeral").is_none(), + "Dropped bound entry should have been evicted" + ); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_multiple_handles_same_attrs() { + let mut test_context = TestContext::new(Temporality::Delta); + let counter = test_context.u64_counter("test", "my_counter", None); + let attrs = vec![KeyValue::new("key1", "shared")]; + + let bound1 = counter.bind(&attrs); + let bound2 = counter.bind(&attrs); + + bound1.add(10); + bound2.add(20); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + assert_eq!( + sum.data_points.len(), + 1, + "Multiple handles to same attrs should share data point" + ); + assert_eq!(sum.data_points[0].value, 30); + + // Drop one handle — entry should NOT be evicted + drop(bound1); + test_context.reset_metrics(); + test_context.flush_metrics(); // idle cycle, but bound2 still holds it + + // bound2 still works + test_context.reset_metrics(); + bound2.add(5); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + assert_eq!(sum.data_points.len(), 1); + assert_eq!( + sum.data_points[0].value, 5, + "Entry should persist while any handle is alive" + ); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_empty_attributes() { + let mut test_context = TestContext::new(Temporality::Cumulative); + let counter = test_context.u64_counter("test", "my_counter", None); + let bound = counter.bind(&[]); + + bound.add(10); + bound.add(30); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = 
test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + + assert_eq!(sum.data_points.len(), 1); + assert_eq!(sum.data_points[0].value, 40); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_empty_attributes_shares_with_unbound() { + let mut test_context = TestContext::new(Temporality::Cumulative); + let counter = test_context.u64_counter("test", "my_counter", None); + let bound = counter.bind(&[]); + + // Mix bound and unbound calls with empty attributes — they must share + // the same data point (both route to no_attribute_tracker). + counter.add(10, &[]); + bound.add(20); + counter.add(30, &[]); + bound.add(40); + test_context.flush_metrics(); + + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + + assert_eq!( + sum.data_points.len(), + 1, + "Bound and unbound with empty attributes must share the same data point" + ); + assert_eq!(sum.data_points[0].value, 100); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_histogram_empty_attributes_shares_with_unbound() { + let mut test_context = TestContext::new(Temporality::Cumulative); + let histogram = test_context + .meter() + .u64_histogram("my_histogram") + .with_boundaries(vec![5.0, 10.0, 25.0]) + .build(); + let bound = histogram.bind(&[]); + + histogram.record(3, &[]); + bound.record(7); + histogram.record(20, &[]); + test_context.flush_metrics(); + + let MetricData::Histogram(hist) = test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + + assert_eq!( + hist.data_points.len(), + 1, + "Bound and unbound with empty attributes must share the same data point" + ); + let dp = &hist.data_points[0]; + assert!(dp.attributes.is_empty()); + assert_eq!(dp.count, 3); + assert_eq!(dp.sum, 30); + } + + #[cfg(feature 
= "experimental_metrics_bound_instruments")] + #[cfg(feature = "spec_unstable_metrics_views")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_view_filters_attributes_at_bind_time() { + use opentelemetry::Key; + + let exporter = InMemoryMetricExporter::default(); + let view = |i: &Instrument| { + if i.name() == "my_counter" { + Stream::builder() + .with_allowed_attribute_keys(vec![Key::new("k1"), Key::new("k2")]) + .build() + .ok() + } else { + None + } + }; + let meter_provider = SdkMeterProvider::builder() + .with_periodic_exporter(exporter.clone()) + .with_view(view) + .build(); + let meter = meter_provider.meter("test"); + let counter = meter.u64_counter("my_counter").build(); + + // bind with k3 included — view should drop it at bind time + let bound = counter.bind(&[ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v2"), + KeyValue::new("k3", "v3"), + ]); + bound.add(10); + bound.add(20); + + // unbound call with a *different* k3 value: after view filtering both + // bound and unbound must collapse into the same data point. 
+ counter.add( + 7, + &[ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v2"), + KeyValue::new("k3", "different"), + ], + ); + + meter_provider.force_flush().unwrap(); + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + let data::AggregatedMetrics::U64(MetricData::Sum(sum)) = &metric.data else { + unreachable!() + }; + + assert_eq!( + sum.data_points.len(), + 1, + "view should filter k3, leaving bound+unbound to aggregate together" + ); + assert_eq!(sum.data_points[0].value, 37); + let attrs = &sum.data_points[0].attributes; + assert_eq!(attrs.len(), 2); + assert!(attrs.iter().any(|kv| kv.key.as_str() == "k1")); + assert!(attrs.iter().any(|kv| kv.key.as_str() == "k2")); + assert!(!attrs.iter().any(|kv| kv.key.as_str() == "k3")); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[cfg(feature = "spec_unstable_metrics_views")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_histogram_view_filters_attributes_at_bind_time() { + use opentelemetry::Key; + + let exporter = InMemoryMetricExporter::default(); + let view = |i: &Instrument| { + if i.name() == "my_hist" { + Stream::builder() + .with_allowed_attribute_keys(vec![Key::new("k1"), Key::new("k2")]) + .build() + .ok() + } else { + None + } + }; + let meter_provider = SdkMeterProvider::builder() + .with_periodic_exporter(exporter.clone()) + .with_view(view) + .build(); + let meter = meter_provider.meter("test"); + let histogram = meter + .u64_histogram("my_hist") + .with_boundaries(vec![5.0, 10.0, 25.0]) + .build(); + + let bound = histogram.bind(&[ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v2"), + KeyValue::new("k3", "v3"), + ]); + bound.record(3); + bound.record(20); + histogram.record( + 7, + &[ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v2"), + KeyValue::new("k3", "different"), + ], + ); + + 
meter_provider.force_flush().unwrap(); + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + let data::AggregatedMetrics::U64(MetricData::Histogram(hist)) = &metric.data else { + unreachable!() + }; + + assert_eq!( + hist.data_points.len(), + 1, + "view should filter k3, leaving bound+unbound to aggregate together" + ); + let dp = &hist.data_points[0]; + assert_eq!(dp.count, 3); + assert_eq!(dp.sum, 30); + assert_eq!(dp.attributes.len(), 2); + assert!(dp.attributes.iter().any(|kv| kv.key.as_str() == "k1")); + assert!(dp.attributes.iter().any(|kv| kv.key.as_str() == "k2")); + assert!(!dp.attributes.iter().any(|kv| kv.key.as_str() == "k3")); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_counter_at_overflow_attributes_to_overflow_bucket_cumulative() { + // Cumulative: cardinality only grows, never evicts. A bind() at the + // limit lands in overflow and accumulates there forever. Verifies the + // bound handle's cumulative writes converge in the overflow bucket + // across multiple collection cycles. 
+ let cardinality_limit = 3; + let view = move |i: &Instrument| { + if i.name() == "my_counter" { + Stream::builder() + .with_name("my_counter") + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Cumulative, view); + let counter = test_context.u64_counter("test", "my_counter", None); + + for v in 0..cardinality_limit { + counter.add(1, &[KeyValue::new("A", v.to_string())]); + } + let bound = counter.bind(&[KeyValue::new("A", "overflow_bind")]); + bound.add(10); + + test_context.flush_metrics(); + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + let overflow_dp = find_overflow_sum_datapoint(&sum.data_points) + .expect("cycle 1: overflow point expected"); + assert_eq!(overflow_dp.value, 10); + + // Cycle 2: cumulative state accumulates internally; reset the exporter + // so the assertion sees a single export rather than two appended ones. 
+ test_context.reset_metrics(); + bound.add(7); + test_context.flush_metrics(); + let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { + unreachable!() + }; + let overflow_dp = find_overflow_sum_datapoint(&sum.data_points) + .expect("cycle 2: overflow point expected"); + assert_eq!( + overflow_dp.value, 17, + "cumulative overflow-bound writes must accumulate" + ); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_histogram_at_overflow_attributes_to_overflow_bucket_cumulative() { + let cardinality_limit = 3; + let view = move |i: &Instrument| { + if i.name() == "my_histogram" { + Stream::builder() + .with_name("my_histogram") + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Cumulative, view); + let histogram = test_context + .meter() + .f64_histogram("my_histogram") + .with_boundaries(vec![10.0, 50.0]) + .build(); + + for v in 0..cardinality_limit { + histogram.record(1.0, &[KeyValue::new("A", v.to_string())]); + } + let bound = histogram.bind(&[KeyValue::new("A", "overflow_bind")]); + bound.record(20.0); + + test_context.flush_metrics(); + let MetricData::Histogram(hist) = test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + let overflow_dp = find_overflow_histogram_datapoint(&hist.data_points) + .expect("cycle 1: overflow point expected"); + assert_eq!(overflow_dp.sum, 20.0); + assert_eq!(overflow_dp.count, 1); + + // Cycle 2: cumulative accumulates internally; reset exporter to see a + // single fresh export rather than two appended ones. 
+ test_context.reset_metrics(); + bound.record(30.0); + test_context.flush_metrics(); + let MetricData::Histogram(hist) = test_context.get_aggregation::("my_histogram", None) + else { + unreachable!() + }; + let overflow_dp = find_overflow_histogram_datapoint(&hist.data_points) + .expect("cycle 2: overflow point expected"); + assert_eq!( + overflow_dp.sum, 50.0, + "cumulative overflow-bound writes must accumulate" + ); + assert_eq!(overflow_dp.count, 2); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_exponential_histogram_at_overflow_attributes_to_overflow_bucket_cumulative() { + let cardinality_limit = 3; + let view = move |i: &Instrument| { + if i.name() == "my_histogram" { + Stream::builder() + .with_aggregation(Aggregation::Base2ExponentialHistogram { + max_size: 160, + max_scale: 20, + record_min_max: true, + }) + .with_cardinality_limit(cardinality_limit) + .build() + .ok() + } else { + None + } + }; + let mut test_context = TestContext::new_with_view(Temporality::Cumulative, view); + let histogram = test_context.meter().f64_histogram("my_histogram").build(); + + for v in 0..cardinality_limit { + histogram.record(1.0, &[KeyValue::new("A", v.to_string())]); + } + let bound = histogram.bind(&[KeyValue::new("A", "overflow_bind")]); + bound.record(20.0); + + test_context.flush_metrics(); + let MetricData::ExponentialHistogram(hist) = + test_context.get_aggregation::("my_histogram", None) + else { + panic!("expected ExponentialHistogram aggregation"); + }; + let overflow_dp = find_overflow_exponential_histogram_datapoint(&hist.data_points) + .expect("cycle 1: overflow point expected"); + assert_eq!(overflow_dp.sum, 20.0); + assert_eq!(overflow_dp.count, 1); + + // Cycle 2: cumulative accumulates internally; reset exporter to see a + // single fresh export rather than two appended ones. 
+ test_context.reset_metrics(); + bound.record(30.0); + test_context.flush_metrics(); + let MetricData::ExponentialHistogram(hist) = + test_context.get_aggregation::("my_histogram", None) + else { + panic!("expected ExponentialHistogram aggregation"); + }; + let overflow_dp = find_overflow_exponential_histogram_datapoint(&hist.data_points) + .expect("cycle 2: overflow point expected"); + assert_eq!( + overflow_dp.sum, 50.0, + "cumulative overflow-bound writes must accumulate" + ); + assert_eq!(overflow_dp.count, 2); + } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn bound_exponential_histogram_view_filters_attributes_at_bind_time() { + use opentelemetry::Key; + + let exporter = InMemoryMetricExporter::default(); + let view = |i: &Instrument| { + if i.name() == "my_hist" { + Stream::builder() + .with_aggregation(Aggregation::Base2ExponentialHistogram { + max_size: 160, + max_scale: 20, + record_min_max: true, + }) + .with_allowed_attribute_keys(vec![Key::new("k1"), Key::new("k2")]) + .build() + .ok() + } else { + None + } + }; + let meter_provider = SdkMeterProvider::builder() + .with_periodic_exporter(exporter.clone()) + .with_view(view) + .build(); + let meter = meter_provider.meter("test"); + let histogram = meter.f64_histogram("my_hist").build(); + + let bound = histogram.bind(&[ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v2"), + KeyValue::new("k3", "v3"), + ]); + bound.record(3.0); + bound.record(20.0); + // Unbound call with a different k3: after view filtering, bound and unbound + // must collapse into the same exponential histogram data point. 
+ histogram.record( + 7.0, + &[ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v2"), + KeyValue::new("k3", "different"), + ], + ); + + meter_provider.force_flush().unwrap(); + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + let data::AggregatedMetrics::F64(MetricData::ExponentialHistogram(hist)) = &metric.data + else { + panic!("expected ExponentialHistogram aggregation"); + }; + + assert_eq!( + hist.data_points.len(), + 1, + "view should filter k3, leaving bound+unbound to aggregate together" + ); + let dp = &hist.data_points[0]; + assert_eq!(dp.count, 3); + assert_eq!(dp.sum, 30.0); + assert_eq!(dp.attributes.len(), 2); + assert!(dp.attributes.iter().any(|kv| kv.key.as_str() == "k1")); + assert!(dp.attributes.iter().any(|kv| kv.key.as_str() == "k2")); + assert!(!dp.attributes.iter().any(|kv| kv.key.as_str() == "k3")); + } } diff --git a/opentelemetry-sdk/src/metrics/noop.rs b/opentelemetry-sdk/src/metrics/noop.rs index 1a490698ad..62b9dfe1fc 100644 --- a/opentelemetry-sdk/src/metrics/noop.rs +++ b/opentelemetry-sdk/src/metrics/noop.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "experimental_metrics_bound_instruments")] +use opentelemetry::metrics::BoundSyncInstrument; use opentelemetry::{ metrics::{InstrumentProvider, SyncInstrument}, KeyValue, @@ -35,4 +37,21 @@ impl SyncInstrument for NoopSyncInstrument { fn measure(&self, _value: T, _attributes: &[KeyValue]) { // Ignored } + + #[cfg(feature = "experimental_metrics_bound_instruments")] + fn bind(&self, _attributes: &[KeyValue]) -> Box + Send + Sync> { + Box::new(NoopBoundSyncInstrument { _private: () }) + } +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +struct NoopBoundSyncInstrument { + _private: (), +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl BoundSyncInstrument for NoopBoundSyncInstrument { + fn measure(&self, _measurement: T) { + // Ignored 
+ } } diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index ff23b060d5..96d6ab2adb 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -2,6 +2,16 @@ ## vNext +- **Added** `BoundCounter` and `BoundHistogram` types that cache resolved + aggregator references for a fixed attribute set. Created via `Counter::bind()` + and `Histogram::bind()`, bound instruments bypass per-call attribute lookup, + providing significant performance improvements for hot paths where the same + attributes are used repeatedly. Both types implement `Clone` so a single bound + state can be shared across threads or modules without re-binding. Also adds + the `SyncInstrument::bind()` trait method and `BoundSyncInstrument` trait + for SDK implementors; the trait method has a no-op default so custom + `SyncInstrument` impls degrade gracefully without panicking. Gated behind the + `experimental_metrics_bound_instruments` feature flag. - Add `reserve` method to `opentelemetry::propagation::Injector` to hint at the number of elements that will be added to avoid multiple resize operations of the underlying data structure. Has an empty default implementation. 
- **Breaking** Removed the following public fields and methods from the `SpanBuilder` [#3227][3227]: - `trace_id`, `span_id`, `end_time`, `status`, `sampling_result` diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index 6de93e7197..c930057e2d 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -39,6 +39,7 @@ metrics = [] testing = ["trace"] logs = [] internal-logs = ["tracing"] +experimental_metrics_bound_instruments = ["metrics"] [dev-dependencies] opentelemetry_sdk = { path = "../opentelemetry-sdk"} # for documentation tests diff --git a/opentelemetry/src/metrics/instruments/counter.rs b/opentelemetry/src/metrics/instruments/counter.rs index 8d72657686..0574e39719 100644 --- a/opentelemetry/src/metrics/instruments/counter.rs +++ b/opentelemetry/src/metrics/instruments/counter.rs @@ -2,6 +2,8 @@ use crate::KeyValue; use core::fmt; use std::sync::Arc; +#[cfg(feature = "experimental_metrics_bound_instruments")] +use super::BoundSyncInstrument; use super::SyncInstrument; /// An instrument that records increasing values. @@ -32,6 +34,12 @@ impl Counter { pub fn add(&self, value: T, attributes: &[KeyValue]) { self.0.measure(value, attributes) } + + /// Binds this counter to a fixed set of attributes. + #[cfg(feature = "experimental_metrics_bound_instruments")] + pub fn bind(&self, attributes: &[KeyValue]) -> BoundCounter { + BoundCounter(Arc::from(self.0.bind(attributes))) + } } /// An async instrument that records increasing values. @@ -59,3 +67,32 @@ impl fmt::Debug for ObservableCounter { )) } } + +/// A counter bound to a fixed set of attributes. +/// +/// Created by calling [`Counter::bind`] with an attribute set. All subsequent +/// [`add`](BoundCounter::add) calls use the pre-resolved attributes, bypassing +/// per-call attribute lookup for significantly better performance. +/// +/// `BoundCounter` can be cloned cheaply to share a single bound state across +/// threads or modules without re-binding. 
The underlying tracker is reclaimed
+/// when the last clone is dropped.
+#[cfg(feature = "experimental_metrics_bound_instruments")]
+#[derive(Clone)]
+#[must_use = "dropping a BoundCounter immediately is a no-op; store it to benefit from pre-bound attributes"]
+pub struct BoundCounter<T>(Arc<dyn BoundSyncInstrument<T> + Send + Sync>);
+
+#[cfg(feature = "experimental_metrics_bound_instruments")]
+impl<T> fmt::Debug for BoundCounter<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_fmt(format_args!("BoundCounter<{}>", std::any::type_name::<T>()))
+    }
+}
+
+#[cfg(feature = "experimental_metrics_bound_instruments")]
+impl<T> BoundCounter<T> {
+    /// Records an increment to the counter using the pre-bound attributes.
+    pub fn add(&self, value: T) {
+        self.0.measure(value)
+    }
+}
diff --git a/opentelemetry/src/metrics/instruments/histogram.rs b/opentelemetry/src/metrics/instruments/histogram.rs
index 73c7d0bc96..357cdcc199 100644
--- a/opentelemetry/src/metrics/instruments/histogram.rs
+++ b/opentelemetry/src/metrics/instruments/histogram.rs
@@ -2,6 +2,8 @@ use crate::KeyValue;
 use core::fmt;
 use std::sync::Arc;
 
+#[cfg(feature = "experimental_metrics_bound_instruments")]
+use super::BoundSyncInstrument;
 use super::SyncInstrument;
 
 /// An instrument that records a distribution of values.
@@ -32,4 +34,42 @@ impl<T> Histogram<T> {
     pub fn record(&self, value: T, attributes: &[KeyValue]) {
         self.0.measure(value, attributes)
     }
+
+    /// Binds this histogram to a fixed set of attributes.
+    #[cfg(feature = "experimental_metrics_bound_instruments")]
+    pub fn bind(&self, attributes: &[KeyValue]) -> BoundHistogram<T> {
+        BoundHistogram(Arc::from(self.0.bind(attributes)))
+    }
+}
+
+/// A histogram bound to a fixed set of attributes.
+///
+/// Created by calling [`Histogram::bind`] with an attribute set. All subsequent
+/// [`record`](BoundHistogram::record) calls use the pre-resolved attributes, bypassing
+/// per-call attribute lookup for significantly better performance.
+/// +/// `BoundHistogram` can be cloned cheaply to share a single bound state across +/// threads or modules without re-binding. The underlying tracker is reclaimed +/// when the last clone is dropped. +#[cfg(feature = "experimental_metrics_bound_instruments")] +#[derive(Clone)] +#[must_use = "dropping a BoundHistogram immediately is a no-op; store it to benefit from pre-bound attributes"] +pub struct BoundHistogram(Arc + Send + Sync>); + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl fmt::Debug for BoundHistogram { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_fmt(format_args!( + "BoundHistogram<{}>", + std::any::type_name::() + )) + } +} + +#[cfg(feature = "experimental_metrics_bound_instruments")] +impl BoundHistogram { + /// Records a value in the histogram using the pre-bound attributes. + pub fn record(&self, value: T) { + self.0.measure(value) + } } diff --git a/opentelemetry/src/metrics/instruments/mod.rs b/opentelemetry/src/metrics/instruments/mod.rs index dc5a5ff0bb..968ed800d5 100644 --- a/opentelemetry/src/metrics/instruments/mod.rs +++ b/opentelemetry/src/metrics/instruments/mod.rs @@ -28,6 +28,29 @@ pub trait AsyncInstrument: Send + Sync { pub trait SyncInstrument: Send + Sync { /// Records a measurement synchronously. fn measure(&self, measurement: T, attributes: &[KeyValue]); + + /// Binds this instrument to a fixed set of attributes, returning a handle + /// that records measurements without per-call attribute lookup. + /// + /// The default implementation returns a no-op handle so that custom + /// `SyncInstrument` impls that have not opted into bound instruments + /// degrade gracefully rather than panicking on the user's hot path. 
+    #[cfg(feature = "experimental_metrics_bound_instruments")]
+    fn bind(&self, _attributes: &[KeyValue]) -> Box<dyn BoundSyncInstrument<T> + Send + Sync> {
+        crate::otel_debug!(
+            name: "SyncInstrument.BindNotImplemented",
+            message = "bind() called on a SyncInstrument implementation that does not override the default; measurements through the returned handle will be dropped"
+        );
+        Box::new(crate::metrics::noop::NoopBoundSyncInstrument::new())
+    }
+}
+
+/// A pre-bound synchronous instrument that records measurements without attributes.
+/// Created by calling `bind()` on a `Counter` or `Histogram` with a fixed attribute set.
+#[cfg(feature = "experimental_metrics_bound_instruments")]
+pub trait BoundSyncInstrument<T>: Send + Sync {
+    /// Records a measurement. The attributes were fixed at bind time.
+    fn measure(&self, measurement: T);
 }
 
 /// Configuration for building a Histogram.
diff --git a/opentelemetry/src/metrics/mod.rs b/opentelemetry/src/metrics/mod.rs
index 71e2c13df8..2ebe3a41b2 100644
--- a/opentelemetry/src/metrics/mod.rs
+++ b/opentelemetry/src/metrics/mod.rs
@@ -5,6 +5,8 @@ use std::sync::Arc;
 mod instruments;
 mod meter;
 pub mod noop;
+#[cfg(feature = "experimental_metrics_bound_instruments")]
+pub use instruments::{counter::BoundCounter, histogram::BoundHistogram, BoundSyncInstrument};
 pub use instruments::{
     counter::{Counter, ObservableCounter},
     gauge::{Gauge, ObservableGauge},
diff --git a/opentelemetry/src/metrics/noop.rs b/opentelemetry/src/metrics/noop.rs
index c68671bf59..0a1dad5b33 100644
--- a/opentelemetry/src/metrics/noop.rs
+++ b/opentelemetry/src/metrics/noop.rs
@@ -9,6 +9,8 @@ use crate::{
 };
 use std::sync::Arc;
 
+#[cfg(feature = "experimental_metrics_bound_instruments")]
+use super::instruments::BoundSyncInstrument;
 use super::instruments::SyncInstrument;
 
 /// A no-op instance of a `MetricProvider`
@@ -59,8 +61,32 @@ impl NoopSyncInstrument {
     }
 }
 
-impl<T> SyncInstrument<T> for NoopSyncInstrument {
+impl<T> SyncInstrument<T> for NoopSyncInstrument {
     fn measure(&self, _value: T, _attributes: &[KeyValue]) {
         // Ignored
     }
+
+    #[cfg(feature = "experimental_metrics_bound_instruments")]
+    fn bind(&self, _attributes: &[KeyValue]) -> Box<dyn BoundSyncInstrument<T> + Send + Sync> {
+        Box::new(NoopBoundSyncInstrument::new())
+    }
+}
+
+#[cfg(feature = "experimental_metrics_bound_instruments")]
+pub(crate) struct NoopBoundSyncInstrument {
+    _private: (),
+}
+
+#[cfg(feature = "experimental_metrics_bound_instruments")]
+impl NoopBoundSyncInstrument {
+    pub(crate) fn new() -> Self {
+        NoopBoundSyncInstrument { _private: () }
+    }
+}
+
+#[cfg(feature = "experimental_metrics_bound_instruments")]
+impl<T> BoundSyncInstrument<T> for NoopBoundSyncInstrument {
+    fn measure(&self, _measurement: T) {
+        // Ignored
+    }
 }