Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -972,29 +972,23 @@ void createdTimestamp() throws IOException {

LongCounter counter = meter.counterBuilder("requests").build();
testClock.advance(Duration.ofMillis(1));
long bearStartNanos = testClock.now();
counter.add(3, Attributes.builder().put("animal", "bear").build());
testClock.advance(Duration.ofMillis(1));
long mouseStartNanos = testClock.now();
counter.add(2, Attributes.builder().put("animal", "mouse").build());
testClock.advance(Duration.ofMillis(1));

// There is a curious difference between Prometheus and OpenTelemetry:
// In Prometheus metrics the _created timestamp is per data point,
// i.e. the _created timestamp says when this specific set of label values
// was first observed.
// In the OTel Java SDK the _created timestamp is the initialization time
// of the SdkMeterProvider, i.e. all data points will have the same _created timestamp.
// So we expect the _created timestamp to be the start time of the application,
// not the timestamp when the counter or an individual data point was created.
String expected =
""
+ "# TYPE requests counter\n"
+ "requests_total{animal=\"bear\",otel_scope_name=\"test\"} 3.0\n"
+ "requests_created{animal=\"bear\",otel_scope_name=\"test\"} "
+ createdTimestamp
+ convertTimestamp(bearStartNanos)
+ "\n"
+ "requests_total{animal=\"mouse\",otel_scope_name=\"test\"} 2.0\n"
+ "requests_created{animal=\"mouse\",otel_scope_name=\"test\"} "
+ createdTimestamp
+ convertTimestamp(mouseStartNanos)
+ "\n"
+ "# TYPE target info\n"
+ "target_info{service_name=\"unknown_service:java\",telemetry_sdk_language=\"java\",telemetry_sdk_name=\"opentelemetry\",telemetry_sdk_version=\"1.x.x\"} 1\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.Clock;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -40,7 +41,7 @@ public static class ThreadState {

@Setup(Level.Trial)
public final void setup() {
aggregatorHandle = aggregation.getAggregator().createHandle();
aggregatorHandle = aggregation.getAggregator().createHandle(Clock.getDefault().now());
valueSupplier = valueGen.supplier();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.Clock;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -46,7 +47,7 @@ public static class ThreadState {

@Setup(Level.Invocation)
public final void setup() {
aggregatorHandle = aggregation.getAggregator().createHandle();
aggregatorHandle = aggregation.getAggregator().createHandle(Clock.getDefault().now());
valueSupplier = valueGen.supplier();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public class InstrumentGarbageCollectionBenchmarkTest {

/**
* This test validates that in {@link MemoryMode#REUSABLE_DATA}, any {@link
* MetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely allocates memory
* which is then subsequently garbage collected. It is done so comparatively to {@link
* MetricStorage#collect(Resource, InstrumentationScopeInfo, long)} barely allocates memory which
* is then subsequently garbage collected. It is done so comparatively to {@link
* MemoryMode#IMMUTABLE_DATA},
*
* <p>It runs the JMH test {@link InstrumentGarbageCollectionBenchmark} with GC profiler, and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ Collection<MetricData> collectAll(RegisteredReader registeredReader, long epochN
// Only invoke callbacks if meter is enabled
if (meterEnabled) {
for (CallbackRegistration callbackRegistration : currentRegisteredCallbacks) {
callbackRegistration.invokeCallback(
registeredReader, meterProviderSharedState.getStartEpochNanos(), epochNanos);
callbackRegistration.invokeCallback(registeredReader);
}
}

Expand All @@ -145,10 +144,7 @@ Collection<MetricData> collectAll(RegisteredReader registeredReader, long epochN
for (MetricStorage storage : storages) {
MetricData current =
storage.collect(
meterProviderSharedState.getResource(),
getInstrumentationScopeInfo(),
meterProviderSharedState.getStartEpochNanos(),
epochNanos);
meterProviderSharedState.getResource(), getInstrumentationScopeInfo(), epochNanos);
// Ignore if the metric data doesn't have any data points, for example when aggregation is
// Aggregation#drop()
if (!current.isEmpty()) {
Expand Down Expand Up @@ -288,6 +284,7 @@ WriteableMetricStorage registerSynchronousMetricStorage(InstrumentDescriptor ins
SynchronousMetricStorage.create(
reader,
registeredView,
meterProviderSharedState.getClock(),
instrument,
meterProviderSharedState.getExemplarFilter(),
meterEnabled)));
Expand Down Expand Up @@ -317,7 +314,11 @@ SdkObservableMeasurement registerObservableMeasurement(
registeredStorages.add(
registry.register(
AsynchronousMetricStorage.create(
reader, registeredView, instrumentDescriptor, meterEnabled)));
reader,
registeredView,
meterProviderSharedState.getClock(),
instrumentDescriptor,
meterEnabled)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public static SdkMeterProviderBuilder builder() {
Resource resource,
ExemplarFilterInternal exemplarFilter,
ScopeConfigurator<MeterConfig> meterConfigurator) {
long startEpochNanos = clock.now();
this.registeredViews = registeredViews;
this.registeredReaders =
metricReaders.entrySet().stream()
Expand All @@ -83,8 +82,7 @@ public static SdkMeterProviderBuilder builder() {
ViewRegistry.create(entry.getKey(), entry.getValue(), registeredViews)))
.collect(toList());
this.metricProducers = metricProducers;
this.sharedState =
MeterProviderSharedState.create(clock, resource, exemplarFilter, startEpochNanos);
this.sharedState = MeterProviderSharedState.create(clock, resource, exemplarFilter);
this.registry =
new ComponentRegistry<>(
instrumentationLibraryInfo ->
Expand All @@ -99,7 +97,6 @@ public static SdkMeterProviderBuilder builder() {
readerMetricProducers.add(new LeasedMetricProducer(registry, sharedState, registeredReader));
MetricReader reader = registeredReader.getReader();
reader.register(new SdkCollectionRegistration(readerMetricProducers, sharedState));
registeredReader.setLastCollectEpochNanos(startEpochNanos);
if (reader instanceof PeriodicMetricReader) {
setReaderMeterProvider((PeriodicMetricReader) reader, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ static Aggregator<?> drop() {
}

/**
* Returns a new {@link AggregatorHandle}. This MUST by used by the synchronous to aggregate
* recorded measurements during the collection cycle.
* Returns a new {@link AggregatorHandle}. Used by both synchronous and asynchronous metric
* storage to aggregate recorded measurements during the collection cycle.
*
* @param creationEpochNanos the epoch timestamp (nanos) at which the handle is being created,
* stored via {@link AggregatorHandle#getCreationEpochNanos()}. Whether this value is used as
* the start timestamp of reported data points depends on the instrument and temporality — see
* {@link AggregatorHandle#getCreationEpochNanos()} for details.
* @return a new {@link AggregatorHandle}.
*/
AggregatorHandle<T> createHandle();
AggregatorHandle<T> createHandle(long creationEpochNanos);

/**
* Returns a new DELTA point by computing the difference between two cumulative points.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ public abstract class AggregatorHandle<T extends PointData> {
private static final String UNSUPPORTED_DOUBLE_MESSAGE =
"This aggregator does not support double values.";

private final long creationEpochNanos;
// A reservoir of sampled exemplars for this time period.
@Nullable private final DoubleExemplarReservoir doubleReservoirFactory;
@Nullable private final LongExemplarReservoir longReservoirFactory;
private final boolean isDoubleType;
private volatile boolean valuesRecorded = false;

protected AggregatorHandle(ExemplarReservoirFactory reservoirFactory, boolean isDoubleType) {
protected AggregatorHandle(
long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, boolean isDoubleType) {
this.creationEpochNanos = creationEpochNanos;
this.isDoubleType = isDoubleType;
if (isDoubleType) {
this.doubleReservoirFactory = reservoirFactory.createDoubleExemplarReservoir();
Expand Down Expand Up @@ -145,4 +148,22 @@ private static <S> S throwUnsupportedIfNull(@Nullable S value, String message) {
}
return value;
}

/**
* Returns the epoch timestamp (nanos) at which this handle was created.
*
* <p>For cumulative synchronous instruments, this is the time of the first measurement for the
* series and is used as {@link PointData#getStartEpochNanos()}.
*
* <p>For cumulative asynchronous instruments, this is either the instrument creation time (if the
* series first appeared during the first collection cycle) or the preceding collection interval's
* timestamp (if the series appeared in a later cycle), and is used as {@link
* PointData#getStartEpochNanos()}.
*
* <p>Not used for delta instruments; their start epoch is computed directly from the reader's
* last collection time or instrument creation time.
*/
public long getCreationEpochNanos() {
return creationEpochNanos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ public DoubleBase2ExponentialHistogramAggregator(
}

@Override
public AggregatorHandle<ExponentialHistogramPointData> createHandle() {
return new Handle(reservoirFactory, maxBuckets, maxScale, recordMinMax, memoryMode);
public AggregatorHandle<ExponentialHistogramPointData> createHandle(long creationEpochNanos) {
return new Handle(
creationEpochNanos, reservoirFactory, maxBuckets, maxScale, recordMinMax, memoryMode);
}

@Override
Expand Down Expand Up @@ -104,12 +105,13 @@ static final class Handle extends AggregatorHandle<ExponentialHistogramPointData
@Nullable private final MutableExponentialHistogramPointData reusablePoint;

Handle(
long creationEpochNanos,
ExemplarReservoirFactory reservoirFactory,
int maxBuckets,
int maxScale,
boolean recordMinMax,
MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ true);
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true);
this.maxBuckets = maxBuckets;
this.maxScale = maxScale;
this.recordMinMax = recordMinMax;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ public DoubleExplicitBucketHistogramAggregator(
}

@Override
public AggregatorHandle<HistogramPointData> createHandle() {
return new Handle(boundaryList, boundaries, recordMinMax, reservoirFactory, memoryMode);
public AggregatorHandle<HistogramPointData> createHandle(long creationEpochNanos) {
return new Handle(
creationEpochNanos, boundaryList, boundaries, recordMinMax, reservoirFactory, memoryMode);
}

@Override
Expand Down Expand Up @@ -120,12 +121,13 @@ static final class Handle extends AggregatorHandle<HistogramPointData> {
@Nullable private final MutableHistogramPointData reusablePoint;

Handle(
long creationEpochNanos,
List<Double> boundaryList,
double[] boundaries,
boolean recordMinMax,
ExemplarReservoirFactory reservoirFactory,
MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ true);
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true);
this.boundaryList = boundaryList;
this.boundaries = boundaries;
this.recordMinMax = recordMinMax;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public DoubleLastValueAggregator(
}

@Override
public AggregatorHandle<DoublePointData> createHandle() {
return new Handle(reservoirFactory, memoryMode);
public AggregatorHandle<DoublePointData> createHandle(long creationEpochNanos) {
return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}

@Override
Expand Down Expand Up @@ -99,8 +99,9 @@ static final class Handle extends AggregatorHandle<DoublePointData> {
// Only used when memoryMode is REUSABLE_DATA
@Nullable private final MutableDoublePointData reusablePoint;

private Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ true);
private Handle(
long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true);
if (memoryMode == MemoryMode.REUSABLE_DATA) {
reusablePoint = new MutableDoublePointData();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public DoubleSumAggregator(
}

@Override
public AggregatorHandle<DoublePointData> createHandle() {
return new Handle(reservoirFactory, memoryMode);
public AggregatorHandle<DoublePointData> createHandle(long creationEpochNanos) {
return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}

@Override
Expand Down Expand Up @@ -112,8 +112,9 @@ static final class Handle extends AggregatorHandle<DoublePointData> {
// Only used if memoryMode == MemoryMode.REUSABLE_DATA
@Nullable private final MutableDoublePointData reusablePoint;

Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ true);
Handle(
long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ true);
reusablePoint = memoryMode == MemoryMode.REUSABLE_DATA ? new MutableDoublePointData() : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,13 @@ public List<? extends ExemplarData> getExemplars() {

public static final Aggregator<PointData> INSTANCE = new DropAggregator();

// creationEpochNanos is 0 because DropAggregator never produces data points, so
// the start timestamp is irrelevant. A single shared HANDLE is safe to use.
private static final AggregatorHandle<PointData> HANDLE =
new AggregatorHandle<PointData>(
ExemplarReservoirFactory.noSamples(), /* isDoubleType= */ true) {
/* creationEpochNanos= */ 0,
ExemplarReservoirFactory.noSamples(),
/* isDoubleType= */ true) {
@Override
protected PointData doAggregateThenMaybeResetDoubles(
long startEpochNanos,
Expand All @@ -75,7 +79,7 @@ protected void doRecordDouble(double value) {}
private DropAggregator() {}

@Override
public AggregatorHandle<PointData> createHandle() {
public AggregatorHandle<PointData> createHandle(long creationEpochNanos) {
return HANDLE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public LongLastValueAggregator(ExemplarReservoirFactory reservoirFactory, Memory
}

@Override
public AggregatorHandle<LongPointData> createHandle() {
return new Handle(reservoirFactory, memoryMode);
public AggregatorHandle<LongPointData> createHandle(long creationEpochNanos) {
return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}

@Override
Expand Down Expand Up @@ -95,8 +95,9 @@ static final class Handle extends AggregatorHandle<LongPointData> {
// Only used when memoryMode is REUSABLE_DATA
@Nullable private final MutableLongPointData reusablePoint;

Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ false);
Handle(
long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ false);
if (memoryMode == MemoryMode.REUSABLE_DATA) {
reusablePoint = new MutableLongPointData();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public LongSumAggregator(
}

@Override
public AggregatorHandle<LongPointData> createHandle() {
return new Handle(reservoirFactory, memoryMode);
public AggregatorHandle<LongPointData> createHandle(long creationEpochNanos) {
return new Handle(creationEpochNanos, reservoirFactory, memoryMode);
}

@Override
Expand Down Expand Up @@ -105,8 +105,9 @@ static final class Handle extends AggregatorHandle<LongPointData> {
// Only used if memoryMode == MemoryMode.REUSABLE_DATA
@Nullable private final MutableLongPointData reusablePointData;

Handle(ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(reservoirFactory, /* isDoubleType= */ false);
Handle(
long creationEpochNanos, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
super(creationEpochNanos, reservoirFactory, /* isDoubleType= */ false);
reusablePointData =
memoryMode == MemoryMode.REUSABLE_DATA ? new MutableLongPointData() : null;
}
Expand Down
Loading
Loading