Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion java-spanner/.gitignore

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
Expand All @@ -59,7 +60,10 @@
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -345,14 +349,17 @@ static Builder newBuilder() {
}

private TimestampBound bound;
private final Object txnLock = new Object();
private final ReentrantLock txnLock = new ReentrantLock();
private final Condition hasNoPendingStarts = txnLock.newCondition();

@GuardedBy("txnLock")
private Timestamp timestamp;

@GuardedBy("txnLock")
private ByteString transactionId;

private final AtomicInteger pendingStarts = new AtomicInteger(0);

private final Map<SpannerRpc.Option, ?> channelHint;

MultiUseReadOnlyTransaction(Builder builder) {
Expand Down Expand Up @@ -407,6 +414,28 @@ TransactionSelector getTransactionSelector() {
return selector;
}

private ListenableAsyncResultSet createAsyncResultSet(
Supplier<ResultSet> resultSetSupplier, int bufferRows) {
pendingStarts.incrementAndGet();
return new AsyncResultSetImpl(
executorProvider,
() -> {
try {
return resultSetSupplier.get();
} finally {
if (pendingStarts.decrementAndGet() == 0) {
txnLock.lock();
try {
hasNoPendingStarts.signalAll();
} finally {
txnLock.unlock();
}
}
}
},
bufferRows);
Comment thread
olavloite marked this conversation as resolved.
Outdated
}

@Override
public ListenableAsyncResultSet readAsync(
String table, KeySet keys, Iterable<String> columns, ReadOption... options) {
Expand All @@ -415,8 +444,8 @@ public ListenableAsyncResultSet readAsync(
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider, () -> readInternal(table, null, keys, columns, options), bufferRows);
return createAsyncResultSet(
() -> readInternal(table, null, keys, columns, options), bufferRows);
}

@Override
Expand All @@ -427,10 +456,8 @@ public ListenableAsyncResultSet readUsingIndexAsync(
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider,
() -> readInternal(table, checkNotNull(index), keys, columns, options),
bufferRows);
return createAsyncResultSet(
() -> readInternal(table, checkNotNull(index), keys, columns, options), bufferRows);
}

@Override
Expand All @@ -440,8 +467,7 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider,
return createAsyncResultSet(
() ->
executeQueryInternal(
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options),
Expand All @@ -450,20 +476,38 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti

@Override
public Timestamp getReadTimestamp() {
synchronized (txnLock) {
txnLock.lock();
try {
assertTimestampAvailable(timestamp != null);
return timestamp;
} finally {
txnLock.unlock();
}
}

ByteString getTransactionId() {
synchronized (txnLock) {
txnLock.lock();
try {
return transactionId;
} finally {
txnLock.unlock();
}
}

@Override
public void close() {
txnLock.lock();
try {
while (pendingStarts.get() > 0) {
try {
hasNoPendingStarts.await();
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
}
} finally {
txnLock.unlock();
}
ByteString id = getTransactionId();
if (id != null && !id.isEmpty()) {
rpc.clearTransactionAffinity(id);
Expand All @@ -477,7 +521,8 @@ public void close() {
* Multiplexed Session.
*/
void initFallbackTransaction() {
synchronized (txnLock) {
txnLock.lock();
try {
span.addAnnotation("Creating Transaction");
TransactionOptions.Builder options = TransactionOptions.newBuilder();
if (timestamp != null) {
Expand All @@ -494,6 +539,8 @@ void initFallbackTransaction() {
.setOptions(options)
.build();
initTransactionInternal(request);
} finally {
txnLock.unlock();
}
}

Expand All @@ -507,7 +554,8 @@ void initTransaction() {
// RTT, but optimal if the first read is slow. As the client library is now using streaming
// reads, a possible optimization could be to use the first read in the transaction to begin
// it implicitly.
synchronized (txnLock) {
txnLock.lock();
try {
if (transactionId != null) {
return;
}
Expand All @@ -520,6 +568,8 @@ void initTransaction() {
.setOptions(options)
.build();
initTransactionInternal(request);
} finally {
txnLock.unlock();
}
}

Expand Down
Loading