Skip to content

Commit 81e8e8e

Browse files
lqiu96claude
andcommitted
fix: improve transaction metrics correctness and coverage
- Fix ReadWriteTransactionCallable.call() to use transaction.isActive() as the sole rollback guard in both the catch and finally blocks, removing the redundant `committed` boolean flag. isActive() returns false after both a successful commit and a completed rollback, making it sufficient to prevent double-rollback or rollback-after-commit. Move recordAttempt() to the finally block so it always runs exactly once regardless of how the attempt exits. - Remove transaction metrics from AggregationQueryExecutor. Per-query metrics were recorded outside the runInTransaction() scope and used the misleading transaction_latency name for a single RPC latency. Metrics are now scoped exclusively to runInTransaction() flows. - Differentiate method attributes: transaction_latency uses Transaction.Run (overall operation) and transaction_attempt_count uses Transaction.Commit (per-attempt), reflecting their different scopes. - Add project_id and database_id attributes to both metrics for multi-project and multi-database observability. - Move Stopwatch start to just before RetryHelper.runWithRetries() to exclude callable construction overhead from latency measurements. - Fix transaction_latency description from "successful transaction operations" to "transaction operations" since the metric is recorded for both successful and failed transactions. - Add Javadoc note to extractStatusCode() about the null-reason edge case where UNKNOWN is returned. - Add latency-on-failure assertion to DatastoreImplMetricsTest and extend attribute checks to cover project_id and database_id. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent c060233 commit 81e8e8e

File tree

8 files changed

+57
-47
lines changed

8 files changed

+57
-47
lines changed

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,11 @@ static DatastoreException propagateUserException(Exception ex) {
168168
* com.google.api.gax.rpc.StatusCode.Code} which is transport-neutral, supporting both gRPC and
169169
* HttpJson. Falls back to "UNKNOWN" if the status cannot be determined.
170170
*
171+
* <p>Note: Some {@link DatastoreException} instances are constructed without a reason (e.g. via
172+
* {@link DatastoreException#DatastoreException(int, String, Throwable)}). If all {@link
173+
* DatastoreException} instances in the cause chain have a null or empty reason, this method
174+
* returns "UNKNOWN" even if the underlying error carries a meaningful status.
175+
*
171176
* @param throwable the throwable to extract the status code from
172177
* @return the status code name, or "UNKNOWN" if not determinable
173178
*/

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,7 @@ final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datas
112112
new AggregationQueryExecutor(
113113
new RetryAndTraceDatastoreRpcDecorator(
114114
datastoreRpc, otelTraceUtil, retrySettings, options),
115-
options,
116-
metricsRecorder);
115+
options);
117116
}
118117

119118
@Override
@@ -213,17 +212,28 @@ void setPrevTransactionId(ByteString transactionId) {
213212

214213
@Override
215214
public T call() throws DatastoreException {
215+
String attemptStatus = StatusCode.Code.UNKNOWN.toString();
216216
try {
217217
transaction = datastore.newTransaction(options);
218218
T value = callable.run(transaction);
219219
transaction.commit();
220-
recordAttempt(Status.Code.OK.toString());
220+
attemptStatus = Status.Code.OK.toString();
221221
return value;
222222
} catch (Exception ex) {
223-
transaction.rollback();
224-
recordAttempt(DatastoreException.extractStatusCode(ex));
223+
attemptStatus = DatastoreException.extractStatusCode(ex);
224+
// An exception here can originate from either callable.run() (before commit was attempted)
225+
// or from transaction.commit() itself. In both cases the transaction is still active.
226+
// isActive() returns false if the transaction was already committed or rolled back, so
227+
// it is safe to use as the sole guard here without tracking a separate committed flag.
228+
if (transaction.isActive()) {
229+
transaction.rollback();
230+
}
225231
throw DatastoreException.propagateUserException(ex);
226232
} finally {
233+
recordAttempt(attemptStatus);
234+
// transaction.isActive() returns false after both a successful commit or a completed
235+
// rollback, so it already guards against rolling back a committed transaction or
236+
// rolling back a transaction that has already been rolled back.
227237
if (transaction.isActive()) {
228238
transaction.rollback();
229239
}
@@ -243,6 +253,10 @@ private void recordAttempt(String status) {
243253
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status);
244254
attributes.put(
245255
TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT);
256+
attributes.put(
257+
TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastore.getOptions().getProjectId());
258+
attributes.put(
259+
TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastore.getOptions().getDatabaseId());
246260
metricsRecorder.recordTransactionAttemptCount(1, attributes);
247261
}
248262
}
@@ -256,7 +270,6 @@ public <T> T runInTransaction(final TransactionCallable<T> callable) {
256270
public <T> T runInTransaction(
257271
final TransactionCallable<T> callable, TransactionOptions transactionOptions) {
258272
TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_TRANSACTION_RUN);
259-
Stopwatch stopwatch = Stopwatch.createStarted();
260273

261274
ReadWriteTransactionCallable<T> baseCallable =
262275
new ReadWriteTransactionCallable<>(this, callable, transactionOptions, metricsRecorder);
@@ -267,6 +280,7 @@ public <T> T runInTransaction(
267280
}
268281

269282
String status = StatusCode.Code.OK.toString();
283+
Stopwatch stopwatch = Stopwatch.createStarted();
270284
try (Scope ignored = span.makeCurrent()) {
271285
return RetryHelper.runWithRetries(
272286
transactionCallable,
@@ -282,7 +296,9 @@ public <T> T runInTransaction(
282296
Map<String, String> attributes = new HashMap<>();
283297
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status);
284298
attributes.put(
285-
TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT);
299+
TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_RUN);
300+
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, getOptions().getProjectId());
301+
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, getOptions().getDatabaseId());
286302
metricsRecorder.recordTransactionLatency(latencyMs, attributes);
287303
span.end();
288304
}

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/AggregationQueryExecutor.java

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,18 @@
1616
package com.google.cloud.datastore.execution;
1717

1818
import com.google.api.core.InternalApi;
19-
import com.google.api.gax.rpc.StatusCode;
2019
import com.google.cloud.datastore.AggregationQuery;
2120
import com.google.cloud.datastore.AggregationResults;
22-
import com.google.cloud.datastore.DatastoreException;
2321
import com.google.cloud.datastore.DatastoreOptions;
2422
import com.google.cloud.datastore.ReadOption;
2523
import com.google.cloud.datastore.ReadOption.QueryConfig;
2624
import com.google.cloud.datastore.execution.request.AggregationQueryRequestProtoPreparer;
2725
import com.google.cloud.datastore.execution.response.AggregationQueryResponseTransformer;
2826
import com.google.cloud.datastore.models.ExplainOptions;
2927
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
30-
import com.google.cloud.datastore.telemetry.MetricsRecorder;
31-
import com.google.cloud.datastore.telemetry.TelemetryConstants;
32-
import com.google.common.base.Stopwatch;
3328
import com.google.datastore.v1.RunAggregationQueryRequest;
3429
import com.google.datastore.v1.RunAggregationQueryResponse;
3530
import java.util.Arrays;
36-
import java.util.HashMap;
37-
import java.util.Map;
38-
import java.util.concurrent.TimeUnit;
3931

4032
/**
4133
* An implementation of {@link QueryExecutor} which executes {@link AggregationQuery} and returns
@@ -48,16 +40,13 @@ public class AggregationQueryExecutor
4840
private final DatastoreRpc datastoreRpc;
4941
private final AggregationQueryRequestProtoPreparer protoPreparer;
5042
private final AggregationQueryResponseTransformer responseTransformer;
51-
private final MetricsRecorder metricsRecorder;
5243

5344
public AggregationQueryExecutor(
5445
DatastoreRpc datastoreRpc,
55-
DatastoreOptions datastoreOptions,
56-
MetricsRecorder metricsRecorder) {
46+
DatastoreOptions datastoreOptions) {
5747
this.datastoreRpc = datastoreRpc;
5848
this.protoPreparer = new AggregationQueryRequestProtoPreparer(datastoreOptions);
5949
this.responseTransformer = new AggregationQueryResponseTransformer();
60-
this.metricsRecorder = metricsRecorder;
6150
}
6251

6352
@Override
@@ -66,28 +55,9 @@ public AggregationResults execute(
6655
RunAggregationQueryRequest runAggregationQueryRequest =
6756
getRunAggregationQueryRequest(
6857
query, explainOptions == null ? null : explainOptions.toPb(), readOptions);
69-
boolean isTransactional = runAggregationQueryRequest.getReadOptions().hasTransaction();
70-
Stopwatch stopwatch = isTransactional ? Stopwatch.createStarted() : null;
71-
String status = StatusCode.Code.OK.toString();
72-
try {
73-
RunAggregationQueryResponse runAggregationQueryResponse =
74-
this.datastoreRpc.runAggregationQuery(runAggregationQueryRequest);
75-
return this.responseTransformer.transform(runAggregationQueryResponse);
76-
} catch (DatastoreException e) {
77-
status = DatastoreException.extractStatusCode(e);
78-
throw e;
79-
} finally {
80-
if (isTransactional) {
81-
long latencyMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
82-
Map<String, String> attributes = new HashMap<>();
83-
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status);
84-
attributes.put(
85-
TelemetryConstants.ATTRIBUTES_KEY_METHOD,
86-
TelemetryConstants.METHOD_TRANSACTION_RUN_AGGREGATION_QUERY);
87-
metricsRecorder.recordTransactionLatency(latencyMs, attributes);
88-
metricsRecorder.recordTransactionAttemptCount(1, attributes);
89-
}
90-
}
58+
RunAggregationQueryResponse runAggregationQueryResponse =
59+
this.datastoreRpc.runAggregationQuery(runAggregationQueryRequest);
60+
return this.responseTransformer.transform(runAggregationQueryResponse);
9161
}
9262

9363
private RunAggregationQueryRequest getRunAggregationQueryRequest(

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder {
4242
this.transactionLatency =
4343
meter
4444
.histogramBuilder(TelemetryConstants.SERVICE_NAME + "/transaction_latency")
45-
.setDescription("Total latency for successful transaction operations")
45+
.setDescription("Total latency of transaction operations")
4646
.setUnit("ms")
4747
.build();
4848

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ public class TelemetryConstants {
3939
/** Attribute key for the RPC method name (e.g. "Transaction.Run"). */
4040
public static final String ATTRIBUTES_KEY_METHOD = "method";
4141

42+
/** Attribute key for the GCP project ID. */
43+
public static final String ATTRIBUTES_KEY_PROJECT_ID = "project_id";
44+
45+
/** Attribute key for the Datastore database ID. */
46+
public static final String ATTRIBUTES_KEY_DATABASE_ID = "database_id";
47+
4248
/* TODO(lawrenceqiu): For now, these are a duplicate of method names in TraceUtil. Those will use these eventually */
4349
// Format is not SnakeCase to keep backward compatibility with the existing values TraceUtil spans
4450
public static final String METHOD_ALLOCATE_IDS = "AllocateIds";

java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,10 @@ public void runInTransaction_recordsLatencyOnSuccess() {
113113
assertThat(point.getSum()).isAtLeast(0.0);
114114
assertThat(point.getAttributes().get(AttributeKey.stringKey("status"))).isEqualTo("OK");
115115
assertThat(point.getAttributes().get(AttributeKey.stringKey("method")))
116-
.isEqualTo(TelemetryConstants.METHOD_TRANSACTION_COMMIT);
116+
.isEqualTo(TelemetryConstants.METHOD_TRANSACTION_RUN);
117+
assertThat(point.getAttributes().get(AttributeKey.stringKey("project_id")))
118+
.isEqualTo(PROJECT_ID);
119+
assertThat(point.getAttributes().get(AttributeKey.stringKey("database_id"))).isNotNull();
117120

118121
EasyMock.verify(rpcFactoryMock, rpcMock);
119122
}
@@ -279,6 +282,18 @@ public void runInTransaction_recordsGrpcStatusCodeOnFailure() {
279282
.orElse(null);
280283
assertThat(abortedPoint).isNotNull();
281284
assertThat(abortedPoint.getValue()).isAtLeast(1);
285+
286+
// Verify that latency was recorded with the failure status code
287+
Optional<MetricData> latencyMetric =
288+
localMetricReader.collectAllMetrics().stream()
289+
.filter(m -> m.getName().equals(LATENCY_METRIC_NAME))
290+
.findFirst();
291+
assertThat(latencyMetric.isPresent()).isTrue();
292+
HistogramPointData latencyPoint =
293+
latencyMetric.get().getHistogramData().getPoints().stream().findFirst().orElse(null);
294+
assertThat(latencyPoint).isNotNull();
295+
assertThat(latencyPoint.getAttributes().get(AttributeKey.stringKey("status")))
296+
.isEqualTo("ABORTED");
282297
}
283298

284299
@Test

java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/execution/AggregationQueryExecutorTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import com.google.cloud.datastore.LongValue;
3838
import com.google.cloud.datastore.Query;
3939
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
40-
import com.google.cloud.datastore.telemetry.MetricsRecorder;
4140
import com.google.common.collect.ImmutableMap;
4241
import com.google.datastore.v1.AggregationResultBatch;
4342
import com.google.datastore.v1.RunAggregationQueryRequest;
@@ -67,8 +66,7 @@ public void setUp() throws Exception {
6766
queryExecutor =
6867
new AggregationQueryExecutor(
6968
mockRpc,
70-
datastoreOptions,
71-
MetricsRecorder.getInstance(datastoreOptions.getOpenTelemetryOptions()));
69+
datastoreOptions);
7270
}
7371

7472
@Test

java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void recordTransactionLatency_recordsHistogramWithAttributes() {
6969

7070
assertThat(latencyMetric).isNotNull();
7171
assertThat(latencyMetric.getDescription())
72-
.isEqualTo("Total latency for successful transaction operations");
72+
.isEqualTo("Total latency of transaction operations");
7373
assertThat(latencyMetric.getUnit()).isEqualTo("ms");
7474

7575
HistogramPointData point =

0 commit comments

Comments
 (0)