Skip to content

Commit de9da26

Browse files
committed
refactor(metrics): extend direct-bind to all aggregators
Removes BoundFallbackHandle and the fallback parameter on Measure::bind, leaving every aggregator on the same direct-bind shape: bind() returns a BoundXxxHandle holding an Arc<TrackerEntry> that writes directly to the aggregator without per-call attribute lookup. LastValue, PrecomputedSum, and ExpoHistogram now match the Sum / Histogram pattern. PrecomputedSum's bind() is unreachable from user code today (async-only) but the impl exists so the trait is uniform and future Gauge / Observable bind() extensions are mechanical. The rare RwLock-poisoned case now yields a NoopBoundMeasure that drops measurements silently — mirroring measure()'s own poison handling. Test coverage added: - Bound ExponentialHistogram via View: delta with NaN/inf filter, bind-at-overflow attribution, persistence across delta eviction, view filter applied at bind time, and cumulative accumulation. - Cumulative bind-at-overflow tests for Counter and Histogram. - LastValue::bind and PrecomputedSum::bind unit tests exercising Measure / BoundMeasure traits and bound_count Drop semantics. - ValueMap::bind under a poisoned trackers RwLock returns None so the caller produces a NoopBoundMeasure rather than panicking.
1 parent e84c57b commit de9da26

9 files changed

Lines changed: 729 additions & 36 deletions

File tree

opentelemetry-sdk/src/metrics/instrument.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ impl<T: Copy + 'static> SyncInstrument<T> for ResolvedMeasures<T> {
398398
let bound_measures: Vec<Box<dyn BoundMeasure<T>>> = self
399399
.measures
400400
.iter()
401-
.map(|m| m.bind(attrs, Arc::clone(m)))
401+
.map(|m| m.bind(attrs))
402402
.collect();
403403
Box::new(ResolvedBoundMeasures {
404404
measures: bound_measures,

opentelemetry-sdk/src/metrics/internal/aggregate.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub(crate) trait Measure<T>: Send + Sync + 'static {
2020
fn call(&self, measurement: T, attrs: &[KeyValue]);
2121

2222
#[cfg(feature = "experimental_metrics_bound_instruments")]
23-
fn bind(&self, attrs: &[KeyValue], fallback: Arc<dyn Measure<T>>) -> Box<dyn BoundMeasure<T>>;
23+
fn bind(&self, attrs: &[KeyValue]) -> Box<dyn BoundMeasure<T>>;
2424
}
2525

2626
/// A pre-bound measurement handle that bypasses attribute lookup.
@@ -29,26 +29,28 @@ pub(crate) trait BoundMeasure<T>: Send + Sync + 'static {
2929
fn call(&self, measurement: T);
3030
}
3131

32-
/// Fallback bound handle for aggregator types that don't support direct binding.
33-
/// Delegates every call to the unbound `Measure::call()` path.
32+
/// A bound handle that drops every measurement silently. Used when
33+
/// `ValueMap::bind` returns `None` because the trackers `RwLock` is poisoned —
34+
/// an extremely rare degenerate state in which the SDK can no longer aggregate
35+
/// reliably. Returning a noop here mirrors `measure()`'s own poison handling
36+
/// (silent drop) rather than panicking on the user's hot path.
3437
#[cfg(feature = "experimental_metrics_bound_instruments")]
35-
pub(crate) struct BoundFallbackHandle<T> {
36-
measure: Arc<dyn Measure<T>>,
37-
attrs: Vec<KeyValue>,
38+
pub(crate) struct NoopBoundMeasure<T> {
39+
_marker: marker::PhantomData<T>,
3840
}
3941

4042
#[cfg(feature = "experimental_metrics_bound_instruments")]
41-
impl<T: Send + Sync + 'static> BoundMeasure<T> for BoundFallbackHandle<T> {
42-
fn call(&self, measurement: T) {
43-
self.measure.call(measurement, &self.attrs);
43+
impl<T> NoopBoundMeasure<T> {
44+
pub(crate) fn new() -> Self {
45+
Self {
46+
_marker: marker::PhantomData,
47+
}
4448
}
4549
}
4650

4751
#[cfg(feature = "experimental_metrics_bound_instruments")]
48-
impl<T> BoundFallbackHandle<T> {
49-
pub(crate) fn new(measure: Arc<dyn Measure<T>>, attrs: Vec<KeyValue>) -> Self {
50-
Self { measure, attrs }
51-
}
52+
impl<T: Send + Sync + 'static> BoundMeasure<T> for NoopBoundMeasure<T> {
53+
fn call(&self, _measurement: T) {}
5254
}
5355

5456
/// Stores the aggregate of measurements into the aggregation and returns the number

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#[cfg(feature = "experimental_metrics_bound_instruments")]
2+
use std::sync::atomic::Ordering;
3+
#[cfg(feature = "experimental_metrics_bound_instruments")]
24
use std::sync::Arc;
35
use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex};
46

@@ -15,11 +17,41 @@ use super::{
1517
Aggregator, ComputeAggregation, Measure, Number, ValueMap,
1618
};
1719
#[cfg(feature = "experimental_metrics_bound_instruments")]
18-
use super::{BoundFallbackHandle, BoundMeasure};
20+
use super::{BoundMeasure, NoopBoundMeasure, TrackerEntry};
1921

2022
pub(crate) const EXPO_MAX_SCALE: i8 = 20;
2123
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
2224

25+
/// Pre-bound exponential-histogram handle: writes go directly to a fixed
26+
/// `TrackerEntry` without per-call attribute lookup. Unlike `BoundHistogramHandle`,
27+
/// no bucket precomputation happens at the call site — `update()` does scale
28+
/// resolution and bin assignment inside the entry's Mutex. The NaN/inf filter
29+
/// from the unbound `call()` path is preserved here.
30+
#[cfg(feature = "experimental_metrics_bound_instruments")]
31+
struct BoundExpoHistogramHandle<T: Number> {
32+
tracker: Arc<TrackerEntry<Mutex<ExpoHistogramDataPoint<T>>>>,
33+
}
34+
35+
#[cfg(feature = "experimental_metrics_bound_instruments")]
36+
impl<T: Number> BoundMeasure<T> for BoundExpoHistogramHandle<T> {
37+
fn call(&self, measurement: T) {
38+
// Mirror unbound ExpoHistogram::call: ignore NaN and infinity so that
39+
// ExpoHistogramDataPoint::record's invariants are preserved.
40+
if !measurement.into_float().is_finite() {
41+
return;
42+
}
43+
self.tracker.aggregator.update(measurement);
44+
self.tracker.has_been_updated.store(true, Ordering::Release);
45+
}
46+
}
47+
48+
#[cfg(feature = "experimental_metrics_bound_instruments")]
49+
impl<T: Number> Drop for BoundExpoHistogramHandle<T> {
50+
fn drop(&mut self) {
51+
self.tracker.bound_count.fetch_sub(1, Ordering::Relaxed);
52+
}
53+
}
54+
2355
/// A single data point in an exponential histogram.
2456
#[derive(Debug, PartialEq)]
2557
struct ExpoHistogramDataPoint<T> {
@@ -532,8 +564,15 @@ where
532564
}
533565

534566
#[cfg(feature = "experimental_metrics_bound_instruments")]
535-
fn bind(&self, attrs: &[KeyValue], fallback: Arc<dyn Measure<T>>) -> Box<dyn BoundMeasure<T>> {
536-
Box::new(BoundFallbackHandle::new(fallback, attrs.to_vec()))
567+
fn bind(&self, attrs: &[KeyValue]) -> Box<dyn BoundMeasure<T>> {
568+
let mut bound_attrs = Vec::new();
569+
self.filter.apply(attrs, |filtered| {
570+
bound_attrs = filtered.to_vec();
571+
});
572+
match self.value_map.bind(&bound_attrs) {
573+
Some(tracker) => Box::new(BoundExpoHistogramHandle { tracker }),
574+
None => Box::new(NoopBoundMeasure::new()),
575+
}
537576
}
538577
}
539578

opentelemetry-sdk/src/metrics/internal/histogram.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use opentelemetry::KeyValue;
1414
use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
1515
use super::{Aggregator, ComputeAggregation, Measure, Number, ValueMap};
1616
#[cfg(feature = "experimental_metrics_bound_instruments")]
17-
use super::{BoundFallbackHandle, BoundMeasure, TrackerEntry};
17+
use super::{BoundMeasure, NoopBoundMeasure, TrackerEntry};
1818

1919
impl<T> Aggregator for Mutex<Buckets<T>>
2020
where
@@ -268,7 +268,7 @@ where
268268
}
269269

270270
#[cfg(feature = "experimental_metrics_bound_instruments")]
271-
fn bind(&self, attrs: &[KeyValue], fallback: Arc<dyn Measure<T>>) -> Box<dyn BoundMeasure<T>> {
271+
fn bind(&self, attrs: &[KeyValue]) -> Box<dyn BoundMeasure<T>> {
272272
let mut bound_attrs = Vec::new();
273273
self.filter.apply(attrs, |filtered| {
274274
bound_attrs = filtered.to_vec();
@@ -278,10 +278,9 @@ where
278278
tracker,
279279
bounds: self.bounds.clone(),
280280
}),
281-
// Trackers RwLock is poisoned — extremely rare. Hand back a fallback
282-
// handle whose writes will silently drop (mirroring `measure()`'s
283-
// own poison handling) rather than panic on the user's hot path.
284-
None => Box::new(BoundFallbackHandle::new(fallback, bound_attrs)),
281+
// Trackers RwLock is poisoned — return a noop handle so writes
282+
// silently drop, mirroring `measure()`'s own poison handling.
283+
None => Box::new(NoopBoundMeasure::new()),
285284
}
286285
}
287286
}

opentelemetry-sdk/src/metrics/internal/last_value.rs

Lines changed: 106 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,40 @@ use crate::metrics::{
44
};
55
use opentelemetry::KeyValue;
66
#[cfg(feature = "experimental_metrics_bound_instruments")]
7+
use std::sync::atomic::Ordering;
8+
#[cfg(feature = "experimental_metrics_bound_instruments")]
79
use std::sync::Arc;
810

911
use super::{
1012
aggregate::{AggregateTimeInitiator, AttributeSetFilter},
1113
Aggregator, AtomicTracker, AtomicallyUpdate, ComputeAggregation, Measure, Number, ValueMap,
1214
};
1315
#[cfg(feature = "experimental_metrics_bound_instruments")]
14-
use super::{BoundFallbackHandle, BoundMeasure};
16+
use super::{BoundMeasure, NoopBoundMeasure, TrackerEntry};
17+
18+
/// Pre-bound gauge/last-value handle: writes go directly to a fixed
19+
/// `TrackerEntry` without per-call attribute lookup. The `tracker` is either
20+
/// a dedicated entry for the bound attribute set, or — if bind() hit the
21+
/// cardinality limit — the shared overflow tracker.
22+
#[cfg(feature = "experimental_metrics_bound_instruments")]
23+
struct BoundLastValueHandle<T: Number> {
24+
tracker: Arc<TrackerEntry<Assign<T>>>,
25+
}
26+
27+
#[cfg(feature = "experimental_metrics_bound_instruments")]
28+
impl<T: Number> BoundMeasure<T> for BoundLastValueHandle<T> {
29+
fn call(&self, measurement: T) {
30+
self.tracker.aggregator.update(measurement);
31+
self.tracker.has_been_updated.store(true, Ordering::Release);
32+
}
33+
}
34+
35+
#[cfg(feature = "experimental_metrics_bound_instruments")]
36+
impl<T: Number> Drop for BoundLastValueHandle<T> {
37+
fn drop(&mut self) {
38+
self.tracker.bound_count.fetch_sub(1, Ordering::Relaxed);
39+
}
40+
}
1541

1642
/// this is reused by PrecomputedSum
1743
pub(crate) struct Assign<T>
@@ -148,8 +174,15 @@ where
148174
}
149175

150176
#[cfg(feature = "experimental_metrics_bound_instruments")]
151-
fn bind(&self, attrs: &[KeyValue], fallback: Arc<dyn Measure<T>>) -> Box<dyn BoundMeasure<T>> {
152-
Box::new(BoundFallbackHandle::new(fallback, attrs.to_vec()))
177+
fn bind(&self, attrs: &[KeyValue]) -> Box<dyn BoundMeasure<T>> {
178+
let mut bound_attrs = Vec::new();
179+
self.filter.apply(attrs, |filtered| {
180+
bound_attrs = filtered.to_vec();
181+
});
182+
match self.value_map.bind(&bound_attrs) {
183+
Some(tracker) => Box::new(BoundLastValueHandle { tracker }),
184+
None => Box::new(NoopBoundMeasure::new()),
185+
}
153186
}
154187
}
155188

@@ -166,3 +199,73 @@ where
166199
(len, new.map(T::make_aggregated_metrics))
167200
}
168201
}
202+
203+
#[cfg(all(test, feature = "experimental_metrics_bound_instruments"))]
204+
mod tests {
205+
use super::*;
206+
use crate::metrics::data::{AggregatedMetrics, Gauge, MetricData};
207+
208+
fn extract_gauge(agg: AggregatedMetrics) -> Gauge<u64> {
209+
match agg {
210+
AggregatedMetrics::U64(MetricData::Gauge(g)) => g,
211+
_ => panic!("expected u64 Gauge"),
212+
}
213+
}
214+
215+
/// Direct unit coverage for `LastValue::bind`. Sync `Gauge::bind()` is not yet
216+
/// exposed in the public API, so the only callers of this code path today
217+
/// are Views that remap an instrument to `Aggregation::LastValue`. This test
218+
/// constructs the aggregator directly and exercises the bound handle through
219+
/// the `Measure` / `BoundMeasure` traits to keep the impl honest.
220+
#[test]
221+
fn bind_writes_through_bound_handle() {
222+
let last_value = LastValue::<u64>::new(Temporality::Cumulative, AttributeSetFilter::new(None), 100);
223+
let attrs = [KeyValue::new("k", "v")];
224+
let bound = Measure::bind(&last_value, &attrs);
225+
226+
bound.call(7);
227+
bound.call(42); // overwrites previous value (LastValue semantics)
228+
229+
let (count, agg) = ComputeAggregation::call(&last_value, None);
230+
assert_eq!(count, 1);
231+
let gauge = extract_gauge(agg.expect("aggregation produced"));
232+
assert_eq!(gauge.data_points.len(), 1);
233+
assert_eq!(gauge.data_points[0].value, 42);
234+
assert_eq!(gauge.data_points[0].attributes, attrs.to_vec());
235+
}
236+
237+
#[test]
238+
fn bound_handle_drop_decrements_bound_count() {
239+
let last_value = LastValue::<u64>::new(Temporality::Delta, AttributeSetFilter::new(None), 100);
240+
let attrs = [KeyValue::new("k", "v")];
241+
242+
let bound = Measure::bind(&last_value, &attrs);
243+
bound.call(5);
244+
245+
// While the handle exists, the entry's bound_count is 1.
246+
let trackers = last_value.value_map.trackers.read().unwrap();
247+
let entry = trackers
248+
.values()
249+
.next()
250+
.expect("entry should exist after bind+call");
251+
assert_eq!(
252+
entry.bound_count.load(Ordering::Relaxed),
253+
1,
254+
"bound_count should reflect a live handle"
255+
);
256+
drop(trackers);
257+
258+
drop(bound);
259+
260+
let trackers = last_value.value_map.trackers.read().unwrap();
261+
let entry = trackers
262+
.values()
263+
.next()
264+
.expect("entry should still exist post-drop");
265+
assert_eq!(
266+
entry.bound_count.load(Ordering::Relaxed),
267+
0,
268+
"bound_count should drop to 0 after handle drops"
269+
);
270+
}
271+
}

opentelemetry-sdk/src/metrics/internal/mod.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::sync::{Arc, OnceLock, RwLock};
1818

1919
pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, Measure};
2020
#[cfg(feature = "experimental_metrics_bound_instruments")]
21-
pub(crate) use aggregate::{BoundFallbackHandle, BoundMeasure};
21+
pub(crate) use aggregate::{BoundMeasure, NoopBoundMeasure};
2222
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
2323
#[cfg(feature = "experimental_metrics_bound_instruments")]
2424
use opentelemetry::otel_debug;
@@ -892,4 +892,43 @@ mod tests {
892892
"count should reach 0 after eviction"
893893
);
894894
}
895+
896+
/// When the trackers `RwLock` is poisoned, `bind()` cannot safely insert or
897+
/// look up entries, so it returns `None` and the caller (Sum/Histogram/etc.)
898+
/// hands back a `NoopBoundMeasure`. This is a defensive branch that fires
899+
/// on degenerate states (a thread panicked while holding the write lock)
900+
/// and is unreachable through normal traffic. The test induces poisoning
901+
/// explicitly so the branch keeps coverage.
902+
#[cfg(feature = "experimental_metrics_bound_instruments")]
903+
#[test]
904+
fn bind_returns_none_when_trackers_lock_is_poisoned() {
905+
let value_map = ValueMap::<Assign<i64>>::new((), 100);
906+
907+
// Poison the trackers RwLock by panicking inside a write guard.
908+
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
909+
let _guard = value_map.trackers.write().unwrap();
910+
panic!("intentional poison");
911+
}));
912+
913+
assert!(
914+
value_map.trackers.is_poisoned(),
915+
"trackers lock must be poisoned for this test to be meaningful"
916+
);
917+
918+
// Empty attrs use the no_attribute_tracker fast path and never touch
919+
// the poisoned lock — they should still succeed.
920+
assert!(
921+
value_map.bind(&[]).is_some(),
922+
"bind(&[]) must succeed even with poisoned lock; uses no_attribute_tracker"
923+
);
924+
925+
// Non-empty attrs go through bind_attrs which needs the trackers lock.
926+
// The read-lock try succeeds (only writes poison, but read on poisoned
927+
// can also fail) — fall through to write lock which fails poisoned.
928+
let result = value_map.bind(&[KeyValue::new("k", 1_i64)]);
929+
assert!(
930+
result.is_none(),
931+
"bind() with non-empty attrs must return None on poisoned lock"
932+
);
933+
}
895934
}

0 commit comments

Comments
 (0)