Skip to content

Commit 8974940

Browse files
committed
fix(spanner): ensure executeQueryAsync is non-blocking
When `executeQueryAsync(..)` was called for a read-only transaction, the method would execute a blocking `BeginTransaction` RPC for the first query. This change ensures that the `BeginTransaction` RPC instead is executed using the background executor thread pool.
1 parent 684511a commit 8974940

File tree

4 files changed

+171
-6
lines changed

4 files changed

+171
-6
lines changed

java-spanner/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.vscode

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ public ListenableAsyncResultSet readAsync(
588588
? readOptions.bufferRows()
589589
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
590590
return new AsyncResultSetImpl(
591-
executorProvider, readInternal(table, null, keys, columns, options), bufferRows);
591+
executorProvider, () -> readInternal(table, null, keys, columns, options), bufferRows);
592592
}
593593

594594
@Override
@@ -607,7 +607,7 @@ public ListenableAsyncResultSet readUsingIndexAsync(
607607
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
608608
return new AsyncResultSetImpl(
609609
executorProvider,
610-
readInternal(table, checkNotNull(index), keys, columns, options),
610+
() -> readInternal(table, checkNotNull(index), keys, columns, options),
611611
bufferRows);
612612
}
613613

@@ -659,8 +659,9 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti
659659
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
660660
return new AsyncResultSetImpl(
661661
executorProvider,
662-
executeQueryInternal(
663-
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options),
662+
() ->
663+
executeQueryInternal(
664+
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options),
664665
bufferRows);
665666
}
666667

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,21 @@ private enum State {
157157

158158
AsyncResultSetImpl(
159159
ExecutorProvider executorProvider, Supplier<ResultSet> delegate, int bufferSize) {
160-
super(delegate);
160+
this(
161+
executorProvider,
162+
Suppliers.memoize(Preconditions.checkNotNull(delegate)),
163+
bufferSize,
164+
true);
165+
}
166+
167+
private AsyncResultSetImpl(
168+
ExecutorProvider executorProvider,
169+
Supplier<ResultSet> memoizedDelegate,
170+
int bufferSize,
171+
boolean dummy) {
172+
super(memoizedDelegate);
161173
this.executorProvider = Preconditions.checkNotNull(executorProvider);
162-
this.delegateResultSet = Preconditions.checkNotNull(delegate);
174+
this.delegateResultSet = memoizedDelegate;
163175
this.service = MoreExecutors.listeningDecorator(executorProvider.getExecutor());
164176
this.buffer = new LinkedBlockingDeque<>(bufferSize);
165177
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT;
20+
import static com.google.common.truth.Truth.assertThat;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import com.google.spanner.v1.BeginTransactionRequest;
24+
import com.google.spanner.v1.ExecuteSqlRequest;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
27+
import org.junit.Test;
28+
import org.junit.runner.RunWith;
29+
import org.junit.runners.JUnit4;
30+
31+
@RunWith(JUnit4.class)
32+
public class AsyncReadOnlyTransactionTest extends AbstractAsyncTransactionTest {
33+
34+
@Test
35+
public void asyncReadOnlyTransactionIsNonBlocking() throws Exception {
36+
// Warm up session pool to avoid CreateSession blocking when server is frozen.
37+
try (ResultSet resultSet = client().singleUse().executeQuery(READ_ONE_KEY_VALUE_STATEMENT)) {
38+
while (resultSet.next()) {}
39+
}
40+
mockSpanner.reset();
41+
42+
try (ReadOnlyTransaction transaction = client().readOnlyTransaction()) {
43+
mockSpanner.freeze();
44+
// Call executeQueryAsync. It should not block even though mock server is
45+
// frozen!
46+
AsyncResultSet rs = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT);
47+
48+
// Verify that no requests have been sent yet.
49+
assertTrue(mockSpanner.getRequestTypes().isEmpty());
50+
51+
// Now register a callback to start the stream.
52+
final CountDownLatch latch = new CountDownLatch(1);
53+
rs.setCallback(
54+
executor,
55+
resultSet -> {
56+
try {
57+
AsyncResultSet.CursorState state;
58+
while ((state = resultSet.tryNext()) == AsyncResultSet.CursorState.OK) {
59+
// consume
60+
}
61+
if (state == AsyncResultSet.CursorState.DONE) {
62+
latch.countDown();
63+
}
64+
return AsyncResultSet.CallbackResponse.CONTINUE;
65+
} catch (Throwable t) {
66+
latch.countDown();
67+
return AsyncResultSet.CallbackResponse.DONE;
68+
}
69+
});
70+
71+
// Unfreeze the mock server so the background thread can proceed.
72+
mockSpanner.unfreeze();
73+
74+
// Wait for the callback to complete.
75+
assertTrue("Timeout waiting for callback", latch.await(10, TimeUnit.SECONDS));
76+
77+
// Verify that requests were sent on the background thread.
78+
// It should contain one BeginTransaction and one ExecuteSql.
79+
assertThat(mockSpanner.getRequestTypes())
80+
.containsExactly(BeginTransactionRequest.class, ExecuteSqlRequest.class);
81+
}
82+
}
83+
84+
@Test
85+
public void testMultipleQueriesOnlyCallsBeginTransactionOnce() throws Exception {
86+
// Warm up session pool to avoid CreateSession blocking when server is frozen.
87+
try (ResultSet resultSet = client().singleUse().executeQuery(READ_ONE_KEY_VALUE_STATEMENT)) {
88+
while (resultSet.next()) {}
89+
}
90+
mockSpanner.reset();
91+
92+
try (ReadOnlyTransaction transaction = client().readOnlyTransaction()) {
93+
mockSpanner.freeze();
94+
// Call executeQueryAsync twice.
95+
AsyncResultSet rs1 = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT);
96+
AsyncResultSet rs2 = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT);
97+
98+
// Verify that no requests have been sent yet.
99+
assertTrue(mockSpanner.getRequestTypes().isEmpty());
100+
101+
// Unfreeze the mock server.
102+
mockSpanner.unfreeze();
103+
104+
// Now register callbacks to start the streams.
105+
final CountDownLatch latch1 = new CountDownLatch(1);
106+
final CountDownLatch latch2 = new CountDownLatch(1);
107+
108+
rs1.setCallback(
109+
executor,
110+
resultSet -> {
111+
try {
112+
AsyncResultSet.CursorState state;
113+
while ((state = resultSet.tryNext()) == AsyncResultSet.CursorState.OK) {}
114+
if (state == AsyncResultSet.CursorState.DONE) {
115+
latch1.countDown();
116+
}
117+
return AsyncResultSet.CallbackResponse.CONTINUE;
118+
} catch (Throwable t) {
119+
latch1.countDown();
120+
return AsyncResultSet.CallbackResponse.DONE;
121+
}
122+
});
123+
124+
rs2.setCallback(
125+
executor,
126+
resultSet -> {
127+
try {
128+
AsyncResultSet.CursorState state;
129+
while ((state = resultSet.tryNext()) == AsyncResultSet.CursorState.OK) {}
130+
if (state == AsyncResultSet.CursorState.DONE) {
131+
latch2.countDown();
132+
}
133+
return AsyncResultSet.CallbackResponse.CONTINUE;
134+
} catch (Throwable t) {
135+
latch2.countDown();
136+
return AsyncResultSet.CallbackResponse.DONE;
137+
}
138+
});
139+
140+
// Wait for both callbacks to complete.
141+
assertTrue("Timeout waiting for callback 1", latch1.await(10, TimeUnit.SECONDS));
142+
assertTrue("Timeout waiting for callback 2", latch2.await(10, TimeUnit.SECONDS));
143+
144+
// Verify that requests were sent.
145+
// It should contain one BeginTransaction and two ExecuteSql.
146+
assertThat(mockSpanner.getRequestTypes())
147+
.containsExactly(
148+
BeginTransactionRequest.class, ExecuteSqlRequest.class, ExecuteSqlRequest.class);
149+
}
150+
}
151+
}

0 commit comments

Comments
 (0)