Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Transaction;
Expand Down Expand Up @@ -742,7 +743,7 @@ options, getPreviousTransactionId())))
@Override
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {
Preconditions.checkNotNull(transaction);
if (transaction.getId() != ByteString.EMPTY) {
if (!transaction.getId().isEmpty()) {
// A transaction has been returned by a statement that was executed. Set the id of the
// transaction on this instance and release the lock to allow other statements to proceed.
if ((transactionIdFuture == null || !this.transactionIdFuture.isDone())
Expand All @@ -757,6 +758,18 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}
}

private boolean hasNonEmptyTransactionId(ResultSetMetadata metadata) {
return metadata.hasTransaction() && !metadata.getTransaction().getId().isEmpty();
}

private void throwIfBeginDidNotReturnTransaction(
TransactionSelector transactionSelector, boolean sawNonEmptyTransactionId) {
if (transactionSelector.hasBegin() && !sawNonEmptyTransactionId) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, NO_TRANSACTION_RETURNED_MSG);
}
}

/**
* In read-write transactions, the precommit token with the highest sequence number from this
* transaction attempt will be tracked and included in the
Expand Down Expand Up @@ -980,14 +993,16 @@ private ResultSet internalExecuteUpdate(
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), getTransactionChannelHint(), isRouteToLeader());
session.markUsed(clock.instant());
if (resultSet.getMetadata().hasTransaction()) {
boolean sawNonEmptyTransactionId = hasNonEmptyTransactionId(resultSet.getMetadata());
if (sawNonEmptyTransactionId) {
onTransactionMetadata(
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
}
if (!resultSet.hasStats()) {
throw new IllegalArgumentException(
"DML response missing stats possibly due to non-DML statement as input");
}
throwIfBeginDidNotReturnTransaction(builder.getTransaction(), sawNonEmptyTransactionId);
if (resultSet.hasPrecommitToken()) {
onPrecommitToken(resultSet.getPrecommitToken());
}
Expand Down Expand Up @@ -1037,12 +1052,8 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... u
ErrorCode.INVALID_ARGUMENT,
"DML response missing stats possibly due to non-DML statement as input");
}
if (builder.getTransaction().hasBegin()
&& !(input.getMetadata().hasTransaction()
&& input.getMetadata().getTransaction().getId() != ByteString.EMPTY)) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, NO_TRANSACTION_RETURNED_MSG);
}
throwIfBeginDidNotReturnTransaction(
builder.getTransaction(), hasNonEmptyTransactionId(input.getMetadata()));
// For standard DML, using the exact row count.
return input.getStats().getRowCountExact();
},
Expand Down Expand Up @@ -1116,9 +1127,11 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
rpc.executeBatchDml(builder.build(), getTransactionChannelHint());
session.markUsed(clock.instant());
long[] results = new long[response.getResultSetsCount()];
boolean sawNonEmptyTransactionId = false;
for (int i = 0; i < response.getResultSetsCount(); ++i) {
results[i] = response.getResultSets(i).getStats().getRowCountExact();
if (response.getResultSets(i).getMetadata().hasTransaction()) {
if (hasNonEmptyTransactionId(response.getResultSets(i).getMetadata())) {
sawNonEmptyTransactionId = true;
onTransactionMetadata(
response.getResultSets(i).getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
Expand All @@ -1139,6 +1152,7 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
response.getStatus().getMessage(),
results);
}
throwIfBeginDidNotReturnTransaction(builder.getTransaction(), sawNonEmptyTransactionId);
return results;
} catch (Throwable e) {
throw onError(
Expand Down Expand Up @@ -1187,9 +1201,11 @@ public ApiFuture<long[]> batchUpdateAsync(
response,
batchDmlResponse -> {
long[] results = new long[batchDmlResponse.getResultSetsCount()];
boolean sawNonEmptyTransactionId = false;
for (int i = 0; i < batchDmlResponse.getResultSetsCount(); ++i) {
results[i] = batchDmlResponse.getResultSets(i).getStats().getRowCountExact();
if (batchDmlResponse.getResultSets(i).getMetadata().hasTransaction()) {
if (hasNonEmptyTransactionId(batchDmlResponse.getResultSets(i).getMetadata())) {
sawNonEmptyTransactionId = true;
onTransactionMetadata(
batchDmlResponse.getResultSets(i).getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
Expand All @@ -1208,6 +1224,8 @@ public ApiFuture<long[]> batchUpdateAsync(
batchDmlResponse.getStatus().getMessage(),
results);
}
throwIfBeginDidNotReturnTransaction(
builder.getTransaction(), sawNonEmptyTransactionId);
return results;
},
MoreExecutors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,10 @@ static void runWithIgnoreInlineBegin(Runnable runnable) {
}
}

static void useShortTransactionWait(TransactionContext transaction) {
((TransactionContextImpl) transaction).waitForTransactionTimeoutMillis = 1L;
}

@Test
public void testQueryWithInlineBeginDidNotReturnTransaction() {
runWithIgnoreInlineBegin(
Expand Down Expand Up @@ -1738,7 +1742,11 @@ public void testUpdateWithInlineBeginDidNotReturnTransaction() {
() ->
client
.readWriteTransaction()
.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)));
.run(
transaction -> {
useShortTransactionWait(transaction);
return transaction.executeUpdate(UPDATE_STATEMENT);
}));
assertEquals(ErrorCode.FAILED_PRECONDITION, e.getErrorCode());
assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
Expand All @@ -1760,6 +1768,7 @@ public void testBatchUpdateWithInlineBeginDidNotReturnTransaction() {
.readWriteTransaction()
.run(
transaction -> {
useShortTransactionWait(transaction);
transaction.batchUpdate(
Collections.singletonList(UPDATE_STATEMENT));
return null;
Expand Down Expand Up @@ -1831,9 +1840,11 @@ public void testUpdateAsyncWithInlineBeginDidNotReturnTransaction() {
client
.readWriteTransaction()
.run(
transaction ->
SpannerApiFutures.get(
transaction.executeUpdateAsync(UPDATE_STATEMENT))));
transaction -> {
useShortTransactionWait(transaction);
return SpannerApiFutures.get(
transaction.executeUpdateAsync(UPDATE_STATEMENT));
}));
assertEquals(ErrorCode.FAILED_PRECONDITION, e.getErrorCode());
assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
Expand All @@ -1854,10 +1865,12 @@ public void testBatchUpdateAsyncWithInlineBeginDidNotReturnTransaction() {
client
.readWriteTransaction()
.run(
transaction ->
SpannerApiFutures.get(
transaction.batchUpdateAsync(
Collections.singletonList(UPDATE_STATEMENT)))));
transaction -> {
useShortTransactionWait(transaction);
return SpannerApiFutures.get(
transaction.batchUpdateAsync(
Collections.singletonList(UPDATE_STATEMENT)));
}));
assertEquals(ErrorCode.FAILED_PRECONDITION, e.getErrorCode());
assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,10 @@ public void setIgnoreInlineBeginRequest(boolean ignore) {
ignoreInlineBeginRequest.set(ignore);
}

private boolean shouldOmitInlineBeginTransaction(TransactionSelector transactionSelector) {
return ignoreInlineBeginRequest.get() && transactionSelector.hasBegin();
}

public void freeze() {
synchronized (lock) {
freezeLock = new CountDownLatch(1);
Expand Down Expand Up @@ -1100,12 +1104,12 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver<ResultSet> resp
.setRowCountExact(result.getUpdateCount())
.build())
.setMetadata(
ResultSetMetadata.newBuilder()
.setTransaction(
ignoreInlineBeginRequest.get()
? Transaction.getDefaultInstance()
: Transaction.newBuilder().setId(transactionId).build())
.build());
shouldOmitInlineBeginTransaction(request.getTransaction())
? ResultSetMetadata.getDefaultInstance()
: ResultSetMetadata.newBuilder()
.setTransaction(
Transaction.newBuilder().setId(transactionId).build())
.build());
if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) {
resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken(transactionId));
}
Expand All @@ -1131,13 +1135,12 @@ private void returnResultSet(
Session session) {
ResultSetMetadata metadata = resultSet.getMetadata();
if (transactionId != null) {
metadata =
metadata.toBuilder()
.setTransaction(
ignoreInlineBeginRequest.get()
? Transaction.getDefaultInstance()
: Transaction.newBuilder().setId(transactionId).build())
.build();
if (!shouldOmitInlineBeginTransaction(transactionSelector)) {
metadata =
metadata.toBuilder()
.setTransaction(Transaction.newBuilder().setId(transactionId).build())
.build();
}
} else if (transactionSelector.hasBegin() || transactionSelector.hasSingleUse()) {
Transaction transaction = getTemporaryTransactionOrNull(transactionSelector);
metadata = metadata.toBuilder().setTransaction(transaction).build();
Expand Down Expand Up @@ -1234,12 +1237,11 @@ public void executeBatchDml(
ResultSet.newBuilder()
.setStats(ResultSetStats.newBuilder().setRowCountExact(updateCount).build())
.setMetadata(
ResultSetMetadata.newBuilder()
.setTransaction(
ignoreInlineBeginRequest.get()
? Transaction.getDefaultInstance()
: Transaction.newBuilder().setId(transactionId).build())
.build())
shouldOmitInlineBeginTransaction(request.getTransaction())
? ResultSetMetadata.getDefaultInstance()
: ResultSetMetadata.newBuilder()
.setTransaction(Transaction.newBuilder().setId(transactionId).build())
.build())
.build());
}
builder.setStatus(status);
Expand Down
Loading