Skip to content

Commit 52f007e

Browse files
authored
chore(Datastore): Add Operation and Attempt metrics for HttpJson transport (#12086)
This adds operation and attempt metrics that are being recorded via Gax. Since gRPC can integrate natively with Gax, this metrics are captured. This instruments the metrics to be collected via HttpJson. Sample view of the metrics collected: <img width="2230" height="1564" alt="image" src="https://github.com/user-attachments/assets/3c7de293-bb97-4d05-a086-fed39d27e093" />
1 parent db50ccd commit 52f007e

File tree

11 files changed

+845
-186
lines changed

11 files changed

+845
-186
lines changed

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.datastore;
1818

19+
import com.google.api.core.InternalApi;
1920
import com.google.api.gax.grpc.GrpcStatusCode;
2021
import com.google.api.gax.rpc.ApiException;
2122
import com.google.api.gax.rpc.StatusCode;
@@ -176,7 +177,8 @@ static DatastoreException propagateUserException(Exception ex) {
176177
* @param throwable the throwable to extract the status code from
177178
* @return the status code name, or "UNKNOWN" if not determinable
178179
*/
179-
static String extractStatusCode(Throwable throwable) {
180+
@InternalApi
181+
public static String extractStatusCode(Throwable throwable) {
180182
Throwable current = throwable;
181183
while (current != null) {
182184
if (current instanceof DatastoreException) {

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java

Lines changed: 140 additions & 148 deletions
Large diffs are not rendered by default.

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,20 @@
1818
import static com.google.cloud.BaseService.EXCEPTION_HANDLER;
1919

2020
import com.google.api.core.InternalApi;
21+
import com.google.api.core.ObsoleteApi;
2122
import com.google.api.gax.retrying.RetrySettings;
23+
import com.google.api.gax.rpc.StatusCode;
2224
import com.google.cloud.RetryHelper;
2325
import com.google.cloud.RetryHelper.RetryHelperException;
2426
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
27+
import com.google.cloud.datastore.telemetry.MetricsRecorder;
28+
import com.google.cloud.datastore.telemetry.NoOpMetricsRecorder;
29+
import com.google.cloud.datastore.telemetry.TelemetryConstants;
30+
import com.google.cloud.datastore.telemetry.TelemetryUtils;
2531
import com.google.cloud.datastore.telemetry.TraceUtil;
32+
import com.google.cloud.http.HttpTransportOptions;
33+
import com.google.common.base.Preconditions;
34+
import com.google.common.base.Stopwatch;
2635
import com.google.datastore.v1.AllocateIdsRequest;
2736
import com.google.datastore.v1.AllocateIdsResponse;
2837
import com.google.datastore.v1.BeginTransactionRequest;
@@ -53,7 +62,10 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc {
5362
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil;
5463
private final RetrySettings retrySettings;
5564
private final DatastoreOptions datastoreOptions;
65+
private final MetricsRecorder metricsRecorder;
66+
private final boolean isHttpTransport;
5667

68+
@ObsoleteApi("Prefer to create RetryAndTraceDatastoreRpcDecorator via the Builder")
5769
public RetryAndTraceDatastoreRpcDecorator(
5870
DatastoreRpc datastoreRpc,
5971
TraceUtil otelTraceUtil,
@@ -63,6 +75,69 @@ public RetryAndTraceDatastoreRpcDecorator(
6375
this.retrySettings = retrySettings;
6476
this.datastoreOptions = datastoreOptions;
6577
this.otelTraceUtil = otelTraceUtil;
78+
this.metricsRecorder = new NoOpMetricsRecorder();
79+
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
80+
}
81+
82+
private RetryAndTraceDatastoreRpcDecorator(Builder builder) {
83+
this.datastoreRpc = builder.datastoreRpc;
84+
this.otelTraceUtil = builder.otelTraceUtil;
85+
this.retrySettings = builder.retrySettings;
86+
this.datastoreOptions = builder.datastoreOptions;
87+
this.metricsRecorder = builder.metricsRecorder;
88+
this.isHttpTransport = builder.isHttpTransport;
89+
}
90+
91+
public static Builder newBuilder() {
92+
return new Builder();
93+
}
94+
95+
public static class Builder {
96+
private DatastoreRpc datastoreRpc;
97+
private TraceUtil otelTraceUtil;
98+
private RetrySettings retrySettings;
99+
private DatastoreOptions datastoreOptions;
100+
101+
// Defaults configured for this class
102+
private MetricsRecorder metricsRecorder = new NoOpMetricsRecorder();
103+
private boolean isHttpTransport = false;
104+
105+
private Builder() {}
106+
107+
public Builder setDatastoreRpc(DatastoreRpc datastoreRpc) {
108+
this.datastoreRpc = datastoreRpc;
109+
return this;
110+
}
111+
112+
public Builder setTraceUtil(TraceUtil otelTraceUtil) {
113+
this.otelTraceUtil = otelTraceUtil;
114+
return this;
115+
}
116+
117+
public Builder setRetrySettings(RetrySettings retrySettings) {
118+
this.retrySettings = retrySettings;
119+
return this;
120+
}
121+
122+
public Builder setDatastoreOptions(DatastoreOptions datastoreOptions) {
123+
this.datastoreOptions = datastoreOptions;
124+
return this;
125+
}
126+
127+
public Builder setMetricsRecorder(MetricsRecorder metricsRecorder) {
128+
Preconditions.checkNotNull(metricsRecorder, "metricsRecorder can not be null");
129+
this.metricsRecorder = metricsRecorder;
130+
return this;
131+
}
132+
133+
public RetryAndTraceDatastoreRpcDecorator build() {
134+
Preconditions.checkNotNull(datastoreRpc, "datastoreRpc is required");
135+
Preconditions.checkNotNull(otelTraceUtil, "otelTraceUtil is required");
136+
Preconditions.checkNotNull(retrySettings, "retrySettings is required");
137+
Preconditions.checkNotNull(datastoreOptions, "datastoreOptions is required");
138+
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
139+
return new RetryAndTraceDatastoreRpcDecorator(this);
140+
}
66141
}
67142

68143
@Override
@@ -110,7 +185,10 @@ public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryReques
110185
? com.google.cloud.datastore.telemetry.TraceUtil
111186
.SPAN_NAME_TRANSACTION_RUN_AGGREGATION_QUERY
112187
: com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY);
113-
return invokeRpc(() -> datastoreRpc.runAggregationQuery(request), spanName);
188+
return invokeRpc(
189+
() -> datastoreRpc.runAggregationQuery(request),
190+
spanName,
191+
TelemetryConstants.METHOD_RUN_AGGREGATION_QUERY);
114192
}
115193

116194
@Override
@@ -124,14 +202,34 @@ public boolean isClosed() {
124202
}
125203

126204
public <O> O invokeRpc(Callable<O> block, String startSpan) {
127-
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
128-
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
129-
return RetryHelper.runWithRetries(
130-
block, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
205+
return invokeRpc(block, startSpan, null);
206+
}
207+
208+
<O> O invokeRpc(Callable<O> block, String startSpan, String methodName) {
209+
TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
210+
Stopwatch stopwatch = isHttpTransport ? Stopwatch.createStarted() : null;
211+
String operationStatus = StatusCode.Code.UNKNOWN.toString();
212+
try (TraceUtil.Scope ignored = span.makeCurrent()) {
213+
Callable<O> callable =
214+
TelemetryUtils.attemptMetricsCallable(
215+
block, metricsRecorder, datastoreOptions, isHttpTransport, methodName);
216+
O result =
217+
RetryHelper.runWithRetries(
218+
callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
219+
operationStatus = StatusCode.Code.OK.toString();
220+
return result;
131221
} catch (RetryHelperException e) {
222+
operationStatus = DatastoreException.extractStatusCode(e);
132223
span.end(e);
133224
throw DatastoreException.translateAndThrow(e);
134225
} finally {
226+
TelemetryUtils.recordOperationMetrics(
227+
metricsRecorder,
228+
datastoreOptions,
229+
isHttpTransport,
230+
stopwatch,
231+
methodName,
232+
operationStatus);
135233
span.end();
136234
}
137235
}

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,10 +208,13 @@ public TraceUtil.Span setAttribute(String key, boolean value) {
208208
}
209209

210210
@Override
211+
@SuppressWarnings("MustBeClosedChecker")
211212
public Scope makeCurrent() {
212-
try (io.opentelemetry.context.Scope scope = span.makeCurrent()) {
213-
return new Scope(scope);
214-
}
213+
// span.makeCurrent() opens a ThreadLocal scope that binds this span to the current thread.
214+
// We explicitly leave this unclosed and suppress MustBeClosedChecker so that the returned
215+
// TraceUtil.Scope can manage the lifecycle instead, allowing the caller's try-with-resources
216+
// to control when the ThreadLocal context is restored.
217+
return new Scope(span.makeCurrent());
215218
}
216219
}
217220

@@ -238,10 +241,14 @@ static class Context implements TraceUtil.Context {
238241
}
239242

240243
@Override
244+
@SuppressWarnings("MustBeClosedChecker")
241245
public Scope makeCurrent() {
242-
try (io.opentelemetry.context.Scope scope = context.makeCurrent()) {
243-
return new Scope(scope);
244-
}
246+
// context.makeCurrent() opens a ThreadLocal scope that binds this context to the current
247+
// thread.
248+
// We explicitly leave this unclosed and suppress MustBeClosedChecker so that the returned
249+
// TraceUtil.Scope can manage the lifecycle instead, allowing the caller's try-with-resources
250+
// to control when the ThreadLocal context is restored.
251+
return new Scope(context.makeCurrent());
245252
}
246253
}
247254

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,18 @@ public interface MetricsRecorder {
3838
/** Records the number of attempts a transaction took. */
3939
void recordTransactionAttemptCount(long count, Map<String, String> attributes);
4040

41+
/** Records the latency of a single RPC attempt in milliseconds. */
42+
void recordAttemptLatency(double latencyMs, Map<String, String> attributes);
43+
44+
/** Records the count of a single RPC attempt. */
45+
void recordAttemptCount(long count, Map<String, String> attributes);
46+
47+
/** Records the total latency of an operation (including retries) in milliseconds. */
48+
void recordOperationLatency(double latencyMs, Map<String, String> attributes);
49+
50+
/** Records the count of an operation. */
51+
void recordOperationCount(long count, Map<String, String> attributes);
52+
4153
/**
4254
* Returns a {@link MetricsRecorder} instance based on the provided OpenTelemetry options.
4355
*

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,19 @@
1616

1717
package com.google.cloud.datastore.telemetry;
1818

19+
import com.google.api.core.InternalApi;
1920
import java.util.Map;
2021

2122
/**
2223
* Metrics recorder implementation, used to stub out metrics instrumentation when metrics are
2324
* disabled.
25+
*
26+
* <p>WARNING: This class is intended for internal use only. It was made public to be used across
27+
* packages as a default. It should not be used by external customers and its API may change without
28+
* notice.
2429
*/
25-
class NoOpMetricsRecorder implements MetricsRecorder {
30+
@InternalApi
31+
public class NoOpMetricsRecorder implements MetricsRecorder {
2632

2733
@Override
2834
public void recordTransactionLatency(double latencyMs, Map<String, String> attributes) {
@@ -33,4 +39,24 @@ public void recordTransactionLatency(double latencyMs, Map<String, String> attri
3339
public void recordTransactionAttemptCount(long count, Map<String, String> attributes) {
3440
/* No-Op OTel Operation */
3541
}
42+
43+
@Override
44+
public void recordAttemptLatency(double latencyMs, Map<String, String> attributes) {
45+
/* No-Op OTel Operation */
46+
}
47+
48+
@Override
49+
public void recordAttemptCount(long count, Map<String, String> attributes) {
50+
/* No-Op OTel Operation */
51+
}
52+
53+
@Override
54+
public void recordOperationLatency(double latencyMs, Map<String, String> attributes) {
55+
/* No-Op OTel Operation */
56+
}
57+
58+
@Override
59+
public void recordOperationCount(long count, Map<String, String> attributes) {
60+
/* No-Op OTel Operation */
61+
}
3662
}

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder {
3333

3434
private final DoubleHistogram transactionLatency;
3535
private final LongCounter transactionAttemptCount;
36+
private final DoubleHistogram attemptLatency;
37+
private final LongCounter attemptCount;
38+
private final DoubleHistogram operationLatency;
39+
private final LongCounter operationCount;
3640

3741
OpenTelemetryMetricsRecorder(@Nonnull OpenTelemetry openTelemetry) {
3842
this.openTelemetry = openTelemetry;
@@ -51,6 +55,34 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder {
5155
.counterBuilder(TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT)
5256
.setDescription("Number of attempts to commit a transaction")
5357
.build();
58+
59+
this.attemptLatency =
60+
meter
61+
.histogramBuilder(TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY)
62+
.setDescription("Latency of a single RPC attempt")
63+
.setUnit("ms")
64+
.build();
65+
66+
this.attemptCount =
67+
meter
68+
.counterBuilder(TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT)
69+
.setDescription("Number of RPC attempts")
70+
.setUnit("1")
71+
.build();
72+
73+
this.operationLatency =
74+
meter
75+
.histogramBuilder(TelemetryConstants.METRIC_NAME_OPERATION_LATENCY)
76+
.setDescription("Total latency of an operation including retries")
77+
.setUnit("ms")
78+
.build();
79+
80+
this.operationCount =
81+
meter
82+
.counterBuilder(TelemetryConstants.METRIC_NAME_OPERATION_COUNT)
83+
.setDescription("Number of operations")
84+
.setUnit("1")
85+
.build();
5486
}
5587

5688
OpenTelemetry getOpenTelemetry() {
@@ -67,6 +99,26 @@ public void recordTransactionAttemptCount(long count, Map<String, String> attrib
6799
transactionAttemptCount.add(count, toOtelAttributes(attributes));
68100
}
69101

102+
@Override
103+
public void recordAttemptLatency(double latencyMs, Map<String, String> attributes) {
104+
attemptLatency.record(latencyMs, toOtelAttributes(attributes));
105+
}
106+
107+
@Override
108+
public void recordAttemptCount(long count, Map<String, String> attributes) {
109+
attemptCount.add(count, toOtelAttributes(attributes));
110+
}
111+
112+
@Override
113+
public void recordOperationLatency(double latencyMs, Map<String, String> attributes) {
114+
operationLatency.record(latencyMs, toOtelAttributes(attributes));
115+
}
116+
117+
@Override
118+
public void recordOperationCount(long count, Map<String, String> attributes) {
119+
operationCount.add(count, toOtelAttributes(attributes));
120+
}
121+
70122
private static Attributes toOtelAttributes(Map<String, String> attributes) {
71123
AttributesBuilder builder = Attributes.builder();
72124
if (attributes != null) {

0 commit comments

Comments
 (0)