3939import com .google .cloud .spanner .spi .v1 .SpannerRpc .Option ;
4040import com .google .common .annotations .VisibleForTesting ;
4141import com .google .common .base .Preconditions ;
42+ import com .google .common .base .Supplier ;
4243import com .google .common .collect .ImmutableMap ;
4344import com .google .common .util .concurrent .MoreExecutors ;
4445import com .google .protobuf .ByteString ;
5960import java .util .EnumMap ;
6061import java .util .Map ;
6162import java .util .concurrent .ThreadLocalRandom ;
63+ import java .util .concurrent .atomic .AtomicBoolean ;
64+ import java .util .concurrent .atomic .AtomicInteger ;
6265import java .util .concurrent .atomic .AtomicLong ;
66+ import java .util .concurrent .locks .Condition ;
67+ import java .util .concurrent .locks .ReentrantLock ;
6368import java .util .logging .Logger ;
6469import javax .annotation .Nullable ;
6570import javax .annotation .concurrent .GuardedBy ;
@@ -345,14 +350,17 @@ static Builder newBuilder() {
345350 }
346351
347352 private TimestampBound bound ;
348- private final Object txnLock = new Object ();
353+ private final ReentrantLock txnLock = new ReentrantLock ();
354+ private final Condition hasNoPendingStarts = txnLock .newCondition ();
349355
350356 @ GuardedBy ("txnLock" )
351357 private Timestamp timestamp ;
352358
353359 @ GuardedBy ("txnLock" )
354360 private ByteString transactionId ;
355361
362+ private final AtomicInteger pendingStarts = new AtomicInteger (0 );
363+
356364 private final Map <SpannerRpc .Option , ?> channelHint ;
357365
358366 MultiUseReadOnlyTransaction (Builder builder ) {
@@ -407,6 +415,56 @@ TransactionSelector getTransactionSelector() {
407415 return selector ;
408416 }
409417
418+ private void decrementPendingStartsAndSignal () {
419+ if (pendingStarts .decrementAndGet () == 0 ) {
420+ txnLock .lock ();
421+ try {
422+ hasNoPendingStarts .signalAll ();
423+ } finally {
424+ txnLock .unlock ();
425+ }
426+ }
427+ }
428+
429+ private ListenableAsyncResultSet createAsyncResultSet (
430+ Supplier <ResultSet > resultSetSupplier , int bufferRows ) {
431+ pendingStarts .incrementAndGet ();
432+ // Make sure that we decrement the counter exactly once, either
433+ // when the query is actually executed, or when the result set is closed,
434+ // or if something goes wrong when creating the result set.
435+ final AtomicBoolean decremented = new AtomicBoolean (false );
436+ try {
437+ return new AsyncResultSetImpl (
438+ executorProvider ,
439+ () -> {
440+ try {
441+ return resultSetSupplier .get ();
442+ } finally {
443+ if (decremented .compareAndSet (false , true )) {
444+ decrementPendingStartsAndSignal ();
445+ }
446+ }
447+ },
448+ bufferRows ) {
449+ @ Override
450+ public void close () {
451+ try {
452+ super .close ();
453+ } finally {
454+ if (!isUsed () && decremented .compareAndSet (false , true )) {
455+ decrementPendingStartsAndSignal ();
456+ }
457+ }
458+ }
459+ };
460+ } catch (Throwable t ) {
461+ if (decremented .compareAndSet (false , true )) {
462+ decrementPendingStartsAndSignal ();
463+ }
464+ throw t ;
465+ }
466+ }
467+
410468 @ Override
411469 public ListenableAsyncResultSet readAsync (
412470 String table , KeySet keys , Iterable <String > columns , ReadOption ... options ) {
@@ -415,8 +473,8 @@ public ListenableAsyncResultSet readAsync(
415473 readOptions .hasBufferRows ()
416474 ? readOptions .bufferRows ()
417475 : AsyncResultSetImpl .DEFAULT_BUFFER_SIZE ;
418- return new AsyncResultSetImpl (
419- executorProvider , () -> readInternal (table , null , keys , columns , options ), bufferRows );
476+ return createAsyncResultSet (
477+ () -> readInternal (table , null , keys , columns , options ), bufferRows );
420478 }
421479
422480 @ Override
@@ -427,10 +485,8 @@ public ListenableAsyncResultSet readUsingIndexAsync(
427485 readOptions .hasBufferRows ()
428486 ? readOptions .bufferRows ()
429487 : AsyncResultSetImpl .DEFAULT_BUFFER_SIZE ;
430- return new AsyncResultSetImpl (
431- executorProvider ,
432- () -> readInternal (table , checkNotNull (index ), keys , columns , options ),
433- bufferRows );
488+ return createAsyncResultSet (
489+ () -> readInternal (table , checkNotNull (index ), keys , columns , options ), bufferRows );
434490 }
435491
436492 @ Override
@@ -440,8 +496,7 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti
440496 readOptions .hasBufferRows ()
441497 ? readOptions .bufferRows ()
442498 : AsyncResultSetImpl .DEFAULT_BUFFER_SIZE ;
443- return new AsyncResultSetImpl (
444- executorProvider ,
499+ return createAsyncResultSet (
445500 () ->
446501 executeQueryInternal (
447502 statement , com .google .spanner .v1 .ExecuteSqlRequest .QueryMode .NORMAL , options ),
@@ -450,20 +505,38 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti
450505
451506 @ Override
452507 public Timestamp getReadTimestamp () {
453- synchronized (txnLock ) {
508+ txnLock .lock ();
509+ try {
454510 assertTimestampAvailable (timestamp != null );
455511 return timestamp ;
512+ } finally {
513+ txnLock .unlock ();
456514 }
457515 }
458516
459517 ByteString getTransactionId () {
460- synchronized (txnLock ) {
518+ txnLock .lock ();
519+ try {
461520 return transactionId ;
521+ } finally {
522+ txnLock .unlock ();
462523 }
463524 }
464525
465526 @ Override
466527 public void close () {
528+ txnLock .lock ();
529+ try {
530+ while (pendingStarts .get () > 0 ) {
531+ try {
532+ hasNoPendingStarts .await ();
533+ } catch (InterruptedException e ) {
534+ throw SpannerExceptionFactory .propagateInterrupt (e );
535+ }
536+ }
537+ } finally {
538+ txnLock .unlock ();
539+ }
467540 ByteString id = getTransactionId ();
468541 if (id != null && !id .isEmpty ()) {
469542 rpc .clearTransactionAndChannelAffinity (id , Option .CHANNEL_HINT .getLong (channelHint ));
@@ -477,7 +550,8 @@ public void close() {
477550 * Multiplexed Session.
478551 */
479552 void initFallbackTransaction () {
480- synchronized (txnLock ) {
553+ txnLock .lock ();
554+ try {
481555 span .addAnnotation ("Creating Transaction" );
482556 TransactionOptions .Builder options = TransactionOptions .newBuilder ();
483557 if (timestamp != null ) {
@@ -494,6 +568,8 @@ void initFallbackTransaction() {
494568 .setOptions (options )
495569 .build ();
496570 initTransactionInternal (request );
571+ } finally {
572+ txnLock .unlock ();
497573 }
498574 }
499575
@@ -507,7 +583,8 @@ void initTransaction() {
507583 // RTT, but optimal if the first read is slow. As the client library is now using streaming
508584 // reads, a possible optimization could be to use the first read in the transaction to begin
509585 // it implicitly.
510- synchronized (txnLock ) {
586+ txnLock .lock ();
587+ try {
511588 if (transactionId != null ) {
512589 return ;
513590 }
@@ -520,6 +597,8 @@ void initTransaction() {
520597 .setOptions (options )
521598 .build ();
522599 initTransactionInternal (request );
600+ } finally {
601+ txnLock .unlock ();
523602 }
524603 }
525604
0 commit comments