4545import com .google .cloud .ServiceOptions ;
4646import com .google .cloud .datastore .execution .AggregationQueryExecutor ;
4747import com .google .cloud .datastore .spi .v1 .DatastoreRpc ;
48+ import com .google .cloud .datastore .telemetry .MetricsRecorder ;
49+ import com .google .cloud .datastore .telemetry .TelemetryConstants ;
4850import com .google .cloud .datastore .telemetry .TraceUtil ;
4951import com .google .cloud .datastore .telemetry .TraceUtil .Scope ;
5052import com .google .common .base .MoreObjects ;
5153import com .google .common .base .Preconditions ;
54+ import com .google .common .base .Stopwatch ;
5255import com .google .common .collect .AbstractIterator ;
5356import com .google .common .collect .ImmutableList ;
5457import com .google .common .collect .ImmutableMap ;
6568import java .util .ArrayList ;
6669import java .util .Arrays ;
6770import java .util .Collections ;
71+ import java .util .HashMap ;
6872import java .util .Iterator ;
6973import java .util .LinkedHashMap ;
7074import java .util .LinkedHashSet ;
7377import java .util .Optional ;
7478import java .util .Set ;
7579import java .util .concurrent .Callable ;
80+ import java .util .concurrent .TimeUnit ;
7681import java .util .logging .Level ;
7782import java .util .logging .Logger ;
7883import javax .annotation .Nullable ;
@@ -89,6 +94,7 @@ final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datas
8994
9095 private final com .google .cloud .datastore .telemetry .TraceUtil otelTraceUtil =
9196 getOptions ().getTraceUtil ();
97+ private final MetricsRecorder metricsRecorder = getOptions ().getMetricsRecorder ();
9298
9399 private final ReadOptionProtoPreparer readOptionProtoPreparer ;
94100 private final AggregationQueryExecutor aggregationQueryExecutor ;
@@ -122,63 +128,31 @@ public Transaction newTransaction() {
122128 return new TransactionImpl (this );
123129 }
124130
131+ /**
132+ * A wrapper around {@link ReadWriteTransactionCallable} that adds OpenTelemetry tracing context
133+ * propagation. All transaction logic (begin, run, commit, rollback, metrics recording) is
134+ * delegated to the underlying {@link ReadWriteTransactionCallable}.
135+ */
125136 static class TracedReadWriteTransactionCallable <T > implements Callable <T > {
126- private final Datastore datastore ;
127- private final TransactionCallable <T > callable ;
128- private volatile TransactionOptions options ;
129- private volatile Transaction transaction ;
130-
137+ private final ReadWriteTransactionCallable <T > delegate ;
131138 private final TraceUtil .Span parentSpan ;
132139
133140 TracedReadWriteTransactionCallable (
134- Datastore datastore ,
135- TransactionCallable <T > callable ,
136- TransactionOptions options ,
141+ ReadWriteTransactionCallable <T > delegate ,
137142 @ Nullable com .google .cloud .datastore .telemetry .TraceUtil .Span parentSpan ) {
138- this .datastore = datastore ;
139- this .callable = callable ;
140- this .options = options ;
141- this .transaction = null ;
143+ this .delegate = delegate ;
142144 this .parentSpan = parentSpan ;
143145 }
144146
145- Datastore getDatastore () {
146- return datastore ;
147- }
148-
149- TransactionOptions getOptions () {
150- return options ;
151- }
152-
153- Transaction getTransaction () {
154- return transaction ;
155- }
156-
157- void setPrevTransactionId (ByteString transactionId ) {
158- TransactionOptions .ReadWrite readWrite =
159- TransactionOptions .ReadWrite .newBuilder ().setPreviousTransaction (transactionId ).build ();
160- options = options .toBuilder ().setReadWrite (readWrite ).build ();
147+ ReadWriteTransactionCallable <T > getDelegate () {
148+ return delegate ;
161149 }
162150
163151 @ Override
164152 public T call () throws DatastoreException {
165153 try (io .opentelemetry .context .Scope ignored =
166154 Context .current ().with (parentSpan .getSpan ()).makeCurrent ()) {
167- transaction = datastore .newTransaction (options );
168- T value = callable .run (transaction );
169- transaction .commit ();
170- return value ;
171- } catch (Exception ex ) {
172- transaction .rollback ();
173- throw DatastoreException .propagateUserException (ex );
174- } finally {
175- if (transaction .isActive ()) {
176- transaction .rollback ();
177- }
178- if (options != null
179- && options .getModeCase ().equals (TransactionOptions .ModeCase .READ_WRITE )) {
180- setPrevTransactionId (transaction .getTransactionId ());
181- }
155+ return delegate .call ();
182156 }
183157 }
184158 }
@@ -200,14 +174,19 @@ public boolean isClosed() {
200174 static class ReadWriteTransactionCallable <T > implements Callable <T > {
201175 private final Datastore datastore ;
202176 private final TransactionCallable <T > callable ;
177+ private final MetricsRecorder metricsRecorder ;
203178 private volatile TransactionOptions options ;
204179 private volatile Transaction transaction ;
205180
206181 ReadWriteTransactionCallable (
207- Datastore datastore , TransactionCallable <T > callable , TransactionOptions options ) {
182+ Datastore datastore ,
183+ TransactionCallable <T > callable ,
184+ TransactionOptions options ,
185+ MetricsRecorder metricsRecorder ) {
208186 this .datastore = datastore ;
209187 this .callable = callable ;
210188 this .options = options ;
189+ this .metricsRecorder = metricsRecorder ;
211190 this .transaction = null ;
212191 }
213192
@@ -235,9 +214,11 @@ public T call() throws DatastoreException {
235214 transaction = datastore .newTransaction (options );
236215 T value = callable .run (transaction );
237216 transaction .commit ();
217+ recordAttempt ("OK" );
238218 return value ;
239219 } catch (Exception ex ) {
240220 transaction .rollback ();
221+ recordAttempt (extractStatus (ex ));
241222 throw DatastoreException .propagateUserException (ex );
242223 } finally {
243224 if (transaction .isActive ()) {
@@ -249,54 +230,92 @@ public T call() throws DatastoreException {
249230 }
250231 }
251232 }
233+
234+ /**
235+ * Records a single transaction commit attempt with the given status code. This is called once
236+ * per invocation of {@link #call()}, capturing the outcome of each individual commit attempt.
237+ */
238+ private void recordAttempt (String status ) {
239+ Map <String , String > attributes = new HashMap <>();
240+ attributes .put (TelemetryConstants .ATTRIBUTES_KEY_STATUS , status );
241+ attributes .put (
242+ TelemetryConstants .ATTRIBUTES_KEY_METHOD_NAME ,
243+ TelemetryConstants .METHOD_TRANSACTION_RUN );
244+ metricsRecorder .recordTransactionAttemptCount (1 , attributes );
245+ }
246+
247+ /**
248+ * Extracts the gRPC status code from the given exception. Falls back to "UNKNOWN" if the
249+ * status cannot be determined.
250+ */
251+ private static String extractStatus (Exception ex ) {
252+ if (ex instanceof DatastoreException ) {
253+ String reason = ((DatastoreException ) ex ).getReason ();
254+ if (reason != null && !reason .isEmpty ()) {
255+ return reason ;
256+ }
257+ }
258+ return "UNKNOWN" ;
259+ }
252260 }
253261
254262 @ Override
255263 public <T > T runInTransaction (final TransactionCallable <T > callable ) {
256- TraceUtil .Span span = otelTraceUtil .startSpan (SPAN_NAME_TRANSACTION_RUN );
257- Callable <T > transactionCallable =
258- (getOptions ().getOpenTelemetryOptions ().isTracingEnabled ()
259- ? new TracedReadWriteTransactionCallable <T >(
260- this , callable , /* transactionOptions= */ null , span )
261- : new ReadWriteTransactionCallable <T >(this , callable , /* transactionOptions= */ null ));
262- try (Scope ignored = span .makeCurrent ()) {
263- return RetryHelper .runWithRetries (
264- transactionCallable ,
265- retrySettings ,
266- TRANSACTION_EXCEPTION_HANDLER ,
267- getOptions ().getClock ());
268- } catch (RetryHelperException e ) {
269- span .end (e );
270- throw DatastoreException .translateAndThrow (e );
271- } finally {
272- span .end ();
273- }
264+ return runInTransaction (callable , /* transactionOptions= */ null );
274265 }
275266
276267 @ Override
277268 public <T > T runInTransaction (
278269 final TransactionCallable <T > callable , TransactionOptions transactionOptions ) {
279270 TraceUtil .Span span = otelTraceUtil .startSpan (SPAN_NAME_TRANSACTION_RUN );
271+ Stopwatch stopwatch = Stopwatch .createStarted ();
280272
273+ ReadWriteTransactionCallable <T > baseCallable =
274+ new ReadWriteTransactionCallable <>(this , callable , transactionOptions , metricsRecorder );
281275 Callable <T > transactionCallable =
282- ( getOptions ().getOpenTelemetryOptions ().isTracingEnabled ()
283- ? new TracedReadWriteTransactionCallable <T >( this , callable , transactionOptions , span )
284- : new ReadWriteTransactionCallable < T >( this , callable , transactionOptions )) ;
276+ getOptions ().getOpenTelemetryOptions ().isTracingEnabled ()
277+ ? new TracedReadWriteTransactionCallable <>( baseCallable , span )
278+ : baseCallable ;
285279
280+ String status = "OK" ;
286281 try (Scope ignored = span .makeCurrent ()) {
287282 return RetryHelper .runWithRetries (
288283 transactionCallable ,
289284 retrySettings ,
290285 TRANSACTION_EXCEPTION_HANDLER ,
291286 getOptions ().getClock ());
292287 } catch (RetryHelperException e ) {
288+ status = extractGrpcStatusCode (e );
293289 span .end (e );
294290 throw DatastoreException .translateAndThrow (e );
295291 } finally {
292+ long latencyMs = stopwatch .elapsed (TimeUnit .MILLISECONDS );
293+ Map <String , String > attributes = new HashMap <>();
294+ attributes .put (TelemetryConstants .ATTRIBUTES_KEY_STATUS , status );
295+ attributes .put (
296+ TelemetryConstants .ATTRIBUTES_KEY_METHOD_NAME ,
297+ TelemetryConstants .METHOD_TRANSACTION_RUN );
298+ metricsRecorder .recordTransactionLatency (latencyMs , attributes );
296299 span .end ();
297300 }
298301 }
299302
303+ /**
304+ * Extracts the gRPC status code from a {@link RetryHelperException}. The underlying cause is
305+ * expected to be a {@link DatastoreException} which carries the gRPC reason. Falls back to
306+ * "UNKNOWN" if the status cannot be determined.
307+ */
308+ private static String extractGrpcStatusCode (RetryHelperException e ) {
309+ Throwable cause = e .getCause ();
310+ if (cause instanceof DatastoreException ) {
311+ String reason = ((DatastoreException ) cause ).getReason ();
312+ if (reason != null && !reason .isEmpty ()) {
313+ return reason ;
314+ }
315+ }
316+ return "UNKNOWN" ;
317+ }
318+
300319 @ Override
301320 public <T > QueryResults <T > run (Query <T > query ) {
302321 return run (Optional .empty (), query , null );
0 commit comments