diff --git a/java-spanner/.gitignore b/java-spanner/.gitignore deleted file mode 100644 index 722d5e71d93c..000000000000 --- a/java-spanner/.gitignore +++ /dev/null @@ -1 +0,0 @@ -.vscode diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 2638bf4b94aa..310e366c0f8b 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -39,6 +39,7 @@ import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; @@ -59,7 +60,11 @@ import java.util.EnumMap; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -345,7 +350,8 @@ static Builder newBuilder() { } private TimestampBound bound; - private final Object txnLock = new Object(); + private final ReentrantLock txnLock = new ReentrantLock(); + private final Condition hasNoPendingStarts = txnLock.newCondition(); @GuardedBy("txnLock") private Timestamp timestamp; @@ -353,6 +359,8 @@ static Builder newBuilder() { @GuardedBy("txnLock") private ByteString transactionId; + private final AtomicInteger pendingStarts = new AtomicInteger(0); + private final Map channelHint; MultiUseReadOnlyTransaction(Builder builder) { @@ -407,6 +415,56 @@ TransactionSelector getTransactionSelector() { return selector; } + private void decrementPendingStartsAndSignal() { + if (pendingStarts.decrementAndGet() == 0) { + txnLock.lock(); + try { + hasNoPendingStarts.signalAll(); + } finally { + txnLock.unlock(); + } + } + } + + private ListenableAsyncResultSet createAsyncResultSet( + Supplier resultSetSupplier, int bufferRows) { + pendingStarts.incrementAndGet(); + // Make sure that we decrement the counter exactly once, either + // when the query is actually executed, or when the result set is closed, + // or if something goes wrong when creating the result set. + final AtomicBoolean decremented = new AtomicBoolean(false); + try { + return new AsyncResultSetImpl( + executorProvider, + () -> { + try { + return resultSetSupplier.get(); + } finally { + if (decremented.compareAndSet(false, true)) { + decrementPendingStartsAndSignal(); + } + } + }, + bufferRows) { + @Override + public void close() { + try { + super.close(); + } finally { + if (!isUsed() && decremented.compareAndSet(false, true)) { + decrementPendingStartsAndSignal(); + } + } + } + }; + } catch (Throwable t) { + if (decremented.compareAndSet(false, true)) { + decrementPendingStartsAndSignal(); + } + throw t; + } + } + @Override public ListenableAsyncResultSet readAsync( String table, KeySet keys, Iterable columns, ReadOption... options) { @@ -415,8 +473,8 @@ public ListenableAsyncResultSet readAsync( readOptions.hasBufferRows() ? readOptions.bufferRows() : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; - return new AsyncResultSetImpl( - executorProvider, () -> readInternal(table, null, keys, columns, options), bufferRows); + return createAsyncResultSet( + () -> readInternal(table, null, keys, columns, options), bufferRows); } @Override @@ -427,10 +485,8 @@ public ListenableAsyncResultSet readUsingIndexAsync( readOptions.hasBufferRows() ? readOptions.bufferRows() : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; - return new AsyncResultSetImpl( - executorProvider, - () -> readInternal(table, checkNotNull(index), keys, columns, options), - bufferRows); + return createAsyncResultSet( + () -> readInternal(table, checkNotNull(index), keys, columns, options), bufferRows); } @Override @@ -440,8 +496,7 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti readOptions.hasBufferRows() ? readOptions.bufferRows() : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; - return new AsyncResultSetImpl( - executorProvider, + return createAsyncResultSet( () -> executeQueryInternal( statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options), @@ -450,20 +505,38 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti @Override public Timestamp getReadTimestamp() { - synchronized (txnLock) { + txnLock.lock(); + try { assertTimestampAvailable(timestamp != null); return timestamp; + } finally { + txnLock.unlock(); } } ByteString getTransactionId() { - synchronized (txnLock) { + txnLock.lock(); + try { return transactionId; + } finally { + txnLock.unlock(); } } @Override public void close() { + txnLock.lock(); + try { + while (pendingStarts.get() > 0) { + try { + hasNoPendingStarts.await(); + } catch (InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); + } + } + } finally { + txnLock.unlock(); + } ByteString id = getTransactionId(); if (id != null && !id.isEmpty()) { rpc.clearTransactionAffinity(id); @@ -477,7 +550,8 @@ public void close() { * Multiplexed Session. */ void initFallbackTransaction() { - synchronized (txnLock) { + txnLock.lock(); + try { span.addAnnotation("Creating Transaction"); TransactionOptions.Builder options = TransactionOptions.newBuilder(); if (timestamp != null) { @@ -494,6 +568,8 @@ void initFallbackTransaction() { .setOptions(options) .build(); initTransactionInternal(request); + } finally { + txnLock.unlock(); } } @@ -507,7 +583,8 @@ void initTransaction() { // RTT, but optimal if the first read is slow. As the client library is now using streaming // reads, a possible optimization could be to use the first read in the transaction to begin // it implicitly. - synchronized (txnLock) { + txnLock.lock(); + try { if (transactionId != null) { return; } @@ -520,6 +597,8 @@ void initTransaction() { .setOptions(options) .build(); initTransactionInternal(request); + } finally { + txnLock.unlock(); } } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index d2e4c6a00fb8..3dd5724532b3 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -176,6 +176,12 @@ private AsyncResultSetImpl( this.buffer = new LinkedBlockingDeque<>(bufferSize); } + boolean isUsed() { + synchronized (monitor) { + return state != State.INITIALIZED; + } + } + /** * Closes the {@link AsyncResultSet}. {@link #close()} is non-blocking and may be called multiple * times without side effects. An {@link AsyncResultSet} may be closed before all rows have been diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractAsyncTransactionTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractAsyncTransactionTest.java index 0474a807d2b6..acf126cf0691 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractAsyncTransactionTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractAsyncTransactionTest.java @@ -47,8 +47,8 @@ /** Base class for {@link AsyncRunnerTest} and {@link AsyncTransactionManagerTest}. */ public abstract class AbstractAsyncTransactionTest { static MockSpannerServiceImpl mockSpanner; - private static Server server; - private static InetSocketAddress address; + static Server server; + static InetSocketAddress address; static ExecutorService executor; Spanner spanner; diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncReadOnlyTransactionTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncReadOnlyTransactionTest.java index 9d38316ce802..f92d63dad175 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncReadOnlyTransactionTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncReadOnlyTransactionTest.java @@ -17,11 +17,20 @@ package com.google.cloud.spanner; import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT; +import static com.google.cloud.spanner.MockSpannerTestUtil.TEST_DATABASE; +import static com.google.cloud.spanner.MockSpannerTestUtil.TEST_INSTANCE; +import static com.google.cloud.spanner.MockSpannerTestUtil.TEST_PROJECT; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import com.google.cloud.NoCredentials; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.ExecuteSqlRequest; +import io.grpc.ManagedChannelBuilder; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -148,4 +157,39 @@ public void testMultipleQueriesOnlyCallsBeginTransactionOnce() throws Exception BeginTransactionRequest.class, ExecuteSqlRequest.class, ExecuteSqlRequest.class); } } + + @Test(timeout = 5000) + public void createAsyncResultSet_handlesExceptionCorrectly() throws Exception { + SpannerOptions.CloseableExecutorProvider mockExecutorProvider = + mock(SpannerOptions.CloseableExecutorProvider.class); + when(mockExecutorProvider.getExecutor()) + .thenThrow(new RuntimeException("Failed to get executor")); + + String endpoint = address.getHostString() + ":" + server.getPort(); + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .setHost("http://" + endpoint) + .setCredentials(NoCredentials.getInstance()) + .setAsyncExecutorProvider(mockExecutorProvider) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setFailOnSessionLeak() + .setWaitForMinSessions(org.threeten.bp.Duration.ofSeconds(2)) + .build()) + .build(); + + try (Spanner testSpanner = options.getService()) { + DatabaseClient client = + testSpanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) { + RuntimeException e = + assertThrows( + RuntimeException.class, + () -> transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT)); + assertEquals("Failed to get executor", e.getMessage()); + } + } + } }