Skip to content

Commit 2c7e5f5

Browse files
authored
feat(bigtable): route point read rows to shim (#13542)
1 parent 7af3224 commit 2c7e5f5

5 files changed

Lines changed: 526 additions & 13 deletions

File tree

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,10 +318,39 @@ public TargetId getTargetId() {
318318
return targetId;
319319
}
320320

321+
/**
322+
* Returns true if this query identifies a single row that can be served by a point read. Supports
323+
* two shapes: exactly one row key and no row ranges, or exactly one closed-closed row range whose
324+
* start key equals its end key.
325+
*/
326+
@InternalApi
327+
public boolean isSinglePointQuery() {
328+
RowSet rows = this.builder.getRows();
329+
int keyCount = rows.getRowKeysCount();
330+
int rangeCount = rows.getRowRangesCount();
331+
if (keyCount == 1 && rangeCount == 0) {
332+
return true;
333+
}
334+
if (keyCount == 0 && rangeCount == 1) {
335+
RowRange range = rows.getRowRanges(0);
336+
return range.hasStartKeyClosed()
337+
&& range.hasEndKeyClosed()
338+
&& range.getStartKeyClosed().equals(range.getEndKeyClosed());
339+
}
340+
return false;
341+
}
342+
321343
@InternalApi
322344
public SessionReadRowRequest toSessionPointProto() {
345+
Preconditions.checkState(
346+
isSinglePointQuery(),
347+
"Query must be a single-point read (one row key, or one closed-closed row range whose"
348+
+ " start equals its end)");
349+
RowSet rows = this.builder.getRows();
350+
ByteString key =
351+
rows.getRowKeysCount() > 0 ? rows.getRowKeys(0) : rows.getRowRanges(0).getStartKeyClosed();
323352
return SessionReadRowRequest.newBuilder()
324-
.setKey(this.builder.getRows().getRowKeysList().get(0))
353+
.setKey(key)
325354
.setFilter(this.builder.getFilter())
326355
.build();
327356
}

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
2828
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
2929
import com.google.api.gax.retrying.RetryAlgorithm;
30+
import com.google.api.gax.retrying.RetrySettings;
3031
import com.google.api.gax.retrying.RetryingExecutorWithContext;
3132
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
3233
import com.google.api.gax.retrying.SimpleStreamResumptionStrategy;
@@ -37,6 +38,7 @@
3738
import com.google.api.gax.rpc.RequestParamsExtractor;
3839
import com.google.api.gax.rpc.ServerStreamingCallSettings;
3940
import com.google.api.gax.rpc.ServerStreamingCallable;
41+
import com.google.api.gax.rpc.StatusCode;
4042
import com.google.api.gax.rpc.UnaryCallSettings;
4143
import com.google.api.gax.rpc.UnaryCallable;
4244
import com.google.api.gax.tracing.SpanName;
@@ -97,6 +99,7 @@
9799
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
98100
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
99101
import com.google.cloud.bigtable.data.v2.stub.readrows.LargeReadRowsResumptionStrategy;
102+
import com.google.cloud.bigtable.data.v2.stub.readrows.MaybePointReadCallable;
100103
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
101104
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
102105
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
@@ -119,6 +122,7 @@
119122
import java.time.Duration;
120123
import java.util.List;
121124
import java.util.Map;
125+
import java.util.Set;
122126
import java.util.concurrent.TimeUnit;
123127
import java.util.function.Function;
124128
import javax.annotation.Nonnull;
@@ -259,11 +263,20 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
259263
bigtableClientContext.getClientContext().getTracerFactory(),
260264
span);
261265

262-
return traced.withDefaultCallContext(
263-
bigtableClientContext
264-
.getClientContext()
265-
.getDefaultCallContext()
266-
.withRetrySettings(perOpSettings.readRowsSettings.getRetrySettings()));
266+
ServerStreamingCallable<Query, RowT> classic =
267+
traced.withDefaultCallContext(
268+
bigtableClientContext
269+
.getClientContext()
270+
.getDefaultCallContext()
271+
.withRetrySettings(perOpSettings.readRowsSettings.getRetrySettings()));
272+
273+
return new MaybePointReadCallable<>(
274+
classic,
275+
createPointReadCallable(
276+
rowAdapter,
277+
"ReadRows",
278+
perOpSettings.readRowsSettings.getRetrySettings(),
279+
perOpSettings.readRowsSettings.getRetryableCodes()));
267280
}
268281

269282
/**
@@ -281,13 +294,25 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
281294
* </ul>
282295
*/
283296
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
297+
return createPointReadCallable(
298+
rowAdapter,
299+
"ReadRow",
300+
perOpSettings.readRowSettings.getRetrySettings(),
301+
perOpSettings.readRowSettings.getRetryableCodes());
302+
}
303+
304+
private <RowT> UnaryCallable<Query, RowT> createPointReadCallable(
305+
RowAdapter<RowT> rowAdapter,
306+
String spanName,
307+
RetrySettings retrySettings,
308+
Set<StatusCode.Code> retryableCodes) {
284309
ClientContext clientContext = bigtableClientContext.getClientContext();
285310

286311
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
287312
createReadRowsBaseCallable(
288313
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
289-
.setRetryableCodes(perOpSettings.readRowSettings.getRetryableCodes())
290-
.setRetrySettings(perOpSettings.readRowSettings.getRetrySettings())
314+
.setRetryableCodes(retryableCodes)
315+
.setRetrySettings(retrySettings)
291316
.setIdleTimeoutDuration(Duration.ZERO)
292317
.setWaitTimeoutDuration(Duration.ZERO)
293318
.build(),
@@ -302,16 +327,20 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
302327
BigtableUnaryOperationCallable<Query, RowT> classic =
303328
new BigtableUnaryOperationCallable<>(
304329
readRowCallable,
305-
clientContext
306-
.getDefaultCallContext()
307-
.withRetrySettings(perOpSettings.readRowSettings.getRetrySettings()),
330+
clientContext.getDefaultCallContext().withRetrySettings(retrySettings),
308331
clientContext.getTracerFactory(),
309-
getSpanName("ReadRow"),
332+
getSpanName(spanName),
310333
/* allowNoResponse= */ true);
311334

335+
UnaryCallSettings<?, ?> shimSettings =
336+
perOpSettings.readRowSettings.toBuilder()
337+
.setRetrySettings(retrySettings)
338+
.setRetryableCodes(retryableCodes)
339+
.build();
340+
312341
return bigtableClientContext
313342
.getSessionShim()
314-
.decorateReadRow(classic, rowAdapter, perOpSettings.readRowSettings);
343+
.decorateReadRow(classic, rowAdapter, shimSettings);
315344
}
316345

317346
private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRowsBaseCallable(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import com.google.api.core.ApiFuture;
19+
import com.google.api.core.ApiFutureCallback;
20+
import com.google.api.core.ApiFutures;
21+
import com.google.api.core.InternalApi;
22+
import com.google.api.gax.rpc.ApiCallContext;
23+
import com.google.api.gax.rpc.ResponseObserver;
24+
import com.google.api.gax.rpc.ServerStreamingCallable;
25+
import com.google.api.gax.rpc.StreamController;
26+
import com.google.api.gax.rpc.UnaryCallable;
27+
import com.google.cloud.bigtable.data.v2.models.Query;
28+
import com.google.common.util.concurrent.MoreExecutors;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicReference;
31+
32+
/**
33+
* Routes ReadRows calls whose query identifies a single row through a unary point-read callable,
34+
* letting them benefit from the same session-shim diversion as {@code ReadRow}. Queries that cannot
35+
* be reduced to a point read fall through to the classic {@code ReadRows} callable.
36+
*/
37+
@InternalApi
38+
public class MaybePointReadCallable<RowT> extends ServerStreamingCallable<Query, RowT> {
39+
private final ServerStreamingCallable<Query, RowT> classic;
40+
private final UnaryCallable<Query, RowT> pointReader;
41+
42+
public MaybePointReadCallable(
43+
ServerStreamingCallable<Query, RowT> classic, UnaryCallable<Query, RowT> pointReader) {
44+
this.classic = classic;
45+
this.pointReader = pointReader;
46+
}
47+
48+
@Override
49+
public void call(Query request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
50+
if (!request.isSinglePointQuery()) {
51+
classic.call(request, responseObserver, context);
52+
return;
53+
}
54+
55+
AtomicBoolean cancelled = new AtomicBoolean();
56+
AtomicReference<ApiFuture<RowT>> futureRef = new AtomicReference<>();
57+
58+
responseObserver.onStart(
59+
new StreamController() {
60+
@Override
61+
public void cancel() {
62+
cancelled.set(true);
63+
ApiFuture<RowT> f = futureRef.get();
64+
if (f != null) {
65+
f.cancel(false);
66+
}
67+
}
68+
69+
@Override
70+
public void disableAutoInboundFlowControl() {}
71+
72+
@Override
73+
public void request(int count) {}
74+
});
75+
76+
ApiFuture<RowT> future;
77+
try {
78+
future = pointReader.futureCall(request, context);
79+
} catch (Throwable t) {
80+
if (!cancelled.get()) {
81+
responseObserver.onError(t);
82+
}
83+
return;
84+
}
85+
futureRef.set(future);
86+
if (cancelled.get()) {
87+
future.cancel(false);
88+
}
89+
90+
ApiFutures.addCallback(
91+
future,
92+
new ApiFutureCallback<RowT>() {
93+
@Override
94+
public void onSuccess(RowT row) {
95+
if (cancelled.get()) {
96+
return;
97+
}
98+
if (row != null) {
99+
try {
100+
responseObserver.onResponse(row);
101+
} catch (Throwable t) {
102+
responseObserver.onError(t);
103+
return;
104+
}
105+
}
106+
responseObserver.onComplete();
107+
}
108+
109+
@Override
110+
public void onFailure(Throwable t) {
111+
if (cancelled.get()) {
112+
return;
113+
}
114+
responseObserver.onError(t);
115+
}
116+
},
117+
MoreExecutors.directExecutor());
118+
}
119+
}

java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.bigtable.v2.RowFilter;
2424
import com.google.bigtable.v2.RowRange;
2525
import com.google.bigtable.v2.RowSet;
26+
import com.google.bigtable.v2.SessionReadRowRequest;
2627
import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
2728
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
2829
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
@@ -966,4 +967,113 @@ public void testQueryReversed() {
966967
assertThat(query.toProto(requestContext))
967968
.isEqualTo(expectedReadFromTableProtoBuilder().setReversed(true).build());
968969
}
970+
971+
@Test
972+
public void isSinglePointQuery_singleRowKey() {
973+
assertThat(Query.create(TABLE_ID).rowKey("k").isSinglePointQuery()).isTrue();
974+
}
975+
976+
@Test
977+
public void isSinglePointQuery_singleClosedRange() {
978+
assertThat(
979+
Query.create(TABLE_ID)
980+
.range(ByteStringRange.unbounded().startClosed("k").endClosed("k"))
981+
.isSinglePointQuery())
982+
.isTrue();
983+
}
984+
985+
@Test
986+
public void isSinglePointQuery_emptyQuery() {
987+
assertThat(Query.create(TABLE_ID).isSinglePointQuery()).isFalse();
988+
}
989+
990+
@Test
991+
public void isSinglePointQuery_multipleRowKeys() {
992+
assertThat(Query.create(TABLE_ID).rowKey("a").rowKey("b").isSinglePointQuery()).isFalse();
993+
}
994+
995+
@Test
996+
public void isSinglePointQuery_rowKeyAndRange() {
997+
assertThat(
998+
Query.create(TABLE_ID)
999+
.rowKey("a")
1000+
.range(ByteStringRange.unbounded().startClosed("a").endClosed("a"))
1001+
.isSinglePointQuery())
1002+
.isFalse();
1003+
}
1004+
1005+
@Test
1006+
public void isSinglePointQuery_multipleRanges() {
1007+
assertThat(
1008+
Query.create(TABLE_ID)
1009+
.range(ByteStringRange.unbounded().startClosed("a").endClosed("a"))
1010+
.range(ByteStringRange.unbounded().startClosed("b").endClosed("b"))
1011+
.isSinglePointQuery())
1012+
.isFalse();
1013+
}
1014+
1015+
@Test
1016+
public void isSinglePointQuery_closedOpenRange() {
1017+
assertThat(
1018+
Query.create(TABLE_ID)
1019+
.range(ByteStringRange.unbounded().startClosed("k").endOpen("k"))
1020+
.isSinglePointQuery())
1021+
.isFalse();
1022+
}
1023+
1024+
@Test
1025+
public void isSinglePointQuery_unequalClosedRange() {
1026+
assertThat(
1027+
Query.create(TABLE_ID)
1028+
.range(ByteStringRange.unbounded().startClosed("a").endClosed("b"))
1029+
.isSinglePointQuery())
1030+
.isFalse();
1031+
}
1032+
1033+
@Test
1034+
public void isSinglePointQuery_prefixRange() {
1035+
assertThat(Query.create(TABLE_ID).prefix("k").isSinglePointQuery()).isFalse();
1036+
}
1037+
1038+
@Test
1039+
public void toSessionPointProto_fromRowKey() {
1040+
Query query = Query.create(TABLE_ID).rowKey("the-key");
1041+
assertThat(query.toSessionPointProto())
1042+
.isEqualTo(
1043+
SessionReadRowRequest.newBuilder()
1044+
.setKey(ByteString.copyFromUtf8("the-key"))
1045+
.setFilter(RowFilter.getDefaultInstance())
1046+
.build());
1047+
}
1048+
1049+
@Test
1050+
public void toSessionPointProto_fromClosedRange() {
1051+
Query query =
1052+
Query.create(TABLE_ID)
1053+
.range(ByteStringRange.unbounded().startClosed("the-key").endClosed("the-key"));
1054+
assertThat(query.toSessionPointProto())
1055+
.isEqualTo(
1056+
SessionReadRowRequest.newBuilder()
1057+
.setKey(ByteString.copyFromUtf8("the-key"))
1058+
.setFilter(RowFilter.getDefaultInstance())
1059+
.build());
1060+
}
1061+
1062+
@Test
1063+
public void toSessionPointProto_preservesFilter() {
1064+
RowFilter filter = FILTERS.key().regex("regex").toProto();
1065+
Query query = Query.create(TABLE_ID).rowKey("the-key").filter(FILTERS.key().regex("regex"));
1066+
assertThat(query.toSessionPointProto())
1067+
.isEqualTo(
1068+
SessionReadRowRequest.newBuilder()
1069+
.setKey(ByteString.copyFromUtf8("the-key"))
1070+
.setFilter(filter)
1071+
.build());
1072+
}
1073+
1074+
@Test
1075+
public void toSessionPointProto_rejectsNonSinglePointQuery() {
1076+
Query query = Query.create(TABLE_ID).rowKey("a").rowKey("b");
1077+
assertThrows(IllegalStateException.class, query::toSessionPointProto);
1078+
}
9691079
}

0 commit comments

Comments
 (0)