Skip to content

Commit 1a6f4d5

Browse files
authored
fix(bigquery): Update fast query path to allow jobTimeoutMs in the request (#13502)
1 parent 64cde7f commit 1a6f4d5

6 files changed

Lines changed: 280 additions & 150 deletions

File tree

java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2169,11 +2169,15 @@ && getOptions().getOpenTelemetryTracer() != null) {
21692169
.startSpan();
21702170
}
21712171
try (Scope queryScope = querySpan != null ? querySpan.makeCurrent() : null) {
2172-
// If all parameters passed in configuration are supported by the query() method on the
2173-
// backend, put on fast path
2172+
// The fast query path (jobs.query API) is preferred to reduce latency by avoiding
2173+
// the slow fallback path (jobs.insert API). We will opt to use it if the configuration
2174+
// and JobId allow (i.e. if all parameters passed in configuration are supported).
21742175
QueryRequestInfo requestInfo =
21752176
new QueryRequestInfo(configuration, getOptions().getDataFormatOptions());
2176-
if (requestInfo.isFastQuerySupported(jobId)) {
2177+
// Fast query path is not possible if job is specified in the JobID object.
2178+
// Respect Job field value in JobId specified by user.
2179+
// Specifying it will force the query to take the slower path.
2180+
if (requestInfo.isFastQuerySupported() && (jobId == null || jobId.getJob() == null)) {
21772181
// Be careful when setting the projectID in JobId, if a projectID is specified in the JobId,
21782182
// the job created by the query method will use that project. This may cause the query to
21792183
// fail with "Access denied" if the project do not have enough permissions to run the job.

java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,8 @@ private BigQueryResult getExecuteSelectResponse(
245245
labelMap = labels[0];
246246
}
247247
try {
248-
// use jobs.query if possible
248+
// The fast query path (jobs.query API) is preferred to reduce latency by avoiding
249+
// the slow fallback path (jobs.insert API). We will opt to use it if possible.
249250
if (isFastQuerySupported()) {
250251
logger.log(Level.INFO, "\n Using Fast Query Path");
251252
final String projectId = bigQueryOptions.getProjectId();
@@ -810,7 +811,8 @@ void flagEndOfStream() { // package-private
810811
Level.WARNING,
811812
"\n"
812813
+ Thread.currentThread().getName()
813-
+ " Could not flag End of Stream, both the buffer types are null. This might happen when the connection is close without executing a query");
814+
+ " Could not flag End of Stream, both the buffer types are null. This might happen"
815+
+ " when the connection is close without executing a query");
814816
}
815817
} catch (InterruptedException e) {
816818
logger.log(
@@ -1260,7 +1262,6 @@ boolean isFastQuerySupported() {
12601262
&& connectionSettings.getCreateDisposition() == null
12611263
&& connectionSettings.getDestinationEncryptionConfiguration() == null
12621264
&& connectionSettings.getDestinationTable() == null
1263-
&& connectionSettings.getJobTimeoutMs() == null
12641265
&& connectionSettings.getMaximumBillingTier() == null
12651266
&& connectionSettings.getPriority() == null
12661267
&& connectionSettings.getRangePartitioning() == null
@@ -1361,6 +1362,9 @@ QueryRequest createQueryRequest(
13611362
content.setRequestId(requestId);
13621363
// The new Connection interface only supports StandardSQL dialect
13631364
content.setUseLegacySql(false);
1365+
if (connectionSettings.getJobTimeoutMs() != null) {
1366+
content.setJobTimeoutMs(connectionSettings.getJobTimeoutMs());
1367+
}
13641368
return content;
13651369
}
13661370

java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryRequestInfo.java

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import com.google.api.services.bigquery.model.QueryRequest;
2222
import com.google.cloud.bigquery.QueryJobConfiguration.JobCreationMode;
2323
import com.google.common.base.MoreObjects;
24-
import com.google.common.base.Objects;
2524
import com.google.common.collect.Lists;
2625
import java.util.List;
2726
import java.util.Map;
27+
import java.util.Objects;
2828
import java.util.UUID;
2929

3030
final class QueryRequestInfo {
@@ -45,6 +45,7 @@ final class QueryRequestInfo {
4545
private final JobCreationMode jobCreationMode;
4646
private final DataFormatOptions formatOptions;
4747
private final String reservation;
48+
private final Long jobTimeoutMs;
4849

4950
QueryRequestInfo(
5051
QueryJobConfiguration config, com.google.cloud.bigquery.DataFormatOptions dataFormatOptions) {
@@ -64,22 +65,26 @@ final class QueryRequestInfo {
6465
this.jobCreationMode = config.getJobCreationMode();
6566
this.formatOptions = dataFormatOptions.toPb();
6667
this.reservation = config.getReservation();
68+
this.jobTimeoutMs = config.getJobTimeoutMs();
6769
}
6870

69-
boolean isFastQuerySupported(JobId jobId) {
70-
// Fast query path is not possible if job is specified in the JobID object
71-
// Respect Job field value in JobId specified by user.
72-
// Specifying it will force the query to take the slower path.
73-
if (jobId != null) {
74-
if (jobId.getJob() != null) {
75-
return false;
76-
}
77-
}
71+
/**
72+
* Determines if the query can be executed via the "fast query" path (jobs.query API) instead of
73+
* the "slow path" (jobs.insert API followed by jobs.getQueryResults).
74+
*
75+
* <p>The fast query path is preferred because it completes in a single RPC, significantly
76+
* reducing end-to-end latency for small queries.
77+
*
78+
* <p>However, the jobs.query API does not support all configuration options available in
79+
* jobs.insert (e.g., destination table, clustering, time partitioning). This method checks the
80+
* QueryJobConfiguration for any unsupported options. If any are present, we must fall back to the
81+
* jobs.insert path.
82+
*/
83+
boolean isFastQuerySupported() {
7884
return config.getClustering() == null
7985
&& config.getCreateDisposition() == null
8086
&& config.getDestinationEncryptionConfiguration() == null
8187
&& config.getDestinationTable() == null
82-
&& config.getJobTimeoutMs() == null
8388
&& config.getMaximumBillingTier() == null
8489
&& config.getPriority() == null
8590
&& config.getRangePartitioning() == null
@@ -134,6 +139,9 @@ QueryRequest toPb() {
134139
if (reservation != null) {
135140
request.setReservation(reservation);
136141
}
142+
if (jobTimeoutMs != null) {
143+
request.setJobTimeoutMs(jobTimeoutMs);
144+
}
137145
return request;
138146
}
139147

@@ -155,12 +163,13 @@ public String toString() {
155163
.add("jobCreationMode", jobCreationMode)
156164
.add("formatOptions", formatOptions.getUseInt64Timestamp())
157165
.add("reservation", reservation)
166+
.add("jobTimeoutMs", jobTimeoutMs)
158167
.toString();
159168
}
160169

161170
@Override
162171
public int hashCode() {
163-
return Objects.hashCode(
172+
return Objects.hash(
164173
connectionProperties,
165174
defaultDataset,
166175
dryRun,
@@ -175,14 +184,34 @@ public int hashCode() {
175184
useLegacySql,
176185
jobCreationMode,
177186
formatOptions,
178-
reservation);
187+
reservation,
188+
jobTimeoutMs);
179189
}
180190

181191
@Override
182192
public boolean equals(Object obj) {
183-
return obj == this
184-
|| obj != null
185-
&& obj.getClass().equals(QueryRequestInfo.class)
186-
&& java.util.Objects.equals(toPb(), ((QueryRequestInfo) obj).toPb());
193+
if (obj == this) {
194+
return true;
195+
}
196+
if (obj == null || !obj.getClass().equals(QueryRequestInfo.class)) {
197+
return false;
198+
}
199+
QueryRequestInfo other = (QueryRequestInfo) obj;
200+
return Objects.equals(connectionProperties, other.connectionProperties)
201+
&& Objects.equals(defaultDataset, other.defaultDataset)
202+
&& Objects.equals(dryRun, other.dryRun)
203+
&& Objects.equals(labels, other.labels)
204+
&& Objects.equals(maximumBytesBilled, other.maximumBytesBilled)
205+
&& Objects.equals(maxResults, other.maxResults)
206+
&& Objects.equals(query, other.query)
207+
&& Objects.equals(queryParameters, other.queryParameters)
208+
&& Objects.equals(requestId, other.requestId)
209+
&& Objects.equals(createSession, other.createSession)
210+
&& Objects.equals(useQueryCache, other.useQueryCache)
211+
&& Objects.equals(useLegacySql, other.useLegacySql)
212+
&& Objects.equals(jobCreationMode, other.jobCreationMode)
213+
&& Objects.equals(formatOptions, other.formatOptions)
214+
&& Objects.equals(reservation, other.reservation)
215+
&& Objects.equals(jobTimeoutMs, other.jobTimeoutMs);
187216
}
188217
}

java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,12 @@ public class BigQueryImplTest {
240240
.setDefaultDataset(DatasetId.of(PROJECT, DATASET))
241241
.setUseQueryCache(false)
242242
.build();
243+
private static final QueryJobConfiguration QUERY_JOB_CONFIGURATION_WITH_TIMEOUT =
244+
QueryJobConfiguration.newBuilder("SQL")
245+
.setDefaultDataset(DatasetId.of(PROJECT, DATASET))
246+
.setUseQueryCache(false)
247+
.setJobTimeoutMs(1000L)
248+
.build();
243249
private static final QueryJobConfiguration QUERY_JOB_CONFIGURATION_FOR_DMLQUERY =
244250
QueryJobConfiguration.newBuilder("DML")
245251
.setDefaultDataset(DatasetId.of(PROJECT, DATASET))
@@ -537,7 +543,8 @@ public class BigQueryImplTest {
537543
private HttpBigQueryRpc bigqueryRpcMock;
538544
private BigQuery bigquery;
539545
private static final String RATE_LIMIT_ERROR_MSG =
540-
"Job exceeded rate limits: Your table exceeded quota for table update operations. For more information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas";
546+
"Job exceeded rate limits: Your table exceeded quota for table update operations. For more"
547+
+ " information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas";
541548

542549
@Captor private ArgumentCaptor<Map<BigQueryRpc.Option, Object>> capturedOptions;
543550
@Captor private ArgumentCaptor<com.google.api.services.bigquery.model.Job> jobCapture;
@@ -2394,6 +2401,49 @@ void testFastQueryRequestCompleted() throws InterruptedException, IOException {
23942401
.queryRpcSkipExceptionTranslation(eq(PROJECT), requestPbCapture.capture());
23952402
}
23962403

2404+
@Test
2405+
void testFastQueryRequestCompletedWithTimeout() throws InterruptedException, IOException {
2406+
com.google.api.services.bigquery.model.QueryResponse queryResponsePb =
2407+
new com.google.api.services.bigquery.model.QueryResponse()
2408+
.setCacheHit(false)
2409+
.setJobComplete(true)
2410+
.setKind("bigquery#queryResponse")
2411+
.setPageToken(null)
2412+
.setRows(ImmutableList.of(TABLE_ROW))
2413+
.setSchema(TABLE_SCHEMA.toPb())
2414+
.setTotalBytesProcessed(42L)
2415+
.setTotalRows(BigInteger.valueOf(1L));
2416+
2417+
when(bigqueryRpcMock.queryRpcSkipExceptionTranslation(eq(PROJECT), requestPbCapture.capture()))
2418+
.thenReturn(queryResponsePb);
2419+
2420+
bigquery = options.getService();
2421+
TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_WITH_TIMEOUT);
2422+
assertNull(result.getNextPage());
2423+
assertNull(result.getNextPageToken());
2424+
assertFalse(result.hasNextPage());
2425+
assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA);
2426+
assertThat(result.getTotalRows()).isEqualTo(1);
2427+
for (FieldValueList row : result.getValues()) {
2428+
assertThat(row.get(0).getBooleanValue()).isFalse();
2429+
assertThat(row.get(1).getLongValue()).isEqualTo(1);
2430+
}
2431+
2432+
QueryRequest requestPb = requestPbCapture.getValue();
2433+
assertEquals(QUERY_JOB_CONFIGURATION_WITH_TIMEOUT.getQuery(), requestPb.getQuery());
2434+
assertEquals(
2435+
QUERY_JOB_CONFIGURATION_WITH_TIMEOUT.getDefaultDataset().getDataset(),
2436+
requestPb.getDefaultDataset().getDatasetId());
2437+
assertEquals(
2438+
QUERY_JOB_CONFIGURATION_WITH_TIMEOUT.useQueryCache(), requestPb.getUseQueryCache());
2439+
assertEquals(
2440+
QUERY_JOB_CONFIGURATION_WITH_TIMEOUT.getJobTimeoutMs(), requestPb.getJobTimeoutMs());
2441+
assertNull(requestPb.getLocation());
2442+
2443+
verify(bigqueryRpcMock)
2444+
.queryRpcSkipExceptionTranslation(eq(PROJECT), requestPbCapture.capture());
2445+
}
2446+
23972447
@Test
23982448
void testQueryRequestRequiredJobCreationCompleted() throws InterruptedException, IOException {
23992449
JobId queryJob = JobId.of(PROJECT, JOB);
@@ -3072,7 +3122,8 @@ void testFastQueryRateLimitIdempotency() throws Exception {
30723122
@Test
30733123
void testRateLimitRegEx() throws Exception {
30743124
String msg2 =
3075-
"Job eceeded rate limits: Your table exceeded quota for table update operations. For more information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas";
3125+
"Job eceeded rate limits: Your table exceeded quota for table update operations. For more"
3126+
+ " information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas";
30763127
String msg3 = "exceeded rate exceeded quota for table update";
30773128
String msg4 = "exceeded rate limits";
30783129
assertTrue(

java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/QueryRequestInfoTest.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ public class QueryRequestInfoTest {
159159
QueryRequestInfo REQUEST_INFO_SUPPORTED =
160160
new QueryRequestInfo(
161161
QUERY_JOB_CONFIGURATION_SUPPORTED, DataFormatOptions.newBuilder().build());
162+
private static final QueryJobConfiguration QUERY_JOB_CONFIGURATION_WITH_TIMEOUT =
163+
QUERY_JOB_CONFIGURATION_SUPPORTED.toBuilder().setJobTimeoutMs(TIMEOUT).build();
164+
QueryRequestInfo REQUEST_INFO_WITH_TIMEOUT =
165+
new QueryRequestInfo(
166+
QUERY_JOB_CONFIGURATION_WITH_TIMEOUT, DataFormatOptions.newBuilder().build());
167+
162168
private static final QueryJobConfiguration QUERY_JOB_CONFIGURATION_REQUIRED_SUPPORTED =
163169
QUERY_JOB_CONFIGURATION_SUPPORTED.toBuilder()
164170
.setJobCreationMode(JobCreationMode.JOB_CREATION_REQUIRED)
@@ -169,20 +175,18 @@ public class QueryRequestInfoTest {
169175

170176
@Test
171177
public void testIsFastQuerySupported() {
172-
JobId jobIdSupported = JobId.newBuilder().build();
173-
JobId jobIdNotSupported = JobId.newBuilder().setJob("random-job-id").build();
174-
assertEquals(false, REQUEST_INFO.isFastQuerySupported(jobIdSupported));
175-
assertEquals(true, REQUEST_INFO_SUPPORTED.isFastQuerySupported(jobIdSupported));
176-
assertEquals(true, REQUEST_INFO_REQUIRED_SUPPORTED.isFastQuerySupported(jobIdSupported));
177-
assertEquals(false, REQUEST_INFO.isFastQuerySupported(jobIdNotSupported));
178-
assertEquals(false, REQUEST_INFO_SUPPORTED.isFastQuerySupported(jobIdNotSupported));
179-
assertEquals(false, REQUEST_INFO_REQUIRED_SUPPORTED.isFastQuerySupported(jobIdNotSupported));
178+
assertFalse(REQUEST_INFO.isFastQuerySupported());
179+
assertTrue(REQUEST_INFO_SUPPORTED.isFastQuerySupported());
180+
assertTrue(REQUEST_INFO_WITH_TIMEOUT.isFastQuerySupported());
181+
assertTrue(REQUEST_INFO_REQUIRED_SUPPORTED.isFastQuerySupported());
180182
}
181183

182184
@Test
183185
public void testToPb() {
184186
QueryRequest requestPb = REQUEST_INFO.toPb();
185187
assertEquals(requestPb, REQUEST_INFO.toPb());
188+
QueryRequest requestWithTimeoutPb = REQUEST_INFO_WITH_TIMEOUT.toPb();
189+
assertEquals(TIMEOUT, requestWithTimeoutPb.getJobTimeoutMs());
186190
}
187191

188192
@Test
@@ -194,6 +198,14 @@ public void equalTo() {
194198
compareQueryRequestInfo(
195199
new QueryRequestInfo(QUERY_JOB_CONFIGURATION, DataFormatOptions.newBuilder().build()),
196200
REQUEST_INFO);
201+
compareQueryRequestInfo(
202+
new QueryRequestInfo(
203+
QUERY_JOB_CONFIGURATION_WITH_TIMEOUT, DataFormatOptions.newBuilder().build()),
204+
REQUEST_INFO_WITH_TIMEOUT);
205+
compareQueryRequestInfo(
206+
new QueryRequestInfo(
207+
QUERY_JOB_CONFIGURATION_REQUIRED_SUPPORTED, DataFormatOptions.newBuilder().build()),
208+
REQUEST_INFO_REQUIRED_SUPPORTED);
197209
}
198210

199211
@Test
@@ -237,5 +249,6 @@ private void compareQueryRequestInfo(QueryRequestInfo expected, QueryRequestInfo
237249
assertEquals(expectedQueryReq.get("jobCreationMode"), actualQueryReq.get("jobCreationMode"));
238250
assertEquals(expectedQueryReq.getFormatOptions(), actualQueryReq.getFormatOptions());
239251
assertEquals(expectedQueryReq.getReservation(), actualQueryReq.getReservation());
252+
assertEquals(expectedQueryReq.getJobTimeoutMs(), actualQueryReq.getJobTimeoutMs());
240253
}
241254
}

0 commit comments

Comments
 (0)