-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(spanner): add option for inline begin with multi-use read only txn #13233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,7 +59,10 @@ | |
| import java.util.Collections; | ||
| import java.util.EnumMap; | ||
| import java.util.Map; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ThreadLocalRandom; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.TimeoutException; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
@@ -321,6 +324,8 @@ static class Builder extends AbstractReadContext.Builder<Builder, MultiUseReadOn | |
| private TimestampBound bound; | ||
| private Timestamp timestamp; | ||
| private ByteString transactionId; | ||
| private Options.BeginTransactionOption beginTransactionOption = | ||
| Options.BeginTransactionOption.EXPLICIT; | ||
|
|
||
| private Builder() {} | ||
|
|
||
|
|
@@ -339,6 +344,11 @@ Builder setTransactionId(ByteString transactionId) { | |
| return this; | ||
| } | ||
|
|
||
| Builder setBeginTransactionOption(Options.BeginTransactionOption beginTransactionOption) { | ||
| this.beginTransactionOption = beginTransactionOption; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| MultiUseReadOnlyTransaction build() { | ||
| return new MultiUseReadOnlyTransaction(this); | ||
|
|
@@ -359,9 +369,15 @@ static Builder newBuilder() { | |
| @GuardedBy("txnLock") | ||
| private ByteString transactionId; | ||
|
|
||
| @GuardedBy("txnLock") | ||
| private SettableApiFuture<ByteString> transactionIdFuture; | ||
|
|
||
| private final AtomicInteger pendingStarts = new AtomicInteger(0); | ||
|
|
||
| private static final long WAIT_FOR_INLINE_BEGIN_TIMEOUT_MILLIS = 60_000L; | ||
|
|
||
| private final Map<SpannerRpc.Option, ?> channelHint; | ||
| private final Options.BeginTransactionOption beginTransactionOption; | ||
|
|
||
| MultiUseReadOnlyTransaction(Builder builder) { | ||
| super(builder); | ||
|
|
@@ -386,6 +402,7 @@ static Builder newBuilder() { | |
| session.getOptions(), | ||
| ThreadLocalRandom.current().nextLong(Long.MAX_VALUE), | ||
| session.getSpanner().getOptions().isGrpcGcpExtensionEnabled()); | ||
| this.beginTransactionOption = builder.beginTransactionOption; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -398,21 +415,67 @@ protected boolean isRouteToLeader() { | |
| return false; | ||
| } | ||
|
|
||
| private boolean shouldUseInlinedBegin() { | ||
| return beginTransactionOption == Options.BeginTransactionOption.INLINE; | ||
| } | ||
|
|
||
| @Override | ||
| void beforeReadOrQuery() { | ||
| super.beforeReadOrQuery(); | ||
| initTransaction(); | ||
| if (shouldUseInlinedBegin()) { | ||
| SessionImpl.throwIfTransactionsPending(); | ||
| } else { | ||
| initTransaction(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| @Nullable | ||
| TransactionSelector getTransactionSelector() { | ||
| // No need for synchronization: super.readInternal() is always preceded by a check of | ||
| // "transactionId" that provides a happens-before from initialization, and the value is never | ||
| // changed afterwards. | ||
| @SuppressWarnings("GuardedByChecker") | ||
| TransactionSelector selector = TransactionSelector.newBuilder().setId(transactionId).build(); | ||
| return selector; | ||
| if (!shouldUseInlinedBegin()) { | ||
| // No need for synchronization: super.readInternal() is always preceded by a check of | ||
| // "transactionId" that provides a happens-before from initialization, and the value is never | ||
| // changed afterwards. | ||
| @SuppressWarnings("GuardedByChecker") | ||
| TransactionSelector selector = | ||
| TransactionSelector.newBuilder().setId(transactionId).build(); | ||
| return selector; | ||
| } | ||
|
|
||
| ApiFuture<ByteString> futureToWaitFor = null; | ||
| txnLock.lock(); | ||
| try { | ||
| if (transactionId != null) { | ||
| return TransactionSelector.newBuilder().setId(transactionId).build(); | ||
| } | ||
| if (transactionIdFuture == null) { | ||
| transactionIdFuture = SettableApiFuture.create(); | ||
| return TransactionSelector.newBuilder() | ||
| .setBegin(createReadOnlyTransactionOptions()) | ||
| .build(); | ||
| } | ||
|
Comment on lines
+453
to
+458
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a potential for
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After creating an inline-begin selector, both read and query start paths wrap |
||
| futureToWaitFor = transactionIdFuture; | ||
| } finally { | ||
| txnLock.unlock(); | ||
| } | ||
|
|
||
| try { | ||
| return TransactionSelector.newBuilder() | ||
| .setId( | ||
| futureToWaitFor.get( | ||
| WAIT_FOR_INLINE_BEGIN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) | ||
|
rahul2393 marked this conversation as resolved.
Outdated
|
||
| .build(); | ||
| } catch (ExecutionException e) { | ||
| throw SpannerExceptionFactory.asSpannerException(e.getCause()); | ||
| } catch (TimeoutException e) { | ||
| throw SpannerExceptionFactory.newSpannerException( | ||
| ErrorCode.ABORTED, | ||
| "Timeout while waiting for an inlined read-only transaction to be returned by another" | ||
| + " statement.", | ||
| e); | ||
|
rahul2393 marked this conversation as resolved.
|
||
| } catch (InterruptedException e) { | ||
| throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e); | ||
| } | ||
| } | ||
|
|
||
| private void decrementPendingStartsAndSignal() { | ||
|
|
@@ -503,6 +566,73 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti | |
| bufferRows); | ||
| } | ||
|
|
||
| @Override | ||
| public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) { | ||
| Timestamp readTimestamp = null; | ||
| if (transaction.hasReadTimestamp()) { | ||
| try { | ||
| readTimestamp = Timestamp.fromProto(transaction.getReadTimestamp()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that this change does introduce a behavioral difference between inline-begin and explicit-begin, albeit one that is perfectly valid. With explicit-begin, you can call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, updated the comment |
||
| } catch (IllegalArgumentException e) { | ||
| throw SpannerExceptionFactory.newSpannerException( | ||
| ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e); | ||
| } | ||
| } | ||
| if (shouldIncludeId && transaction.getId().isEmpty()) { | ||
| throw SpannerExceptionFactory.newSpannerException( | ||
| ErrorCode.FAILED_PRECONDITION, NO_TRANSACTION_RETURNED_MSG); | ||
| } | ||
| txnLock.lock(); | ||
| try { | ||
| if (timestamp == null) { | ||
| if (readTimestamp == null) { | ||
| throw SpannerExceptionFactory.newSpannerException( | ||
| ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field"); | ||
| } | ||
| timestamp = readTimestamp; | ||
| } | ||
| if (shouldIncludeId && transactionId == null) { | ||
| transactionId = transaction.getId(); | ||
| if (transactionIdFuture != null && !transactionIdFuture.isDone()) { | ||
| transactionIdFuture.set(transactionId); | ||
| } | ||
| } | ||
| } finally { | ||
| txnLock.unlock(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public SpannerException onError( | ||
| SpannerException e, boolean withBeginTransaction, boolean lastStatement) { | ||
| e = super.onError(e, withBeginTransaction, lastStatement); | ||
| if (withBeginTransaction) { | ||
| failTransactionIdFuture(e); | ||
| } | ||
| return e; | ||
| } | ||
|
|
||
| @Override | ||
| public void onDone(boolean withBeginTransaction) { | ||
| if (withBeginTransaction) { | ||
| failTransactionIdFuture( | ||
| SpannerExceptionFactory.newSpannerException( | ||
| ErrorCode.FAILED_PRECONDITION, | ||
| "ResultSet was closed before a read-only transaction id was returned")); | ||
| } | ||
| super.onDone(withBeginTransaction); | ||
| } | ||
|
|
||
| private void failTransactionIdFuture(Throwable t) { | ||
| txnLock.lock(); | ||
| try { | ||
| if (transactionIdFuture != null && !transactionIdFuture.isDone()) { | ||
| transactionIdFuture.setException(t); | ||
| } | ||
| } finally { | ||
| txnLock.unlock(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public Timestamp getReadTimestamp() { | ||
| txnLock.lock(); | ||
|
|
@@ -544,6 +674,19 @@ public void close() { | |
| super.close(); | ||
| } | ||
|
|
||
| private TransactionOptions createReadOnlyTransactionOptions() { | ||
| TransactionOptions.Builder options = TransactionOptions.newBuilder(); | ||
| if (timestamp != null) { | ||
| options | ||
| .getReadOnlyBuilder() | ||
| .setReadTimestamp(timestamp.toProto()) | ||
| .setReturnReadTimestamp(true); | ||
| } else { | ||
| bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true); | ||
| } | ||
| return options.build(); | ||
| } | ||
|
|
||
| /** | ||
| * Initializes the transaction with the timestamp specified within MultiUseReadOnlyTransaction. | ||
| * This is used only for fallback of PartitionQueryRequest and PartitionReadRequest with | ||
|
|
@@ -553,19 +696,10 @@ void initFallbackTransaction() { | |
| txnLock.lock(); | ||
| try { | ||
| span.addAnnotation("Creating Transaction"); | ||
| TransactionOptions.Builder options = TransactionOptions.newBuilder(); | ||
| if (timestamp != null) { | ||
| options | ||
| .getReadOnlyBuilder() | ||
| .setReadTimestamp(timestamp.toProto()) | ||
| .setReturnReadTimestamp(true); | ||
| } else { | ||
| bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true); | ||
| } | ||
| final BeginTransactionRequest request = | ||
| BeginTransactionRequest.newBuilder() | ||
| .setSession(session.getName()) | ||
| .setOptions(options) | ||
| .setOptions(createReadOnlyTransactionOptions()) | ||
| .build(); | ||
| initTransactionInternal(request); | ||
| } finally { | ||
|
|
@@ -589,12 +723,10 @@ void initTransaction() { | |
| return; | ||
| } | ||
| span.addAnnotation("Creating Transaction"); | ||
| TransactionOptions.Builder options = TransactionOptions.newBuilder(); | ||
| bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true); | ||
| final BeginTransactionRequest request = | ||
| BeginTransactionRequest.newBuilder() | ||
| .setSession(session.getName()) | ||
| .setOptions(options) | ||
| .setOptions(createReadOnlyTransactionOptions()) | ||
| .build(); | ||
| initTransactionInternal(request); | ||
| } finally { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
| import com.google.api.gax.rpc.ServerStream; | ||
| import com.google.cloud.Timestamp; | ||
| import com.google.cloud.spanner.Options.RpcPriority; | ||
| import com.google.cloud.spanner.Options.ReadOnlyTransactionOption; | ||
| import com.google.cloud.spanner.Options.TransactionOption; | ||
| import com.google.cloud.spanner.Options.UpdateOption; | ||
| import com.google.cloud.spanner.Statement.StatementFactory; | ||
|
|
@@ -351,6 +352,17 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce( | |
| */ | ||
| ReadOnlyTransaction readOnlyTransaction(); | ||
|
|
||
| /** | ||
| * Returns a read-only transaction context in which multiple reads and/or queries can be | ||
| * performed using {@link TimestampBound#strong()} concurrency and the given read-only | ||
| * transaction options. | ||
| * | ||
| * @param options options for starting the read-only transaction | ||
| */ | ||
| default ReadOnlyTransaction readOnlyTransaction(ReadOnlyTransactionOption... options) { | ||
| return readOnlyTransaction(TimestampBound.strong(), options); | ||
| } | ||
|
|
||
| /** | ||
| * Returns a read-only transaction context in which a multiple reads and/or queries can be | ||
| * performed at the given timestamp bound. All reads/queries will use the same timestamp, and the | ||
|
|
@@ -384,6 +396,31 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce( | |
| */ | ||
| ReadOnlyTransaction readOnlyTransaction(TimestampBound bound); | ||
|
|
||
| /** | ||
| * Returns a read-only transaction context in which multiple reads and/or queries can be | ||
| * performed at the given timestamp bound and with the given read-only transaction options. | ||
| * | ||
| * <p>Options can include: | ||
| * | ||
| * <ul> | ||
| * <li>{@link Options#beginTransactionOption(Options.BeginTransactionOption)}: Controls whether | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This part of the documentation should also be added to the method above that does not accept a TimestampBound. Also, I think it would be good to include some arguments for when it makes sense to use this option, and when it does not make sense. |
||
| * the transaction is started by an explicit BeginTransaction RPC or by inlining | ||
| * BeginTransaction on the first read/query. | ||
| * </ul> | ||
| * | ||
| * @param bound the timestamp bound at which to perform the read | ||
| * @param options options for starting the read-only transaction | ||
| */ | ||
| default ReadOnlyTransaction readOnlyTransaction( | ||
| TimestampBound bound, ReadOnlyTransactionOption... options) { | ||
| Options readOnlyTransactionOptions = Options.fromReadOnlyTransactionOptions(options); | ||
| if (readOnlyTransactionOptions.beginTransactionOption() == Options.BeginTransactionOption.EXPLICIT) { | ||
| return readOnlyTransaction(bound); | ||
| } | ||
| throw new UnsupportedOperationException( | ||
| "This DatabaseClient implementation does not support read-only transaction options"); | ||
| } | ||
|
|
||
| /** | ||
| * Returns a transaction runner for executing a single logical transaction with retries. The | ||
| * returned runner can only be used once. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to
SessionImpl.throwIfTransactionsPending()is problematic for several reasons:throwIfTransactionsPending()is an instance method inSessionImpl. Calling it as a static method will result in a compilation error.sessioninstance, this check will likely throw anIllegalStateExceptionbecause the currentMultiUseReadOnlyTransactionis already set as the active transaction on the session.SessionImpl.setActive()when the transaction is initialized.For inlined begin, we should simply skip
initTransaction()without adding this extra check.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stale review