5959import java .util .Collections ;
6060import java .util .EnumMap ;
6161import java .util .Map ;
62+ import java .util .concurrent .ExecutionException ;
6263import java .util .concurrent .ThreadLocalRandom ;
64+ import java .util .concurrent .TimeUnit ;
65+ import java .util .concurrent .TimeoutException ;
6366import java .util .concurrent .atomic .AtomicBoolean ;
6467import java .util .concurrent .atomic .AtomicInteger ;
6568import java .util .concurrent .atomic .AtomicLong ;
@@ -321,6 +324,8 @@ static class Builder extends AbstractReadContext.Builder<Builder, MultiUseReadOn
321324 private TimestampBound bound ;
322325 private Timestamp timestamp ;
323326 private ByteString transactionId ;
327+ private Options .BeginTransactionOption beginTransactionOption =
328+ Options .BeginTransactionOption .EXPLICIT ;
324329
325330 private Builder () {}
326331
@@ -339,6 +344,11 @@ Builder setTransactionId(ByteString transactionId) {
339344 return this ;
340345 }
341346
347+ Builder setBeginTransactionOption (Options .BeginTransactionOption beginTransactionOption ) {
348+ this .beginTransactionOption = beginTransactionOption ;
349+ return this ;
350+ }
351+
342352 @ Override
343353 MultiUseReadOnlyTransaction build () {
344354 return new MultiUseReadOnlyTransaction (this );
@@ -359,9 +369,15 @@ static Builder newBuilder() {
359369 @ GuardedBy ("txnLock" )
360370 private ByteString transactionId ;
361371
372+ @ GuardedBy ("txnLock" )
373+ private SettableApiFuture <ByteString > transactionIdFuture ;
374+
362375 private final AtomicInteger pendingStarts = new AtomicInteger (0 );
363376
377+ private static final long WAIT_FOR_INLINE_BEGIN_TIMEOUT_MILLIS = 60_000L ;
378+
364379 private final Map <SpannerRpc .Option , ?> channelHint ;
380+ private final Options .BeginTransactionOption beginTransactionOption ;
365381
366382 MultiUseReadOnlyTransaction (Builder builder ) {
367383 super (builder );
@@ -386,6 +402,7 @@ static Builder newBuilder() {
386402 session .getOptions (),
387403 ThreadLocalRandom .current ().nextLong (Long .MAX_VALUE ),
388404 session .getSpanner ().getOptions ().isGrpcGcpExtensionEnabled ());
405+ this .beginTransactionOption = builder .beginTransactionOption ;
389406 }
390407
391408 @ Override
@@ -398,21 +415,68 @@ protected boolean isRouteToLeader() {
398415 return false ;
399416 }
400417
418+ private boolean shouldUseInlinedBegin () {
419+ return beginTransactionOption == Options .BeginTransactionOption .INLINE ;
420+ }
421+
401422 @ Override
402423 void beforeReadOrQuery () {
403424 super .beforeReadOrQuery ();
404- initTransaction ();
425+ if (shouldUseInlinedBegin ()) {
426+ // Keep the same nested transaction guard as the explicit BeginTransaction path. This checks
427+ // TransactionRunner's thread-local pending state, not the session's active transaction.
428+ SessionImpl .throwIfTransactionsPending ();
429+ } else {
430+ initTransaction ();
431+ }
405432 }
406433
407434 @ Override
408435 @ Nullable
409436 TransactionSelector getTransactionSelector () {
410- // No need for synchronization: super.readInternal() is always preceded by a check of
411- // "transactionId" that provides a happens-before from initialization, and the value is never
412- // changed afterwards.
413- @ SuppressWarnings ("GuardedByChecker" )
414- TransactionSelector selector = TransactionSelector .newBuilder ().setId (transactionId ).build ();
415- return selector ;
437+ if (!shouldUseInlinedBegin ()) {
438+ // No need for synchronization: super.readInternal() is always preceded by a check of
439+ // "transactionId" that provides a happens-before from initialization, and the value is
440+ // never changed afterwards.
441+ @ SuppressWarnings ("GuardedByChecker" )
442+ TransactionSelector selector =
443+ TransactionSelector .newBuilder ().setId (transactionId ).build ();
444+ return selector ;
445+ }
446+
447+ ApiFuture <ByteString > futureToWaitFor = null ;
448+ txnLock .lock ();
449+ try {
450+ if (transactionId != null ) {
451+ return TransactionSelector .newBuilder ().setId (transactionId ).build ();
452+ }
453+ if (transactionIdFuture == null ) {
454+ transactionIdFuture = SettableApiFuture .create ();
455+ return TransactionSelector .newBuilder ()
456+ .setBegin (createReadOnlyTransactionOptions ())
457+ .build ();
458+ }
459+ futureToWaitFor = transactionIdFuture ;
460+ } finally {
461+ txnLock .unlock ();
462+ }
463+
464+ try {
465+ return TransactionSelector .newBuilder ()
466+ .setId (futureToWaitFor .get (WAIT_FOR_INLINE_BEGIN_TIMEOUT_MILLIS , TimeUnit .MILLISECONDS ))
467+ .build ();
468+ } catch (ExecutionException e ) {
469+ throw SpannerExceptionFactory .asSpannerException (e .getCause ());
470+ } catch (TimeoutException e ) {
471+ throw SpannerExceptionFactory .newSpannerException (
472+ ErrorCode .DEADLINE_EXCEEDED ,
473+ "Timeout while waiting for an inlined read-only transaction to be returned by another"
474+ + " statement." ,
475+ e );
476+ } catch (InterruptedException e ) {
477+ Thread .currentThread ().interrupt ();
478+ throw SpannerExceptionFactory .newSpannerExceptionForCancellation (null , e );
479+ }
416480 }
417481
418482 private void decrementPendingStartsAndSignal () {
@@ -503,6 +567,80 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti
503567 bufferRows );
504568 }
505569
570+ @ Override
571+ public void onTransactionMetadata (Transaction transaction , boolean shouldIncludeId ) {
572+ Timestamp readTimestamp = null ;
573+ if (transaction .hasReadTimestamp ()) {
574+ try {
575+ readTimestamp = Timestamp .fromProto (transaction .getReadTimestamp ());
576+ } catch (IllegalArgumentException e ) {
577+ throw SpannerExceptionFactory .newSpannerException (
578+ ErrorCode .INTERNAL , "Bad value in transaction.read_timestamp metadata field" , e );
579+ }
580+ }
581+ if (shouldIncludeId && transaction .getId ().isEmpty ()) {
582+ throw SpannerExceptionFactory .newSpannerException (
583+ ErrorCode .FAILED_PRECONDITION , NO_TRANSACTION_RETURNED_MSG );
584+ }
585+ txnLock .lock ();
586+ try {
587+ if (timestamp == null ) {
588+ if (readTimestamp == null ) {
589+ throw SpannerExceptionFactory .newSpannerException (
590+ ErrorCode .INTERNAL , "Missing expected transaction.read_timestamp metadata field" );
591+ }
592+ timestamp = readTimestamp ;
593+ }
594+ if (shouldIncludeId && transactionId == null ) {
595+ transactionId = transaction .getId ();
596+ if (transactionIdFuture != null && !transactionIdFuture .isDone ()) {
597+ transactionIdFuture .set (transactionId );
598+ }
599+ }
600+ } finally {
601+ txnLock .unlock ();
602+ }
603+ }
604+
605+ @ Override
606+ public SpannerException onError (
607+ SpannerException e , boolean withBeginTransaction , boolean lastStatement ) {
608+ e = super .onError (e , withBeginTransaction , lastStatement );
609+ if (withBeginTransaction ) {
610+ failTransactionIdFuture (e );
611+ }
612+ return e ;
613+ }
614+
615+ @ Override
616+ public void onDone (boolean withBeginTransaction ) {
617+ if (withBeginTransaction ) {
618+ failTransactionIdFuture (
619+ SpannerExceptionFactory .newSpannerException (
620+ ErrorCode .FAILED_PRECONDITION ,
621+ "ResultSet was closed before a read-only transaction id was returned" ));
622+ }
623+ super .onDone (withBeginTransaction );
624+ }
625+
626+ @ Override
627+ void onStartFailed (boolean withBeginTransaction , Throwable t ) {
628+ if (withBeginTransaction ) {
629+ failTransactionIdFuture (t );
630+ }
631+ }
632+
633+ private void failTransactionIdFuture (Throwable t ) {
634+ txnLock .lock ();
635+ try {
636+ if (transactionIdFuture != null && !transactionIdFuture .isDone ()) {
637+ transactionIdFuture .setException (t );
638+ }
639+ } finally {
640+ txnLock .unlock ();
641+ }
642+ }
643+
506644 @ Override
507645 public Timestamp getReadTimestamp () {
508646 txnLock .lock ();
@@ -544,6 +682,19 @@ public void close() {
544682 super .close ();
545683 }
546684
685+ private TransactionOptions createReadOnlyTransactionOptions () {
686+ TransactionOptions .Builder options = TransactionOptions .newBuilder ();
687+ if (timestamp != null ) {
688+ options
689+ .getReadOnlyBuilder ()
690+ .setReadTimestamp (timestamp .toProto ())
691+ .setReturnReadTimestamp (true );
692+ } else {
693+ bound .applyToBuilder (options .getReadOnlyBuilder ()).setReturnReadTimestamp (true );
694+ }
695+ return options .build ();
696+ }
697+
547698 /**
548699 * Initializes the transaction with the timestamp specified within MultiUseReadOnlyTransaction.
549700 * This is used only for fallback of PartitionQueryRequest and PartitionReadRequest with
@@ -553,19 +704,10 @@ void initFallbackTransaction() {
553704 txnLock .lock ();
554705 try {
555706 span .addAnnotation ("Creating Transaction" );
556- TransactionOptions .Builder options = TransactionOptions .newBuilder ();
557- if (timestamp != null ) {
558- options
559- .getReadOnlyBuilder ()
560- .setReadTimestamp (timestamp .toProto ())
561- .setReturnReadTimestamp (true );
562- } else {
563- bound .applyToBuilder (options .getReadOnlyBuilder ()).setReturnReadTimestamp (true );
564- }
565707 final BeginTransactionRequest request =
566708 BeginTransactionRequest .newBuilder ()
567709 .setSession (session .getName ())
568- .setOptions (options )
710+ .setOptions (createReadOnlyTransactionOptions () )
569711 .build ();
570712 initTransactionInternal (request );
571713 } finally {
@@ -589,12 +731,10 @@ void initTransaction() {
589731 return ;
590732 }
591733 span .addAnnotation ("Creating Transaction" );
592- TransactionOptions .Builder options = TransactionOptions .newBuilder ();
593- bound .applyToBuilder (options .getReadOnlyBuilder ()).setReturnReadTimestamp (true );
594734 final BeginTransactionRequest request =
595735 BeginTransactionRequest .newBuilder ()
596736 .setSession (session .getName ())
597- .setOptions (options )
737+ .setOptions (createReadOnlyTransactionOptions () )
598738 .build ();
599739 initTransactionInternal (request );
600740 } finally {
@@ -992,15 +1132,22 @@ CloseableIterator<PartialResultSet> startStream(
9921132 if (selector != null ) {
9931133 request .setTransaction (selector );
9941134 }
995- SpannerRpc .StreamingCall call =
996- rpc .executeQuery (
997- request .build (),
998- stream .consumer (),
999- getTransactionChannelHint (),
1000- requestId ,
1001- isRouteToLeader ());
1135+ boolean withBeginTransaction = request .getTransaction ().hasBegin ();
1136+ SpannerRpc .StreamingCall call ;
1137+ try {
1138+ call =
1139+ rpc .executeQuery (
1140+ request .build (),
1141+ stream .consumer (),
1142+ getTransactionChannelHint (),
1143+ requestId ,
1144+ isRouteToLeader ());
1145+ } catch (RuntimeException | Error t ) {
1146+ onStartFailed (withBeginTransaction , t );
1147+ throw t ;
1148+ }
10021149 session .markUsed (clock .instant ());
1003- stream .setCall (call , request . getTransaction (). hasBegin () );
1150+ stream .setCall (call , withBeginTransaction );
10041151 return stream ;
10051152 }
10061153
@@ -1116,6 +1263,8 @@ public void onDone(boolean withBeginTransaction) {
11161263 this .session .onReadDone ();
11171264 }
11181265
1266+ void onStartFailed (boolean withBeginTransaction , Throwable t ) {}
1267+
11191268 /**
11201269 * For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be
11211270 * present in the RPC response. In such cases, this method will be a no-op.
@@ -1215,15 +1364,22 @@ CloseableIterator<PartialResultSet> startStream(
12151364 builder .setTransaction (selector );
12161365 }
12171366 builder .setRequestOptions (buildRequestOptions (readOptions ));
1218- SpannerRpc .StreamingCall call =
1219- rpc .read (
1220- builder .build (),
1221- stream .consumer (),
1222- getTransactionChannelHint (),
1223- requestId ,
1224- isRouteToLeader ());
1367+ boolean withBeginTransaction = builder .getTransaction ().hasBegin ();
1368+ SpannerRpc .StreamingCall call ;
1369+ try {
1370+ call =
1371+ rpc .read (
1372+ builder .build (),
1373+ stream .consumer (),
1374+ getTransactionChannelHint (),
1375+ requestId ,
1376+ isRouteToLeader ());
1377+ } catch (RuntimeException | Error t ) {
1378+ onStartFailed (withBeginTransaction , t );
1379+ throw t ;
1380+ }
12251381 session .markUsed (clock .instant ());
1226- stream .setCall (call , /* withBeginTransaction= */ builder . getTransaction (). hasBegin () );
1382+ stream .setCall (call , withBeginTransaction );
12271383 return stream ;
12281384 }
12291385
0 commit comments