Skip to content

Commit f3c15af

Browse files
committed
ref counting
1 parent 233962c commit f3c15af

9 files changed

Lines changed: 84 additions & 14 deletions

File tree

instrumentation/opentelemetry-api/opentelemetry-api-1.10/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opentelemetryapi/v1_10/metrics/ApplicationDoubleCounterBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public application.io.opentelemetry.api.metrics.ObservableDoubleCounter buildWit
4949
new ApplicationObservableDoubleMeasurement(agentMeasurement));
5050
return new ApplicationObservableDoubleCounter(
5151
CallbackAnchor.anchor(agentBuilder::buildWithCallback, callback),
52-
() -> CallbackAnchor.remove(callback));
52+
CallbackAnchor.releaseOnClose(callback));
5353
}
5454

5555
// added in 1.15.0

instrumentation/opentelemetry-api/opentelemetry-api-1.10/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opentelemetryapi/v1_10/metrics/ApplicationDoubleGaugeBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public application.io.opentelemetry.api.metrics.ObservableDoubleGauge buildWithC
4949
new ApplicationObservableDoubleMeasurement(agentMeasurement));
5050
return new ApplicationObservableDoubleGauge(
5151
CallbackAnchor.anchor(agentBuilder::buildWithCallback, callback),
52-
() -> CallbackAnchor.remove(callback));
52+
CallbackAnchor.releaseOnClose(callback));
5353
}
5454

5555
// added in 1.15.0

instrumentation/opentelemetry-api/opentelemetry-api-1.10/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opentelemetryapi/v1_10/metrics/ApplicationDoubleUpDownCounterBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public application.io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter bu
4949
new ApplicationObservableDoubleMeasurement(agentMeasurement));
5050
return new ApplicationObservableDoubleUpDownCounter(
5151
CallbackAnchor.anchor(agentBuilder::buildWithCallback, callback),
52-
() -> CallbackAnchor.remove(callback));
52+
CallbackAnchor.releaseOnClose(callback));
5353
}
5454

5555
// added in 1.15.0

instrumentation/opentelemetry-api/opentelemetry-api-1.10/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opentelemetryapi/v1_10/metrics/ApplicationLongCounterBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public application.io.opentelemetry.api.metrics.ObservableLongCounter buildWithC
5353
applicationCallback.accept(new ApplicationObservableLongMeasurement(agentMeasurement));
5454
return new ApplicationObservableLongCounter(
5555
CallbackAnchor.anchor(agentBuilder::buildWithCallback, callback),
56-
() -> CallbackAnchor.remove(callback));
56+
CallbackAnchor.releaseOnClose(callback));
5757
}
5858

5959
// added in 1.15.0

instrumentation/opentelemetry-api/opentelemetry-api-1.10/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opentelemetryapi/v1_10/metrics/ApplicationLongGaugeBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public application.io.opentelemetry.api.metrics.ObservableLongGauge buildWithCal
4343
applicationCallback.accept(new ApplicationObservableLongMeasurement(agentMeasurement));
4444
return new ApplicationObservableLongGauge(
4545
CallbackAnchor.anchor(agentBuilder::buildWithCallback, callback),
46-
() -> CallbackAnchor.remove(callback));
46+
CallbackAnchor.releaseOnClose(callback));
4747
}
4848

4949
// added in 1.15.0

instrumentation/opentelemetry-api/opentelemetry-api-1.10/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opentelemetryapi/v1_10/metrics/ApplicationLongUpDownCounterBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public application.io.opentelemetry.api.metrics.ObservableLongUpDownCounter buil
5353
applicationCallback.accept(new ApplicationObservableLongMeasurement(agentMeasurement));
5454
return new ApplicationObservableLongUpDownCounter(
5555
CallbackAnchor.anchor(agentBuilder::buildWithCallback, callback),
56-
() -> CallbackAnchor.remove(callback));
56+
CallbackAnchor.releaseOnClose(callback));
5757
}
5858

5959
// added in 1.15.0

instrumentation/opentelemetry-api/opentelemetry-api-1.10/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opentelemetryapi/v1_10/metrics/CallbackAnchor.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
import io.opentelemetry.javaagent.bootstrap.WeakRefConsumer;
99
import io.opentelemetry.javaagent.bootstrap.WeakRefRunnable;
1010
import java.lang.ref.WeakReference;
11-
import java.util.Set;
11+
import java.util.Map;
1212
import java.util.concurrent.ConcurrentHashMap;
13+
import java.util.concurrent.atomic.AtomicBoolean;
1314
import java.util.function.Consumer;
1415
import java.util.function.Function;
1516

@@ -40,12 +41,13 @@ public final class CallbackAnchor {
4041
// Anchors callbacks to this class's lifecycle. Since this class is injected as a helper into each
4142
// application class loader, callbacks are naturally tied to their class loader's lifecycle.
4243
// Use identity semantics so unusual equals/hashCode implementations don't collapse distinct
43-
// callbacks into the same entry.
44-
private static final Set<IdentityKey> callbacks = ConcurrentHashMap.newKeySet();
44+
// callbacks into the same entry, and ref counts so the same callback can back multiple
45+
// instruments without closing one instrument dropping the shared anchor too early.
46+
private static final Map<IdentityKey, Integer> callbacks = new ConcurrentHashMap<>();
4547

4648
public static <T, R extends AutoCloseable> R anchor(
4749
Function<Consumer<T>, R> buildFn, Consumer<T> callback) {
48-
callbacks.add(new IdentityKey(callback));
50+
callbacks.merge(new IdentityKey(callback), 1, Integer::sum);
4951
WeakRefConsumer<T> weak = new WeakRefConsumer<>(new WeakReference<>(callback));
5052
R instrument = buildFn.apply(weak);
5153
weak.closeWhenCollected(instrument);
@@ -54,15 +56,29 @@ public static <T, R extends AutoCloseable> R anchor(
5456

5557
public static <R extends AutoCloseable> R anchorBatch(
5658
Function<Runnable, R> buildFn, Runnable callback) {
57-
callbacks.add(new IdentityKey(callback));
59+
callbacks.merge(new IdentityKey(callback), 1, Integer::sum);
5860
WeakRefRunnable weak = new WeakRefRunnable(new WeakReference<>(callback));
5961
R instrument = buildFn.apply(weak);
6062
weak.closeWhenCollected(instrument);
6163
return instrument;
6264
}
6365

64-
public static void remove(Object callback) {
65-
callbacks.remove(new IdentityKey(callback));
66+
// Some instrument wrappers may end up calling close() more than once. Guard the deferred
67+
// release so each wrapper decrements the callback ref count at most once.
68+
public static Runnable releaseOnClose(Object callback) {
69+
AtomicBoolean released = new AtomicBoolean();
70+
return () -> {
71+
if (released.compareAndSet(false, true)) {
72+
release(callback);
73+
}
74+
};
75+
}
76+
77+
private static void release(Object callback) {
78+
// Returning null from computeIfPresent removes the mapping
79+
// once the ref count reaches zero.
80+
callbacks.computeIfPresent(
81+
new IdentityKey(callback), (key, count) -> count == 1 ? null : count - 1);
6682
}
6783

6884
private static final class IdentityKey {

instrumentation/opentelemetry-api/opentelemetry-api-1.15/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opentelemetryapi/v1_15/metrics/ApplicationMeter115.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public application.io.opentelemetry.api.metrics.BatchCallback batchCallback(
3131
agentMeter.batchCallback(
3232
weak, unwrap(observableMeasurement), unwrap(additionalMeasurements)),
3333
callback),
34-
() -> CallbackAnchor.remove(callback));
34+
CallbackAnchor.releaseOnClose(callback));
3535
}
3636

3737
private static ObservableMeasurement unwrap(

instrumentation/opentelemetry-api/opentelemetry-api-1.15/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opentelemetryapi/v1_15/metrics/MeterTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414
import io.opentelemetry.api.metrics.Meter;
1515
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
1616
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
17+
import io.opentelemetry.instrumentation.test.utils.GcUtils;
1718
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
1819
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
20+
import java.lang.ref.WeakReference;
21+
import java.time.Duration;
1922
import org.assertj.core.api.AbstractIterableAssert;
2023
import org.junit.jupiter.api.BeforeEach;
2124
import org.junit.jupiter.api.Test;
@@ -321,4 +324,55 @@ void batchDoubleGauge() throws InterruptedException {
321324

322325
testing.waitAndAssertMetrics(instrumentationName, "test", AbstractIterableAssert::isEmpty);
323326
}
327+
328+
@Test
329+
void sharedBatchCallbackSurvivesClosingOneHandleTwice() throws Exception {
330+
ObservableLongMeasurement firstMeasurement = meter.counterBuilder("test.first").buildObserver();
331+
ObservableLongMeasurement secondMeasurement =
332+
meter.counterBuilder("test.second").buildObserver();
333+
Runnable callback =
334+
new Runnable() {
335+
@Override
336+
public void run() {}
337+
};
338+
WeakReference<Runnable> callbackRef = new WeakReference<>(callback);
339+
340+
BatchCallback firstCallback = meter.batchCallback(callback, firstMeasurement);
341+
// Intentionally do not retain the second handle so callback reachability depends on the
342+
// remaining anchor, not on another wrapper still holding the callback via its onClose action.
343+
meter.batchCallback(callback, secondMeasurement);
344+
345+
firstCallback.close();
346+
firstCallback.close();
347+
firstCallback = null;
348+
callback = null;
349+
350+
GcUtils.awaitGc(Duration.ofSeconds(10));
351+
assertThat(callbackRef.get()).isNotNull();
352+
}
353+
354+
@Test
355+
void sharedBatchCallbackCollectedAfterLastHandleClosed() throws Exception {
356+
ObservableLongMeasurement firstMeasurement = meter.counterBuilder("test.first").buildObserver();
357+
ObservableLongMeasurement secondMeasurement =
358+
meter.counterBuilder("test.second").buildObserver();
359+
Runnable callback =
360+
new Runnable() {
361+
@Override
362+
public void run() {}
363+
};
364+
WeakReference<Runnable> callbackRef = new WeakReference<>(callback);
365+
366+
BatchCallback firstCallback = meter.batchCallback(callback, firstMeasurement);
367+
BatchCallback secondCallback = meter.batchCallback(callback, secondMeasurement);
368+
369+
firstCallback.close();
370+
secondCallback.close();
371+
firstCallback = null;
372+
secondCallback = null;
373+
callback = null;
374+
375+
GcUtils.awaitGc(callbackRef, Duration.ofSeconds(10));
376+
assertThat(callbackRef.get()).isNull();
377+
}
324378
}

0 commit comments

Comments
 (0)