Skip to content

Commit 84c2338

Browse files
committed
fix(spanner): ensure executeQueryAsync is non-blocking
When `executeQueryAsync(..)` was called for a read-only transaction, the method would execute a blocking `BeginTransaction` RPC for the first query. This change ensures that the `BeginTransaction` RPC instead is executed using the background executor thread pool.
1 parent 684511a commit 84c2338

File tree

4 files changed

+172
-6
lines changed

4 files changed

+172
-6
lines changed

java-spanner/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.vscode

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ public ListenableAsyncResultSet readAsync(
588588
? readOptions.bufferRows()
589589
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
590590
return new AsyncResultSetImpl(
591-
executorProvider, readInternal(table, null, keys, columns, options), bufferRows);
591+
executorProvider, () -> readInternal(table, null, keys, columns, options), bufferRows);
592592
}
593593

594594
@Override
@@ -607,7 +607,7 @@ public ListenableAsyncResultSet readUsingIndexAsync(
607607
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
608608
return new AsyncResultSetImpl(
609609
executorProvider,
610-
readInternal(table, checkNotNull(index), keys, columns, options),
610+
() -> readInternal(table, checkNotNull(index), keys, columns, options),
611611
bufferRows);
612612
}
613613

@@ -659,8 +659,9 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti
659659
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
660660
return new AsyncResultSetImpl(
661661
executorProvider,
662-
executeQueryInternal(
663-
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options),
662+
() ->
663+
executeQueryInternal(
664+
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options),
664665
bufferRows);
665666
}
666667

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,21 @@ private enum State {
157157

158158
AsyncResultSetImpl(
159159
ExecutorProvider executorProvider, Supplier<ResultSet> delegate, int bufferSize) {
160-
super(delegate);
160+
this(
161+
executorProvider,
162+
Suppliers.memoize(Preconditions.checkNotNull(delegate)),
163+
bufferSize,
164+
true);
165+
}
166+
167+
private AsyncResultSetImpl(
168+
ExecutorProvider executorProvider,
169+
Supplier<ResultSet> memoizedDelegate,
170+
int bufferSize,
171+
boolean dummy) {
172+
super(memoizedDelegate);
161173
this.executorProvider = Preconditions.checkNotNull(executorProvider);
162-
this.delegateResultSet = Preconditions.checkNotNull(delegate);
174+
this.delegateResultSet = memoizedDelegate;
163175
this.service = MoreExecutors.listeningDecorator(executorProvider.getExecutor());
164176
this.buffer = new LinkedBlockingDeque<>(bufferSize);
165177
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT;
20+
import static com.google.common.truth.Truth.assertThat;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import com.google.spanner.v1.BeginTransactionRequest;
24+
import com.google.spanner.v1.ExecuteSqlRequest;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
27+
import org.junit.Test;
28+
import org.junit.runner.RunWith;
29+
import org.junit.runners.JUnit4;
30+
31+
@RunWith(JUnit4.class)
32+
public class AsyncReadOnlyTransactionTest extends AbstractAsyncTransactionTest {
33+
34+
@Test
35+
public void asyncReadOnlyTransactionIsNonBlocking() throws Exception {
36+
// Warm up session pool to avoid CreateSession blocking when server is frozen.
37+
try (ResultSet resultSet = client().singleUse().executeQuery(READ_ONE_KEY_VALUE_STATEMENT)) {
38+
while (resultSet.next()) {
39+
}
40+
}
41+
mockSpanner.reset();
42+
43+
try (ReadOnlyTransaction transaction = client().readOnlyTransaction()) {
44+
mockSpanner.freeze();
45+
// Call executeQueryAsync. It should not block even though mock server is
46+
// frozen!
47+
AsyncResultSet rs = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT);
48+
49+
// Verify that no requests have been sent yet.
50+
assertTrue(mockSpanner.getRequestTypes().isEmpty());
51+
52+
// Now register a callback to start the stream.
53+
final CountDownLatch latch = new CountDownLatch(1);
54+
rs.setCallback(
55+
executor,
56+
resultSet -> {
57+
try {
58+
while (resultSet.tryNext() == AsyncResultSet.CursorState.OK) {
59+
// consume
60+
}
61+
if (resultSet.tryNext() == AsyncResultSet.CursorState.DONE) {
62+
latch.countDown();
63+
}
64+
return AsyncResultSet.CallbackResponse.CONTINUE;
65+
} catch (Throwable t) {
66+
latch.countDown();
67+
return AsyncResultSet.CallbackResponse.DONE;
68+
}
69+
});
70+
71+
// Unfreeze the mock server so the background thread can proceed.
72+
mockSpanner.unfreeze();
73+
74+
// Wait for the callback to complete.
75+
assertTrue("Timeout waiting for callback", latch.await(10, TimeUnit.SECONDS));
76+
77+
// Verify that requests were sent on the background thread.
78+
// It should contain one BeginTransaction and one ExecuteSql.
79+
assertThat(mockSpanner.getRequestTypes())
80+
.containsExactly(BeginTransactionRequest.class, ExecuteSqlRequest.class);
81+
}
82+
}
83+
84+
@Test
85+
public void testMultipleQueriesOnlyCallsBeginTransactionOnce() throws Exception {
86+
// Warm up session pool to avoid CreateSession blocking when server is frozen.
87+
try (ResultSet resultSet = client().singleUse().executeQuery(READ_ONE_KEY_VALUE_STATEMENT)) {
88+
while (resultSet.next()) {
89+
}
90+
}
91+
mockSpanner.reset();
92+
93+
try (ReadOnlyTransaction transaction = client().readOnlyTransaction()) {
94+
mockSpanner.freeze();
95+
// Call executeQueryAsync twice.
96+
AsyncResultSet rs1 = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT);
97+
AsyncResultSet rs2 = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT);
98+
99+
// Verify that no requests have been sent yet.
100+
assertTrue(mockSpanner.getRequestTypes().isEmpty());
101+
102+
// Unfreeze the mock server.
103+
mockSpanner.unfreeze();
104+
105+
// Now register callbacks to start the streams.
106+
final CountDownLatch latch1 = new CountDownLatch(1);
107+
final CountDownLatch latch2 = new CountDownLatch(1);
108+
109+
rs1.setCallback(
110+
executor,
111+
resultSet -> {
112+
try {
113+
while (resultSet.tryNext() == AsyncResultSet.CursorState.OK) {
114+
}
115+
if (resultSet.tryNext() == AsyncResultSet.CursorState.DONE) {
116+
latch1.countDown();
117+
}
118+
return AsyncResultSet.CallbackResponse.CONTINUE;
119+
} catch (Throwable t) {
120+
latch1.countDown();
121+
return AsyncResultSet.CallbackResponse.DONE;
122+
}
123+
});
124+
125+
rs2.setCallback(
126+
executor,
127+
resultSet -> {
128+
try {
129+
while (resultSet.tryNext() == AsyncResultSet.CursorState.OK) {
130+
}
131+
if (resultSet.tryNext() == AsyncResultSet.CursorState.DONE) {
132+
latch2.countDown();
133+
}
134+
return AsyncResultSet.CallbackResponse.CONTINUE;
135+
} catch (Throwable t) {
136+
latch2.countDown();
137+
return AsyncResultSet.CallbackResponse.DONE;
138+
}
139+
});
140+
141+
// Wait for both callbacks to complete.
142+
assertTrue("Timeout waiting for callback 1", latch1.await(10, TimeUnit.SECONDS));
143+
assertTrue("Timeout waiting for callback 2", latch2.await(10, TimeUnit.SECONDS));
144+
145+
// Verify that requests were sent.
146+
// It should contain one BeginTransaction and two ExecuteSql.
147+
assertThat(mockSpanner.getRequestTypes())
148+
.containsExactly(
149+
BeginTransactionRequest.class, ExecuteSqlRequest.class, ExecuteSqlRequest.class);
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)