diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 4ffe87104d9b..d6353bfcdb86 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -49,6 +49,7 @@ import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.RequestOptions; import com.google.spanner.v1.ResultSet; +import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.ResultSetStats; import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.Transaction; @@ -742,7 +743,7 @@ options, getPreviousTransactionId()))) @Override public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) { Preconditions.checkNotNull(transaction); - if (transaction.getId() != ByteString.EMPTY) { + if (!transaction.getId().isEmpty()) { // A transaction has been returned by a statement that was executed. Set the id of the // transaction on this instance and release the lock to allow other statements to proceed. if ((transactionIdFuture == null || !this.transactionIdFuture.isDone()) @@ -757,6 +758,18 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude } } + private boolean hasNonEmptyTransactionId(ResultSetMetadata metadata) { + return metadata.hasTransaction() && !metadata.getTransaction().getId().isEmpty(); + } + + private void throwIfBeginDidNotReturnTransaction( + TransactionSelector transactionSelector, boolean sawNonEmptyTransactionId) { + if (transactionSelector.hasBegin() && !sawNonEmptyTransactionId) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.FAILED_PRECONDITION, NO_TRANSACTION_RETURNED_MSG); + } + } + /** * In read-write transactions, the precommit token with the highest sequence number from this * transaction attempt will be tracked and included in the @@ -980,7 +993,8 @@ private ResultSet internalExecuteUpdate( com.google.spanner.v1.ResultSet resultSet = rpc.executeQuery(builder.build(), getTransactionChannelHint(), isRouteToLeader()); session.markUsed(clock.instant()); - if (resultSet.getMetadata().hasTransaction()) { + boolean sawNonEmptyTransactionId = hasNonEmptyTransactionId(resultSet.getMetadata()); + if (sawNonEmptyTransactionId) { onTransactionMetadata( resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin()); } @@ -988,6 +1002,7 @@ private ResultSet internalExecuteUpdate( throw new IllegalArgumentException( "DML response missing stats possibly due to non-DML statement as input"); } + throwIfBeginDidNotReturnTransaction(builder.getTransaction(), sawNonEmptyTransactionId); if (resultSet.hasPrecommitToken()) { onPrecommitToken(resultSet.getPrecommitToken()); } @@ -1037,12 +1052,8 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... u ErrorCode.INVALID_ARGUMENT, "DML response missing stats possibly due to non-DML statement as input"); } - if (builder.getTransaction().hasBegin() - && !(input.getMetadata().hasTransaction() - && input.getMetadata().getTransaction().getId() != ByteString.EMPTY)) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.FAILED_PRECONDITION, NO_TRANSACTION_RETURNED_MSG); - } + throwIfBeginDidNotReturnTransaction( + builder.getTransaction(), hasNonEmptyTransactionId(input.getMetadata())); // For standard DML, using the exact row count. return input.getStats().getRowCountExact(); }, @@ -1116,9 +1127,11 @@ public long[] batchUpdate(Iterable statements, UpdateOption... update rpc.executeBatchDml(builder.build(), getTransactionChannelHint()); session.markUsed(clock.instant()); long[] results = new long[response.getResultSetsCount()]; + boolean sawNonEmptyTransactionId = false; for (int i = 0; i < response.getResultSetsCount(); ++i) { results[i] = response.getResultSets(i).getStats().getRowCountExact(); - if (response.getResultSets(i).getMetadata().hasTransaction()) { + if (hasNonEmptyTransactionId(response.getResultSets(i).getMetadata())) { + sawNonEmptyTransactionId = true; onTransactionMetadata( response.getResultSets(i).getMetadata().getTransaction(), builder.getTransaction().hasBegin()); @@ -1139,6 +1152,7 @@ public long[] batchUpdate(Iterable statements, UpdateOption... update response.getStatus().getMessage(), results); } + throwIfBeginDidNotReturnTransaction(builder.getTransaction(), sawNonEmptyTransactionId); return results; } catch (Throwable e) { throw onError( @@ -1187,9 +1201,11 @@ public ApiFuture batchUpdateAsync( response, batchDmlResponse -> { long[] results = new long[batchDmlResponse.getResultSetsCount()]; + boolean sawNonEmptyTransactionId = false; for (int i = 0; i < batchDmlResponse.getResultSetsCount(); ++i) { results[i] = batchDmlResponse.getResultSets(i).getStats().getRowCountExact(); - if (batchDmlResponse.getResultSets(i).getMetadata().hasTransaction()) { + if (hasNonEmptyTransactionId(batchDmlResponse.getResultSets(i).getMetadata())) { + sawNonEmptyTransactionId = true; onTransactionMetadata( batchDmlResponse.getResultSets(i).getMetadata().getTransaction(), builder.getTransaction().hasBegin()); @@ -1208,6 +1224,8 @@ public ApiFuture batchUpdateAsync( batchDmlResponse.getStatus().getMessage(), results); } + throwIfBeginDidNotReturnTransaction( + builder.getTransaction(), sawNonEmptyTransactionId); return results; }, MoreExecutors.directExecutor()); diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index db1b39ac0a0b..5161821c96ac 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -1677,6 +1677,10 @@ static void runWithIgnoreInlineBegin(Runnable runnable) { } } + static void useShortTransactionWait(TransactionContext transaction) { + ((TransactionContextImpl) transaction).waitForTransactionTimeoutMillis = 1L; + } + @Test public void testQueryWithInlineBeginDidNotReturnTransaction() { runWithIgnoreInlineBegin( @@ -1738,7 +1742,11 @@ public void testUpdateWithInlineBeginDidNotReturnTransaction() { () -> client .readWriteTransaction() - .run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT))); + .run( + transaction -> { + useShortTransactionWait(transaction); + return transaction.executeUpdate(UPDATE_STATEMENT); + })); assertEquals(ErrorCode.FAILED_PRECONDITION, e.getErrorCode()); assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -1760,6 +1768,7 @@ public void testBatchUpdateWithInlineBeginDidNotReturnTransaction() { .readWriteTransaction() .run( transaction -> { + useShortTransactionWait(transaction); transaction.batchUpdate( Collections.singletonList(UPDATE_STATEMENT)); return null; @@ -1831,9 +1840,11 @@ public void testUpdateAsyncWithInlineBeginDidNotReturnTransaction() { client .readWriteTransaction() .run( - transaction -> - SpannerApiFutures.get( - transaction.executeUpdateAsync(UPDATE_STATEMENT)))); + transaction -> { + useShortTransactionWait(transaction); + return SpannerApiFutures.get( + transaction.executeUpdateAsync(UPDATE_STATEMENT)); + })); assertEquals(ErrorCode.FAILED_PRECONDITION, e.getErrorCode()); assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -1854,10 +1865,12 @@ public void testBatchUpdateAsyncWithInlineBeginDidNotReturnTransaction() { client .readWriteTransaction() .run( - transaction -> - SpannerApiFutures.get( - transaction.batchUpdateAsync( - Collections.singletonList(UPDATE_STATEMENT))))); + transaction -> { + useShortTransactionWait(transaction); + return SpannerApiFutures.get( + transaction.batchUpdateAsync( + Collections.singletonList(UPDATE_STATEMENT))); + })); assertEquals(ErrorCode.FAILED_PRECONDITION, e.getErrorCode()); assertThat(e.getMessage()).contains(AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index a34cabe1bbcb..cc44ba2f3f81 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -813,6 +813,10 @@ public void setIgnoreInlineBeginRequest(boolean ignore) { ignoreInlineBeginRequest.set(ignore); } + private boolean shouldOmitInlineBeginTransaction(TransactionSelector transactionSelector) { + return ignoreInlineBeginRequest.get() && transactionSelector.hasBegin(); + } + public void freeze() { synchronized (lock) { freezeLock = new CountDownLatch(1); @@ -1100,12 +1104,12 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp .setRowCountExact(result.getUpdateCount()) .build()) .setMetadata( - ResultSetMetadata.newBuilder() - .setTransaction( - ignoreInlineBeginRequest.get() - ? Transaction.getDefaultInstance() - : Transaction.newBuilder().setId(transactionId).build()) - .build()); + shouldOmitInlineBeginTransaction(request.getTransaction()) + ? ResultSetMetadata.getDefaultInstance() + : ResultSetMetadata.newBuilder() + .setTransaction( + Transaction.newBuilder().setId(transactionId).build()) + .build()); if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) { resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken(transactionId)); } @@ -1131,13 +1135,12 @@ private void returnResultSet( Session session) { ResultSetMetadata metadata = resultSet.getMetadata(); if (transactionId != null) { - metadata = - metadata.toBuilder() - .setTransaction( - ignoreInlineBeginRequest.get() - ? Transaction.getDefaultInstance() - : Transaction.newBuilder().setId(transactionId).build()) - .build(); + if (!shouldOmitInlineBeginTransaction(transactionSelector)) { + metadata = + metadata.toBuilder() + .setTransaction(Transaction.newBuilder().setId(transactionId).build()) + .build(); + } } else if (transactionSelector.hasBegin() || transactionSelector.hasSingleUse()) { Transaction transaction = getTemporaryTransactionOrNull(transactionSelector); metadata = metadata.toBuilder().setTransaction(transaction).build(); @@ -1234,12 +1237,11 @@ public void executeBatchDml( ResultSet.newBuilder() .setStats(ResultSetStats.newBuilder().setRowCountExact(updateCount).build()) .setMetadata( - ResultSetMetadata.newBuilder() - .setTransaction( - ignoreInlineBeginRequest.get() - ? Transaction.getDefaultInstance() - : Transaction.newBuilder().setId(transactionId).build()) - .build()) + shouldOmitInlineBeginTransaction(request.getTransaction()) + ? ResultSetMetadata.getDefaultInstance() + : ResultSetMetadata.newBuilder() + .setTransaction(Transaction.newBuilder().setId(transactionId).build()) + .build()) .build()); } builder.setStatus(status);