Skip to content

Commit 25af60d

Browse files
committed
chore(datastore): Rename internal metrics recorder hierarchy and fix gRPC transport coverage
Replace the old MetricsRecorder / OpenTelemetryMetricsRecorder / NoOpMetricsRecorder types with the new DatastoreMetricsRecorder family, which extends GAX's MetricsRecorder interface for a unified recording contract. Key changes: - Delete MetricsRecorder.java, OpenTelemetryMetricsRecorder.java, NoOpMetricsRecorder.java and their tests - Add DatastoreMetricsRecorder interface (with simple getInstance() that returns an OTel recorder when metrics are enabled, NoOp otherwise) - Add NoOpDatastoreMetricsRecorder, OpenTelemetryDatastoreMetricsRecorder, and DatastoreMetricsRecorderTest - Remove the !GRPC transport guard from TelemetryUtils.recordOperationMetrics() and attemptMetricsCallable() so all transports record metrics uniformly - Remove the isHttpTransport field from RetryAndTraceDatastoreRpcDecorator and DatastoreImpl; remove buildMetricsTracerFactory() from GrpcDatastoreRpc - Update TelemetryConstants with the new METRIC_PREFIX, DATASTORE_METER_NAME, and typed AttributeKey constants needed by the new recorder classes - Update DatastoreOptions to pass the full DatastoreOptions to getInstance() so the recorder factory can inspect credentials and project at creation time
1 parent 03aabb2 commit 25af60d

14 files changed

+362
-381
lines changed

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,11 @@
4848
import com.google.cloud.TransportOptions;
4949
import com.google.cloud.datastore.execution.AggregationQueryExecutor;
5050
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
51-
import com.google.cloud.datastore.telemetry.MetricsRecorder;
51+
import com.google.cloud.datastore.telemetry.DatastoreMetricsRecorder;
5252
import com.google.cloud.datastore.telemetry.TelemetryConstants;
5353
import com.google.cloud.datastore.telemetry.TelemetryUtils;
5454
import com.google.cloud.datastore.telemetry.TraceUtil;
5555
import com.google.cloud.datastore.telemetry.TraceUtil.Scope;
56-
import com.google.cloud.http.HttpTransportOptions;
5756
import com.google.common.base.MoreObjects;
5857
import com.google.common.base.Preconditions;
5958
import com.google.common.base.Stopwatch;
@@ -101,16 +100,14 @@ final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datas
101100

102101
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil =
103102
getOptions().getTraceUtil();
104-
private final MetricsRecorder metricsRecorder = getOptions().getMetricsRecorder();
105-
private final boolean isHttpTransport;
106-
103+
private final DatastoreMetricsRecorder datastoreMetricsRecorder =
104+
getOptions().getMetricsRecorder();
107105
private final ReadOptionProtoPreparer readOptionProtoPreparer;
108106
private final AggregationQueryExecutor aggregationQueryExecutor;
109107

110108
DatastoreImpl(DatastoreOptions options) {
111109
super(options);
112110
this.datastoreRpc = options.getDatastoreRpcV1();
113-
this.isHttpTransport = options.getTransportOptions() instanceof HttpTransportOptions;
114111
retrySettings =
115112
MoreObjects.firstNonNull(options.getRetrySettings(), ServiceOptions.getNoRetrySettings());
116113

@@ -122,7 +119,7 @@ final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datas
122119
.setTraceUtil(otelTraceUtil)
123120
.setRetrySettings(retrySettings)
124121
.setDatastoreOptions(options)
125-
.setMetricsRecorder(metricsRecorder)
122+
.setMetricsRecorder(datastoreMetricsRecorder)
126123
.build(),
127124
options);
128125
}
@@ -185,19 +182,19 @@ public boolean isClosed() {
185182
static class ReadWriteTransactionCallable<T> implements Callable<T> {
186183
private final Datastore datastore;
187184
private final TransactionCallable<T> callable;
188-
private final MetricsRecorder metricsRecorder;
185+
private final DatastoreMetricsRecorder datastoreMetricsRecorder;
189186
private volatile TransactionOptions options;
190187
private volatile Transaction transaction;
191188

192189
ReadWriteTransactionCallable(
193190
Datastore datastore,
194191
TransactionCallable<T> callable,
195192
TransactionOptions options,
196-
MetricsRecorder metricsRecorder) {
193+
DatastoreMetricsRecorder datastoreMetricsRecorder) {
197194
this.datastore = datastore;
198195
this.callable = callable;
199196
this.options = options;
200-
this.metricsRecorder = metricsRecorder;
197+
this.datastoreMetricsRecorder = datastoreMetricsRecorder;
201198
this.transaction = null;
202199
}
203200

@@ -222,7 +219,7 @@ public T call() throws DatastoreException {
222219
// or from `transaction.commit()`. If there is an exception thrown from either call site,
223220
// then the transaction is still active. Check if it is still active (e.g. not commited)
224221
// and roll back the transaction.
225-
if (transaction.isActive()) {
222+
if (transaction != null && transaction.isActive()) {
226223
transaction.rollback();
227224
}
228225
throw DatastoreException.propagateUserException(ex);
@@ -231,10 +228,11 @@ public T call() throws DatastoreException {
231228
// If the transaction is active, then commit the rollback. If it was already successfully
232229
// rolled back, the transaction is inactive (prevents rolling back an already rolled back
233230
// transaction).
234-
if (transaction.isActive()) {
231+
if (transaction != null && transaction.isActive()) {
235232
transaction.rollback();
236233
}
237-
if (options != null
234+
if (transaction != null
235+
&& options != null
238236
&& options.getModeCase().equals(TransactionOptions.ModeCase.READ_WRITE)) {
239237
setPrevTransactionId(transaction.getTransactionId());
240238
}
@@ -257,7 +255,7 @@ private void recordAttempt(String status, TransportOptions transportOptions) {
257255
attributes.put(
258256
TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT,
259257
TelemetryConstants.getTransportName(transportOptions));
260-
metricsRecorder.recordTransactionAttemptCount(1, attributes);
258+
datastoreMetricsRecorder.recordTransactionAttemptCount(1, attributes);
261259
}
262260
}
263261

@@ -272,7 +270,8 @@ public <T> T runInTransaction(
272270
TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_TRANSACTION_RUN);
273271

274272
ReadWriteTransactionCallable<T> baseCallable =
275-
new ReadWriteTransactionCallable<>(this, callable, transactionOptions, metricsRecorder);
273+
new ReadWriteTransactionCallable<>(
274+
this, callable, transactionOptions, datastoreMetricsRecorder);
276275

277276
Callable<T> transactionCallable = baseCallable;
278277
if (getOptions().getOpenTelemetryOptions().isTracingEnabled()) {
@@ -302,7 +301,7 @@ public <T> T runInTransaction(
302301
attributes.put(
303302
TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT,
304303
TelemetryConstants.getTransportName(getOptions().getTransportOptions()));
305-
metricsRecorder.recordTransactionLatency(latencyMs, attributes);
304+
datastoreMetricsRecorder.recordTransactionLatency(latencyMs, attributes);
306305
span.end();
307306
}
308307
}
@@ -805,15 +804,13 @@ private <T> T runWithObservability(
805804
ResultRetryAlgorithm<?> exceptionHandler) {
806805
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName);
807806

808-
// Gax already records operation and attempt metrics. Since Datastore HttpJson does not
809-
// integrate with Gax, manually instrument these metrics when using HttpJson for parity
810-
Stopwatch operationStopwatch = isHttpTransport ? Stopwatch.createStarted() : null;
807+
Stopwatch operationStopwatch = Stopwatch.createStarted();
811808
String operationStatus = StatusCode.Code.OK.toString();
812809

813810
DatastoreOptions options = getOptions();
814811
Callable<T> attemptCallable =
815812
TelemetryUtils.attemptMetricsCallable(
816-
callable, metricsRecorder, options, isHttpTransport, methodName);
813+
callable, datastoreMetricsRecorder, options, methodName);
817814
try (TraceUtil.Scope ignored = span.makeCurrent()) {
818815
return RetryHelper.runWithRetries(
819816
attemptCallable, retrySettings, exceptionHandler, options.getClock());
@@ -823,12 +820,7 @@ private <T> T runWithObservability(
823820
throw DatastoreException.translateAndThrow(e);
824821
} finally {
825822
TelemetryUtils.recordOperationMetrics(
826-
metricsRecorder,
827-
options,
828-
isHttpTransport,
829-
operationStopwatch,
830-
methodName,
831-
operationStatus);
823+
datastoreMetricsRecorder, options, operationStopwatch, methodName, operationStatus);
832824
span.end();
833825
}
834826
}

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
3232
import com.google.cloud.datastore.spi.v1.GrpcDatastoreRpc;
3333
import com.google.cloud.datastore.spi.v1.HttpDatastoreRpc;
34-
import com.google.cloud.datastore.telemetry.MetricsRecorder;
34+
import com.google.cloud.datastore.telemetry.DatastoreMetricsRecorder;
3535
import com.google.cloud.datastore.v1.DatastoreSettings;
3636
import com.google.cloud.grpc.GrpcTransportOptions;
3737
import com.google.cloud.http.HttpTransportOptions;
@@ -65,7 +65,7 @@ public class DatastoreOptions extends ServiceOptions<Datastore, DatastoreOptions
6565

6666
private final transient @Nonnull DatastoreOpenTelemetryOptions openTelemetryOptions;
6767
private final transient @Nonnull com.google.cloud.datastore.telemetry.TraceUtil traceUtil;
68-
private final transient @Nonnull MetricsRecorder metricsRecorder;
68+
private final transient @Nonnull DatastoreMetricsRecorder datastoreMetricsRecorder;
6969

7070
public static class DefaultDatastoreFactory implements DatastoreFactory {
7171

@@ -107,8 +107,8 @@ public DatastoreOpenTelemetryOptions getOpenTelemetryOptions() {
107107
}
108108

109109
@Nonnull
110-
MetricsRecorder getMetricsRecorder() {
111-
return metricsRecorder;
110+
DatastoreMetricsRecorder getMetricsRecorder() {
111+
return datastoreMetricsRecorder;
112112
}
113113

114114
public static class Builder extends ServiceOptions.Builder<Datastore, DatastoreOptions, Builder> {
@@ -193,7 +193,7 @@ public Builder setDatabaseId(String databaseId) {
193193
}
194194

195195
/**
196-
* Sets the {@link DatastoreOpenTelemetryOptions} to be used for this Firestore instance.
196+
* Sets the {@link DatastoreOpenTelemetryOptions} to be used for this Datastore instance.
197197
*
198198
* @param openTelemetryOptions The `DatastoreOpenTelemetryOptions` to use.
199199
*/
@@ -223,7 +223,7 @@ private DatastoreOptions(Builder builder) {
223223
? builder.openTelemetryOptions
224224
: DatastoreOpenTelemetryOptions.newBuilder().build();
225225
this.traceUtil = com.google.cloud.datastore.telemetry.TraceUtil.getInstance(this);
226-
this.metricsRecorder = MetricsRecorder.getInstance(openTelemetryOptions);
226+
this.datastoreMetricsRecorder = DatastoreMetricsRecorder.getInstance(this);
227227

228228
namespace = MoreObjects.firstNonNull(builder.namespace, defaultNamespace());
229229
databaseId = MoreObjects.firstNonNull(builder.databaseId, DEFAULT_DATABASE_ID);

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,11 @@
2424
import com.google.cloud.RetryHelper;
2525
import com.google.cloud.RetryHelper.RetryHelperException;
2626
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
27-
import com.google.cloud.datastore.telemetry.MetricsRecorder;
28-
import com.google.cloud.datastore.telemetry.NoOpMetricsRecorder;
27+
import com.google.cloud.datastore.telemetry.DatastoreMetricsRecorder;
28+
import com.google.cloud.datastore.telemetry.NoOpDatastoreMetricsRecorder;
2929
import com.google.cloud.datastore.telemetry.TelemetryConstants;
3030
import com.google.cloud.datastore.telemetry.TelemetryUtils;
3131
import com.google.cloud.datastore.telemetry.TraceUtil;
32-
import com.google.cloud.http.HttpTransportOptions;
3332
import com.google.common.base.Preconditions;
3433
import com.google.common.base.Stopwatch;
3534
import com.google.datastore.v1.AllocateIdsRequest;
@@ -62,8 +61,7 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc {
6261
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil;
6362
private final RetrySettings retrySettings;
6463
private final DatastoreOptions datastoreOptions;
65-
private final MetricsRecorder metricsRecorder;
66-
private final boolean isHttpTransport;
64+
private final DatastoreMetricsRecorder datastoreMetricsRecorder;
6765

6866
@ObsoleteApi("Prefer to create RetryAndTraceDatastoreRpcDecorator via the Builder")
6967
public RetryAndTraceDatastoreRpcDecorator(
@@ -75,17 +73,15 @@ public RetryAndTraceDatastoreRpcDecorator(
7573
this.retrySettings = retrySettings;
7674
this.datastoreOptions = datastoreOptions;
7775
this.otelTraceUtil = otelTraceUtil;
78-
this.metricsRecorder = new NoOpMetricsRecorder();
79-
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
76+
this.datastoreMetricsRecorder = new NoOpDatastoreMetricsRecorder();
8077
}
8178

8279
private RetryAndTraceDatastoreRpcDecorator(Builder builder) {
8380
this.datastoreRpc = builder.datastoreRpc;
8481
this.otelTraceUtil = builder.otelTraceUtil;
8582
this.retrySettings = builder.retrySettings;
8683
this.datastoreOptions = builder.datastoreOptions;
87-
this.metricsRecorder = builder.metricsRecorder;
88-
this.isHttpTransport = builder.isHttpTransport;
84+
this.datastoreMetricsRecorder = builder.datastoreMetricsRecorder;
8985
}
9086

9187
public static Builder newBuilder() {
@@ -99,8 +95,7 @@ public static class Builder {
9995
private DatastoreOptions datastoreOptions;
10096

10197
// Defaults configured for this class
102-
private MetricsRecorder metricsRecorder = new NoOpMetricsRecorder();
103-
private boolean isHttpTransport = false;
98+
private DatastoreMetricsRecorder datastoreMetricsRecorder = new NoOpDatastoreMetricsRecorder();
10499

105100
private Builder() {}
106101

@@ -124,9 +119,10 @@ public Builder setDatastoreOptions(DatastoreOptions datastoreOptions) {
124119
return this;
125120
}
126121

127-
public Builder setMetricsRecorder(MetricsRecorder metricsRecorder) {
128-
Preconditions.checkNotNull(metricsRecorder, "metricsRecorder can not be null");
129-
this.metricsRecorder = metricsRecorder;
122+
public Builder setMetricsRecorder(DatastoreMetricsRecorder datastoreMetricsRecorder) {
123+
Preconditions.checkNotNull(
124+
datastoreMetricsRecorder, "datastoreMetricsRecorder can not be null");
125+
this.datastoreMetricsRecorder = datastoreMetricsRecorder;
130126
return this;
131127
}
132128

@@ -135,7 +131,6 @@ public RetryAndTraceDatastoreRpcDecorator build() {
135131
Preconditions.checkNotNull(otelTraceUtil, "otelTraceUtil is required");
136132
Preconditions.checkNotNull(retrySettings, "retrySettings is required");
137133
Preconditions.checkNotNull(datastoreOptions, "datastoreOptions is required");
138-
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
139134
return new RetryAndTraceDatastoreRpcDecorator(this);
140135
}
141136
}
@@ -207,12 +202,12 @@ public <O> O invokeRpc(Callable<O> block, String startSpan) {
207202

208203
<O> O invokeRpc(Callable<O> block, String startSpan, String methodName) {
209204
TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
210-
Stopwatch stopwatch = isHttpTransport ? Stopwatch.createStarted() : null;
205+
Stopwatch stopwatch = Stopwatch.createStarted();
211206
String operationStatus = StatusCode.Code.UNKNOWN.toString();
212207
try (TraceUtil.Scope ignored = span.makeCurrent()) {
213208
Callable<O> callable =
214209
TelemetryUtils.attemptMetricsCallable(
215-
block, metricsRecorder, datastoreOptions, isHttpTransport, methodName);
210+
block, datastoreMetricsRecorder, datastoreOptions, methodName);
216211
O result =
217212
RetryHelper.runWithRetries(
218213
callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
@@ -224,12 +219,7 @@ <O> O invokeRpc(Callable<O> block, String startSpan, String methodName) {
224219
throw DatastoreException.translateAndThrow(e);
225220
} finally {
226221
TelemetryUtils.recordOperationMetrics(
227-
metricsRecorder,
228-
datastoreOptions,
229-
isHttpTransport,
230-
stopwatch,
231-
methodName,
232-
operationStatus);
222+
datastoreMetricsRecorder, datastoreOptions, stopwatch, methodName, operationStatus);
233223
span.end();
234224
}
235225
}

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/GrpcDatastoreRpc.java

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,9 @@
3131
import com.google.api.gax.rpc.HeaderProvider;
3232
import com.google.api.gax.rpc.NoHeaderProvider;
3333
import com.google.api.gax.rpc.TransportChannel;
34-
import com.google.api.gax.tracing.MetricsTracerFactory;
35-
import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder;
3634
import com.google.cloud.ServiceOptions;
3735
import com.google.cloud.datastore.DatastoreException;
3836
import com.google.cloud.datastore.DatastoreOptions;
39-
import com.google.cloud.datastore.telemetry.TelemetryConstants;
4037
import com.google.cloud.datastore.v1.DatastoreSettings;
4138
import com.google.cloud.datastore.v1.stub.DatastoreStubSettings;
4239
import com.google.cloud.datastore.v1.stub.GrpcDatastoreStub;
@@ -61,12 +58,8 @@
6158
import io.grpc.CallOptions;
6259
import io.grpc.ManagedChannel;
6360
import io.grpc.ManagedChannelBuilder;
64-
import io.opentelemetry.api.GlobalOpenTelemetry;
65-
import io.opentelemetry.api.OpenTelemetry;
6661
import java.io.IOException;
6762
import java.util.Collections;
68-
import java.util.HashMap;
69-
import java.util.Map;
7063

7164
@InternalApi
7265
public class GrpcDatastoreRpc implements DatastoreRpc {
@@ -95,44 +88,12 @@ public GrpcDatastoreRpc(DatastoreOptions datastoreOptions) throws IOException {
9588
.build())
9689
.build());
9790

98-
// Hook into Gax's Metrics collection framework
99-
MetricsTracerFactory metricsTracerFactory = buildMetricsTracerFactory(datastoreOptions);
100-
if (metricsTracerFactory != null) {
101-
builder.setTracerFactory(metricsTracerFactory);
102-
}
103-
10491
datastoreStub = GrpcDatastoreStub.create(builder.build());
10592
} catch (IOException e) {
10693
throw new IOException(e);
10794
}
10895
}
10996

110-
/**
111-
* Build the MetricsTracerFactory to hook into Gax's Otel Framework. Only hooks into Gax on two
112-
* conditions: 1. OpenTelemetry instance is passed in by the user 2. Metrics are enabled
113-
*
114-
* <p>Sets default attributes to be recorded as part of the metrics.
115-
*/
116-
static MetricsTracerFactory buildMetricsTracerFactory(DatastoreOptions datastoreOptions) {
117-
if (!datastoreOptions.getOpenTelemetryOptions().isMetricsEnabled()) {
118-
return null;
119-
}
120-
OpenTelemetry openTelemetry = datastoreOptions.getOpenTelemetryOptions().getOpenTelemetry();
121-
if (openTelemetry == null) {
122-
openTelemetry = GlobalOpenTelemetry.get();
123-
}
124-
OpenTelemetryMetricsRecorder gaxMetricsRecorder =
125-
new OpenTelemetryMetricsRecorder(openTelemetry, TelemetryConstants.SERVICE_NAME);
126-
Map<String, String> attributes = new HashMap<>();
127-
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastoreOptions.getProjectId());
128-
if (!Strings.isNullOrEmpty(datastoreOptions.getDatabaseId())) {
129-
attributes.put(
130-
TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastoreOptions.getDatabaseId());
131-
}
132-
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, "grpc");
133-
return new MetricsTracerFactory(gaxMetricsRecorder, attributes);
134-
}
135-
13697
@Override
13798
public void close() throws Exception {
13899
if (!closed) {

0 commit comments

Comments
 (0)