3838
3939import com .google .api .core .BetaApi ;
4040import com .google .api .gax .retrying .RetrySettings ;
41+ import com .google .api .gax .rpc .StatusCode ;
4142import com .google .cloud .BaseService ;
4243import com .google .cloud .ExceptionHandler ;
4344import com .google .cloud .RetryHelper ;
4445import com .google .cloud .RetryHelper .RetryHelperException ;
4546import com .google .cloud .ServiceOptions ;
4647import com .google .cloud .datastore .execution .AggregationQueryExecutor ;
4748import com .google .cloud .datastore .spi .v1 .DatastoreRpc ;
49+ import com .google .cloud .datastore .telemetry .MetricsRecorder ;
50+ import com .google .cloud .datastore .telemetry .TelemetryConstants ;
4851import com .google .cloud .datastore .telemetry .TraceUtil ;
4952import com .google .cloud .datastore .telemetry .TraceUtil .Scope ;
5053import com .google .common .base .MoreObjects ;
5154import com .google .common .base .Preconditions ;
55+ import com .google .common .base .Stopwatch ;
5256import com .google .common .collect .AbstractIterator ;
5357import com .google .common .collect .ImmutableList ;
5458import com .google .common .collect .ImmutableMap ;
6165import com .google .datastore .v1 .RunQueryResponse ;
6266import com .google .datastore .v1 .TransactionOptions ;
6367import com .google .protobuf .ByteString ;
68+ import io .grpc .Status ;
6469import io .opentelemetry .context .Context ;
6570import java .util .ArrayList ;
6671import java .util .Arrays ;
6772import java .util .Collections ;
73+ import java .util .HashMap ;
6874import java .util .Iterator ;
6975import java .util .LinkedHashMap ;
7076import java .util .LinkedHashSet ;
7379import java .util .Optional ;
7480import java .util .Set ;
7581import java .util .concurrent .Callable ;
82+ import java .util .concurrent .TimeUnit ;
7683import java .util .logging .Level ;
7784import java .util .logging .Logger ;
7885import javax .annotation .Nullable ;
@@ -89,6 +96,7 @@ final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datas
8996
9097 private final com .google .cloud .datastore .telemetry .TraceUtil otelTraceUtil =
9198 getOptions ().getTraceUtil ();
99+ private final MetricsRecorder metricsRecorder = getOptions ().getMetricsRecorder ();
92100
93101 private final ReadOptionProtoPreparer readOptionProtoPreparer ;
94102 private final AggregationQueryExecutor aggregationQueryExecutor ;
@@ -122,63 +130,31 @@ public Transaction newTransaction() {
122130 return new TransactionImpl (this );
123131 }
124132
133+ /**
134+ * A wrapper around {@link ReadWriteTransactionCallable} that adds OpenTelemetry tracing context
135+ * propagation. All transaction logic (begin, run, commit, rollback, metrics recording) is
136+ * delegated to the underlying {@link ReadWriteTransactionCallable}.
137+ */
125138 static class TracedReadWriteTransactionCallable <T > implements Callable <T > {
126- private final Datastore datastore ;
127- private final TransactionCallable <T > callable ;
128- private volatile TransactionOptions options ;
129- private volatile Transaction transaction ;
130-
139+ private final ReadWriteTransactionCallable <T > delegate ;
131140 private final TraceUtil .Span parentSpan ;
132141
133142 TracedReadWriteTransactionCallable (
134- Datastore datastore ,
135- TransactionCallable <T > callable ,
136- TransactionOptions options ,
143+ ReadWriteTransactionCallable <T > delegate ,
137144 @ Nullable com .google .cloud .datastore .telemetry .TraceUtil .Span parentSpan ) {
138- this .datastore = datastore ;
139- this .callable = callable ;
140- this .options = options ;
141- this .transaction = null ;
145+ this .delegate = delegate ;
142146 this .parentSpan = parentSpan ;
143147 }
144148
145- Datastore getDatastore () {
146- return datastore ;
147- }
148-
149- TransactionOptions getOptions () {
150- return options ;
151- }
152-
153- Transaction getTransaction () {
154- return transaction ;
155- }
156-
157- void setPrevTransactionId (ByteString transactionId ) {
158- TransactionOptions .ReadWrite readWrite =
159- TransactionOptions .ReadWrite .newBuilder ().setPreviousTransaction (transactionId ).build ();
160- options = options .toBuilder ().setReadWrite (readWrite ).build ();
149+ ReadWriteTransactionCallable <T > getDelegate () {
150+ return delegate ;
161151 }
162152
163153 @ Override
164154 public T call () throws DatastoreException {
165155 try (io .opentelemetry .context .Scope ignored =
166156 Context .current ().with (parentSpan .getSpan ()).makeCurrent ()) {
167- transaction = datastore .newTransaction (options );
168- T value = callable .run (transaction );
169- transaction .commit ();
170- return value ;
171- } catch (Exception ex ) {
172- transaction .rollback ();
173- throw DatastoreException .propagateUserException (ex );
174- } finally {
175- if (transaction .isActive ()) {
176- transaction .rollback ();
177- }
178- if (options != null
179- && options .getModeCase ().equals (TransactionOptions .ModeCase .READ_WRITE )) {
180- setPrevTransactionId (transaction .getTransactionId ());
181- }
157+ return delegate .call ();
182158 }
183159 }
184160 }
@@ -200,14 +176,19 @@ public boolean isClosed() {
200176 static class ReadWriteTransactionCallable <T > implements Callable <T > {
201177 private final Datastore datastore ;
202178 private final TransactionCallable <T > callable ;
179+ private final MetricsRecorder metricsRecorder ;
203180 private volatile TransactionOptions options ;
204181 private volatile Transaction transaction ;
205182
206183 ReadWriteTransactionCallable (
207- Datastore datastore , TransactionCallable <T > callable , TransactionOptions options ) {
184+ Datastore datastore ,
185+ TransactionCallable <T > callable ,
186+ TransactionOptions options ,
187+ MetricsRecorder metricsRecorder ) {
208188 this .datastore = datastore ;
209189 this .callable = callable ;
210190 this .options = options ;
191+ this .metricsRecorder = metricsRecorder ;
211192 this .transaction = null ;
212193 }
213194
@@ -231,15 +212,28 @@ void setPrevTransactionId(ByteString transactionId) {
231212
232213 @ Override
233214 public T call () throws DatastoreException {
215+ String attemptStatus = StatusCode .Code .UNKNOWN .toString ();
234216 try {
235217 transaction = datastore .newTransaction (options );
236218 T value = callable .run (transaction );
237219 transaction .commit ();
220+ attemptStatus = Status .Code .OK .toString ();
238221 return value ;
239222 } catch (Exception ex ) {
240- transaction .rollback ();
223+ attemptStatus = DatastoreException .extractStatusCode (ex );
224+ // An exception here can originate from either callable.run() (before commit was attempted)
225+ // or from transaction.commit() itself. In both cases the transaction is still active.
226+ // isActive() returns false if the transaction was already committed or rolled back, so
227+ // it is safe to use as the sole guard here without tracking a separate committed flag.
228+ if (transaction .isActive ()) {
229+ transaction .rollback ();
230+ }
241231 throw DatastoreException .propagateUserException (ex );
242232 } finally {
233+ recordAttempt (attemptStatus );
234+ // transaction.isActive() returns false after both a successful commit or a completed
235+ // rollback, so it already guards against rolling back a committed transaction or
236+ // rolling back a transaction that has already been rolled back.
243237 if (transaction .isActive ()) {
244238 transaction .rollback ();
245239 }
@@ -249,50 +243,63 @@ public T call() throws DatastoreException {
249243 }
250244 }
251245 }
246+
247+ /**
248+ * Records a single transaction commit attempt with the given status code. This is called once
249+ * per invocation of {@link #call()}, capturing the outcome of each individual commit attempt.
250+ */
251+ private void recordAttempt (String status ) {
252+ Map <String , String > attributes = new HashMap <>();
253+ attributes .put (TelemetryConstants .ATTRIBUTES_KEY_STATUS , status );
254+ attributes .put (
255+ TelemetryConstants .ATTRIBUTES_KEY_METHOD , TelemetryConstants .METHOD_TRANSACTION_COMMIT );
256+ attributes .put (
257+ TelemetryConstants .ATTRIBUTES_KEY_PROJECT_ID , datastore .getOptions ().getProjectId ());
258+ attributes .put (
259+ TelemetryConstants .ATTRIBUTES_KEY_DATABASE_ID , datastore .getOptions ().getDatabaseId ());
260+ metricsRecorder .recordTransactionAttemptCount (1 , attributes );
261+ }
252262 }
253263
254264 @ Override
255265 public <T > T runInTransaction (final TransactionCallable <T > callable ) {
256- TraceUtil .Span span = otelTraceUtil .startSpan (SPAN_NAME_TRANSACTION_RUN );
257- Callable <T > transactionCallable =
258- (getOptions ().getOpenTelemetryOptions ().isTracingEnabled ()
259- ? new TracedReadWriteTransactionCallable <T >(
260- this , callable , /* transactionOptions= */ null , span )
261- : new ReadWriteTransactionCallable <T >(this , callable , /* transactionOptions= */ null ));
262- try (Scope ignored = span .makeCurrent ()) {
263- return RetryHelper .runWithRetries (
264- transactionCallable ,
265- retrySettings ,
266- TRANSACTION_EXCEPTION_HANDLER ,
267- getOptions ().getClock ());
268- } catch (RetryHelperException e ) {
269- span .end (e );
270- throw DatastoreException .translateAndThrow (e );
271- } finally {
272- span .end ();
273- }
266+ return runInTransaction (callable , /* transactionOptions= */ null );
274267 }
275268
276269 @ Override
277270 public <T > T runInTransaction (
278271 final TransactionCallable <T > callable , TransactionOptions transactionOptions ) {
279272 TraceUtil .Span span = otelTraceUtil .startSpan (SPAN_NAME_TRANSACTION_RUN );
280273
281- Callable <T > transactionCallable =
282- (getOptions ().getOpenTelemetryOptions ().isTracingEnabled ()
283- ? new TracedReadWriteTransactionCallable <T >(this , callable , transactionOptions , span )
284- : new ReadWriteTransactionCallable <T >(this , callable , transactionOptions ));
274+ ReadWriteTransactionCallable <T > baseCallable =
275+ new ReadWriteTransactionCallable <>(this , callable , transactionOptions , metricsRecorder );
276+
277+ Callable <T > transactionCallable = baseCallable ;
278+ if (getOptions ().getOpenTelemetryOptions ().isTracingEnabled ()) {
279+ transactionCallable = new TracedReadWriteTransactionCallable <>(baseCallable , span );
280+ }
285281
282+ String status = StatusCode .Code .OK .toString ();
283+ Stopwatch stopwatch = Stopwatch .createStarted ();
286284 try (Scope ignored = span .makeCurrent ()) {
287285 return RetryHelper .runWithRetries (
288286 transactionCallable ,
289287 retrySettings ,
290288 TRANSACTION_EXCEPTION_HANDLER ,
291289 getOptions ().getClock ());
292290 } catch (RetryHelperException e ) {
291+ status = DatastoreException .extractStatusCode (e );
293292 span .end (e );
294293 throw DatastoreException .translateAndThrow (e );
295294 } finally {
295+ long latencyMs = stopwatch .elapsed (TimeUnit .MILLISECONDS );
296+ Map <String , String > attributes = new HashMap <>();
297+ attributes .put (TelemetryConstants .ATTRIBUTES_KEY_STATUS , status );
298+ attributes .put (
299+ TelemetryConstants .ATTRIBUTES_KEY_METHOD , TelemetryConstants .METHOD_TRANSACTION_RUN );
300+ attributes .put (TelemetryConstants .ATTRIBUTES_KEY_PROJECT_ID , getOptions ().getProjectId ());
301+ attributes .put (TelemetryConstants .ATTRIBUTES_KEY_DATABASE_ID , getOptions ().getDatabaseId ());
302+ metricsRecorder .recordTransactionLatency (latencyMs , attributes );
296303 span .end ();
297304 }
298305 }
0 commit comments