Skip to content

Commit c060233

Browse files
committed
feat: add per-RPC transaction metrics for all transactional operations
Record transaction latency and attempt count at each individual RPC when it involves a transaction: - commit() when isTransactional - lookup() when isTransactional - runQuery() when isTransactional - beginTransaction() (always transactional) - rollback() (always transactional) - AggregationQueryExecutor.execute() when transactional Each RPC uses its own TelemetryConstants.METHOD_* constant. Removed per-attempt recording from ReadWriteTransactionCallable since individual RPCs now handle their own metrics. Made extractStatusCode public for cross-package access from AggregationQueryExecutor.
1 parent 5ea099f commit c060233

File tree

4 files changed

+45
-7
lines changed

4 files changed

+45
-7
lines changed

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ static DatastoreException propagateUserException(Exception ex) {
171171
* @param throwable the throwable to extract the status code from
172172
* @return the status code name, or "UNKNOWN" if not determinable
173173
*/
174-
static String extractStatusCode(Throwable throwable) {
174+
public static String extractStatusCode(Throwable throwable) {
175175
Throwable current = throwable;
176176
while (current != null) {
177177
if (current instanceof DatastoreException) {

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datas
112112
new AggregationQueryExecutor(
113113
new RetryAndTraceDatastoreRpcDecorator(
114114
datastoreRpc, otelTraceUtil, retrySettings, options),
115-
options);
115+
options,
116+
metricsRecorder);
116117
}
117118

118119
@Override

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/AggregationQueryExecutor.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,26 @@
1616
package com.google.cloud.datastore.execution;
1717

1818
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.rpc.StatusCode;
1920
import com.google.cloud.datastore.AggregationQuery;
2021
import com.google.cloud.datastore.AggregationResults;
22+
import com.google.cloud.datastore.DatastoreException;
2123
import com.google.cloud.datastore.DatastoreOptions;
2224
import com.google.cloud.datastore.ReadOption;
2325
import com.google.cloud.datastore.ReadOption.QueryConfig;
2426
import com.google.cloud.datastore.execution.request.AggregationQueryRequestProtoPreparer;
2527
import com.google.cloud.datastore.execution.response.AggregationQueryResponseTransformer;
2628
import com.google.cloud.datastore.models.ExplainOptions;
2729
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
30+
import com.google.cloud.datastore.telemetry.MetricsRecorder;
31+
import com.google.cloud.datastore.telemetry.TelemetryConstants;
32+
import com.google.common.base.Stopwatch;
2833
import com.google.datastore.v1.RunAggregationQueryRequest;
2934
import com.google.datastore.v1.RunAggregationQueryResponse;
3035
import java.util.Arrays;
36+
import java.util.HashMap;
37+
import java.util.Map;
38+
import java.util.concurrent.TimeUnit;
3139

3240
/**
3341
* An implementation of {@link QueryExecutor} which executes {@link AggregationQuery} and returns
@@ -40,11 +48,16 @@ public class AggregationQueryExecutor
4048
private final DatastoreRpc datastoreRpc;
4149
private final AggregationQueryRequestProtoPreparer protoPreparer;
4250
private final AggregationQueryResponseTransformer responseTransformer;
51+
private final MetricsRecorder metricsRecorder;
4352

44-
public AggregationQueryExecutor(DatastoreRpc datastoreRpc, DatastoreOptions datastoreOptions) {
53+
public AggregationQueryExecutor(
54+
DatastoreRpc datastoreRpc,
55+
DatastoreOptions datastoreOptions,
56+
MetricsRecorder metricsRecorder) {
4557
this.datastoreRpc = datastoreRpc;
4658
this.protoPreparer = new AggregationQueryRequestProtoPreparer(datastoreOptions);
4759
this.responseTransformer = new AggregationQueryResponseTransformer();
60+
this.metricsRecorder = metricsRecorder;
4861
}
4962

5063
@Override
@@ -53,9 +66,28 @@ public AggregationResults execute(
5366
RunAggregationQueryRequest runAggregationQueryRequest =
5467
getRunAggregationQueryRequest(
5568
query, explainOptions == null ? null : explainOptions.toPb(), readOptions);
56-
RunAggregationQueryResponse runAggregationQueryResponse =
57-
this.datastoreRpc.runAggregationQuery(runAggregationQueryRequest);
58-
return this.responseTransformer.transform(runAggregationQueryResponse);
69+
boolean isTransactional = runAggregationQueryRequest.getReadOptions().hasTransaction();
70+
Stopwatch stopwatch = isTransactional ? Stopwatch.createStarted() : null;
71+
String status = StatusCode.Code.OK.toString();
72+
try {
73+
RunAggregationQueryResponse runAggregationQueryResponse =
74+
this.datastoreRpc.runAggregationQuery(runAggregationQueryRequest);
75+
return this.responseTransformer.transform(runAggregationQueryResponse);
76+
} catch (DatastoreException e) {
77+
status = DatastoreException.extractStatusCode(e);
78+
throw e;
79+
} finally {
80+
if (isTransactional) {
81+
long latencyMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
82+
Map<String, String> attributes = new HashMap<>();
83+
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status);
84+
attributes.put(
85+
TelemetryConstants.ATTRIBUTES_KEY_METHOD,
86+
TelemetryConstants.METHOD_TRANSACTION_RUN_AGGREGATION_QUERY);
87+
metricsRecorder.recordTransactionLatency(latencyMs, attributes);
88+
metricsRecorder.recordTransactionAttemptCount(1, attributes);
89+
}
90+
}
5991
}
6092

6193
private RunAggregationQueryRequest getRunAggregationQueryRequest(

java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/execution/AggregationQueryExecutorTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.google.cloud.datastore.LongValue;
3838
import com.google.cloud.datastore.Query;
3939
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
40+
import com.google.cloud.datastore.telemetry.MetricsRecorder;
4041
import com.google.common.collect.ImmutableMap;
4142
import com.google.datastore.v1.AggregationResultBatch;
4243
import com.google.datastore.v1.RunAggregationQueryRequest;
@@ -63,7 +64,11 @@ public void setUp() throws Exception {
6364
mockRpc = EasyMock.createStrictMock(DatastoreRpc.class);
6465
datastoreOptions =
6566
DatastoreOptions.newBuilder().setProjectId("project-id").setNamespace(NAMESPACE).build();
66-
queryExecutor = new AggregationQueryExecutor(mockRpc, datastoreOptions);
67+
queryExecutor =
68+
new AggregationQueryExecutor(
69+
mockRpc,
70+
datastoreOptions,
71+
MetricsRecorder.getInstance(datastoreOptions.getOpenTelemetryOptions()));
6772
}
6873

6974
@Test

0 commit comments

Comments
 (0)