3838
3939import com .google .api .core .BetaApi ;
4040import com .google .api .gax .retrying .RetrySettings ;
41+ import com .google .api .gax .rpc .StatusCode ;
4142import com .google .cloud .BaseService ;
4243import com .google .cloud .ExceptionHandler ;
4344import com .google .cloud .RetryHelper ;
4445import com .google .cloud .RetryHelper .RetryHelperException ;
4546import com .google .cloud .ServiceOptions ;
47+ import com .google .cloud .TransportOptions ;
4648import com .google .cloud .datastore .execution .AggregationQueryExecutor ;
4749import com .google .cloud .datastore .spi .v1 .DatastoreRpc ;
50+ import com .google .cloud .datastore .telemetry .MetricsRecorder ;
51+ import com .google .cloud .datastore .telemetry .TelemetryConstants ;
4852import com .google .cloud .datastore .telemetry .TraceUtil ;
4953import com .google .cloud .datastore .telemetry .TraceUtil .Scope ;
5054import com .google .common .base .MoreObjects ;
5155import com .google .common .base .Preconditions ;
56+ import com .google .common .base .Stopwatch ;
5257import com .google .common .collect .AbstractIterator ;
5358import com .google .common .collect .ImmutableList ;
5459import com .google .common .collect .ImmutableMap ;
6166import com .google .datastore .v1 .RunQueryResponse ;
6267import com .google .datastore .v1 .TransactionOptions ;
6368import com .google .protobuf .ByteString ;
69+ import io .grpc .Status ;
6470import io .opentelemetry .context .Context ;
6571import java .util .ArrayList ;
6672import java .util .Arrays ;
6773import java .util .Collections ;
74+ import java .util .HashMap ;
6875import java .util .Iterator ;
6976import java .util .LinkedHashMap ;
7077import java .util .LinkedHashSet ;
7380import java .util .Optional ;
7481import java .util .Set ;
7582import java .util .concurrent .Callable ;
83+ import java .util .concurrent .TimeUnit ;
7684import java .util .logging .Level ;
7785import java .util .logging .Logger ;
7886import javax .annotation .Nullable ;
@@ -89,6 +97,7 @@ final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datas
8997
9098 private final com .google .cloud .datastore .telemetry .TraceUtil otelTraceUtil =
9199 getOptions ().getTraceUtil ();
100+ private final MetricsRecorder metricsRecorder = getOptions ().getMetricsRecorder ();
92101
93102 private final ReadOptionProtoPreparer readOptionProtoPreparer ;
94103 private final AggregationQueryExecutor aggregationQueryExecutor ;
@@ -122,63 +131,28 @@ public Transaction newTransaction() {
122131 return new TransactionImpl (this );
123132 }
124133
134+ /**
135+ * A Tracing callable that adds OpenTelemetry tracing context. Intended to be used for
136+ * transactions and wraps {@link ReadWriteTransactionCallable} as the delegate. All transaction
137+ * logic (begin, run, commit, rollback, metrics recording) is handled in the delegate (this solely
138+ * handles tracing).
139+ */
125140 static class TracedReadWriteTransactionCallable <T > implements Callable <T > {
126- private final Datastore datastore ;
127- private final TransactionCallable <T > callable ;
128- private volatile TransactionOptions options ;
129- private volatile Transaction transaction ;
130-
141+ private final ReadWriteTransactionCallable <T > delegate ;
131142 private final TraceUtil .Span parentSpan ;
132143
133144 TracedReadWriteTransactionCallable (
134- Datastore datastore ,
135- TransactionCallable <T > callable ,
136- TransactionOptions options ,
145+ ReadWriteTransactionCallable <T > delegate ,
137146 @ Nullable com .google .cloud .datastore .telemetry .TraceUtil .Span parentSpan ) {
138- this .datastore = datastore ;
139- this .callable = callable ;
140- this .options = options ;
141- this .transaction = null ;
147+ this .delegate = delegate ;
142148 this .parentSpan = parentSpan ;
143149 }
144150
145- Datastore getDatastore () {
146- return datastore ;
147- }
148-
149- TransactionOptions getOptions () {
150- return options ;
151- }
152-
153- Transaction getTransaction () {
154- return transaction ;
155- }
156-
157- void setPrevTransactionId (ByteString transactionId ) {
158- TransactionOptions .ReadWrite readWrite =
159- TransactionOptions .ReadWrite .newBuilder ().setPreviousTransaction (transactionId ).build ();
160- options = options .toBuilder ().setReadWrite (readWrite ).build ();
161- }
162-
163151 @ Override
164152 public T call () throws DatastoreException {
165153 try (io .opentelemetry .context .Scope ignored =
166154 Context .current ().with (parentSpan .getSpan ()).makeCurrent ()) {
167- transaction = datastore .newTransaction (options );
168- T value = callable .run (transaction );
169- transaction .commit ();
170- return value ;
171- } catch (Exception ex ) {
172- transaction .rollback ();
173- throw DatastoreException .propagateUserException (ex );
174- } finally {
175- if (transaction .isActive ()) {
176- transaction .rollback ();
177- }
178- if (options != null
179- && options .getModeCase ().equals (TransactionOptions .ModeCase .READ_WRITE )) {
180- setPrevTransactionId (transaction .getTransactionId ());
181- }
155+ return delegate .call ();
182156 }
183157 }
184158 }
@@ -200,29 +174,22 @@ public boolean isClosed() {
200174 static class ReadWriteTransactionCallable <T > implements Callable <T > {
201175 private final Datastore datastore ;
202176 private final TransactionCallable <T > callable ;
177+ private final MetricsRecorder metricsRecorder ;
203178 private volatile TransactionOptions options ;
204179 private volatile Transaction transaction ;
205180
206181 ReadWriteTransactionCallable (
207- Datastore datastore , TransactionCallable <T > callable , TransactionOptions options ) {
182+ Datastore datastore ,
183+ TransactionCallable <T > callable ,
184+ TransactionOptions options ,
185+ MetricsRecorder metricsRecorder ) {
208186 this .datastore = datastore ;
209187 this .callable = callable ;
210188 this .options = options ;
189+ this .metricsRecorder = metricsRecorder ;
211190 this .transaction = null ;
212191 }
213192
214- Datastore getDatastore () {
215- return datastore ;
216- }
217-
218- TransactionOptions getOptions () {
219- return options ;
220- }
221-
222- Transaction getTransaction () {
223- return transaction ;
224- }
225-
226193 void setPrevTransactionId (ByteString transactionId ) {
227194 TransactionOptions .ReadWrite readWrite =
228195 TransactionOptions .ReadWrite .newBuilder ().setPreviousTransaction (transactionId ).build ();
@@ -231,15 +198,28 @@ void setPrevTransactionId(ByteString transactionId) {
231198
232199 @ Override
233200 public T call () throws DatastoreException {
201+ String attemptStatus = StatusCode .Code .UNKNOWN .toString ();
234202 try {
235203 transaction = datastore .newTransaction (options );
236204 T value = callable .run (transaction );
237205 transaction .commit ();
206+ attemptStatus = Status .Code .OK .toString ();
238207 return value ;
239208 } catch (Exception ex ) {
240- transaction .rollback ();
209+ attemptStatus = DatastoreException .extractStatusCode (ex );
210+ // An exception here can stem from either `callable.run()` (before commit was attempted)
211+ // or from `transaction.commit()`. If there is an exception thrown from either call site,
212+ // then the transaction is still active. Check if it is still active (e.g. not commited)
213+ // and roll back the transaction.
214+ if (transaction .isActive ()) {
215+ transaction .rollback ();
216+ }
241217 throw DatastoreException .propagateUserException (ex );
242218 } finally {
219+ recordAttempt (attemptStatus , datastore .getOptions ().getTransportOptions ());
220+ // If the transaction is active, then commit the rollback. If it was already successfully
221+ // rolled back, the transaction is inactive (prevents rolling back an already rolled back
222+ // transaction).
243223 if (transaction .isActive ()) {
244224 transaction .rollback ();
245225 }
@@ -249,50 +229,69 @@ public T call() throws DatastoreException {
249229 }
250230 }
251231 }
232+
233+ /**
234+ * Records a single transaction commit attempt with the given status code. This is called once
235+ * per invocation of {@link #call()}, capturing the outcome of each individual commit attempt.
236+ */
237+ private void recordAttempt (String status , TransportOptions transportOptions ) {
238+ Map <String , String > attributes = new HashMap <>();
239+ attributes .put (TelemetryConstants .ATTRIBUTES_KEY_STATUS , status );
240+ attributes .put (
241+ TelemetryConstants .ATTRIBUTES_KEY_METHOD , TelemetryConstants .METHOD_TRANSACTION_COMMIT );
242+ attributes .put (
243+ TelemetryConstants .ATTRIBUTES_KEY_PROJECT_ID , datastore .getOptions ().getProjectId ());
244+ attributes .put (
245+ TelemetryConstants .ATTRIBUTES_KEY_DATABASE_ID , datastore .getOptions ().getDatabaseId ());
246+ attributes .put (
247+ TelemetryConstants .ATTRIBUTES_KEY_TRANSPORT ,
248+ TelemetryConstants .getTransportName (transportOptions ));
249+ metricsRecorder .recordTransactionAttemptCount (1 , attributes );
250+ }
252251 }
253252
254253 @ Override
255254 public <T > T runInTransaction (final TransactionCallable <T > callable ) {
256- TraceUtil .Span span = otelTraceUtil .startSpan (SPAN_NAME_TRANSACTION_RUN );
257- Callable <T > transactionCallable =
258- (getOptions ().getOpenTelemetryOptions ().isTracingEnabled ()
259- ? new TracedReadWriteTransactionCallable <T >(
260- this , callable , /* transactionOptions= */ null , span )
261- : new ReadWriteTransactionCallable <T >(this , callable , /* transactionOptions= */ null ));
262- try (Scope ignored = span .makeCurrent ()) {
263- return RetryHelper .runWithRetries (
264- transactionCallable ,
265- retrySettings ,
266- TRANSACTION_EXCEPTION_HANDLER ,
267- getOptions ().getClock ());
268- } catch (RetryHelperException e ) {
269- span .end (e );
270- throw DatastoreException .translateAndThrow (e );
271- } finally {
272- span .end ();
273- }
255+ return runInTransaction (callable , /* transactionOptions= */ null );
274256 }
275257
276258 @ Override
277259 public <T > T runInTransaction (
278260 final TransactionCallable <T > callable , TransactionOptions transactionOptions ) {
279261 TraceUtil .Span span = otelTraceUtil .startSpan (SPAN_NAME_TRANSACTION_RUN );
280262
281- Callable <T > transactionCallable =
282- (getOptions ().getOpenTelemetryOptions ().isTracingEnabled ()
283- ? new TracedReadWriteTransactionCallable <T >(this , callable , transactionOptions , span )
284- : new ReadWriteTransactionCallable <T >(this , callable , transactionOptions ));
263+ ReadWriteTransactionCallable <T > baseCallable =
264+ new ReadWriteTransactionCallable <>(this , callable , transactionOptions , metricsRecorder );
265+
266+ Callable <T > transactionCallable = baseCallable ;
267+ if (getOptions ().getOpenTelemetryOptions ().isTracingEnabled ()) {
268+ transactionCallable = new TracedReadWriteTransactionCallable <>(baseCallable , span );
269+ }
285270
271+ String status = StatusCode .Code .OK .toString ();
272+ Stopwatch stopwatch = Stopwatch .createStarted ();
286273 try (Scope ignored = span .makeCurrent ()) {
287274 return RetryHelper .runWithRetries (
288275 transactionCallable ,
289276 retrySettings ,
290277 TRANSACTION_EXCEPTION_HANDLER ,
291278 getOptions ().getClock ());
292279 } catch (RetryHelperException e ) {
280+ status = DatastoreException .extractStatusCode (e );
293281 span .end (e );
294282 throw DatastoreException .translateAndThrow (e );
295283 } finally {
284+ long latencyMs = stopwatch .elapsed (TimeUnit .MILLISECONDS );
285+ Map <String , String > attributes = new HashMap <>();
286+ attributes .put (TelemetryConstants .ATTRIBUTES_KEY_STATUS , status );
287+ attributes .put (
288+ TelemetryConstants .ATTRIBUTES_KEY_METHOD , TelemetryConstants .METHOD_TRANSACTION_RUN );
289+ attributes .put (TelemetryConstants .ATTRIBUTES_KEY_PROJECT_ID , getOptions ().getProjectId ());
290+ attributes .put (TelemetryConstants .ATTRIBUTES_KEY_DATABASE_ID , getOptions ().getDatabaseId ());
291+ attributes .put (
292+ TelemetryConstants .ATTRIBUTES_KEY_TRANSPORT ,
293+ TelemetryConstants .getTransportName (getOptions ().getTransportOptions ()));
294+ metricsRecorder .recordTransactionLatency (latencyMs , attributes );
296295 span .end ();
297296 }
298297 }
0 commit comments