Skip to content

Commit 1f3d665

Browse files
authored
Expose StorageWriteApiMaxRequestCallbackWaitTimeSec in BQ storage write. (#38470)
1 parent cf5e517 commit 1f3d665

2 files changed

Lines changed: 17 additions & 0 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,13 @@ public interface BigQueryOptions
153153

154154
void setStorageWriteMaxInflightBytes(Long value);
155155

156+
@Description(
157+
"Maximum time in seconds a Storage Write API append request is allowed to wait in the "
158+
+ "request callback queue before timing out. Overrides Storage Write API default (5 min)")
159+
Integer getStorageWriteApiMaxRequestCallbackWaitTimeSec();
160+
161+
void setStorageWriteApiMaxRequestCallbackWaitTimeSec(Integer value);
162+
156163
@Description(
157164
"Enables multiplexing mode, where multiple tables can share the same connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE"
158165
+ " mode. This is recommended if your write operation is creating 20+ connections. When using multiplexing, consider tuning "

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1496,6 +1496,7 @@ public static class WriteStreamServiceImpl implements WriteStreamService {
14961496
private final BigQueryWriteClient newWriteClient;
14971497
private final long storageWriteMaxInflightRequests;
14981498
private final long storageWriteMaxInflightBytes;
1499+
private final @Nullable Integer storageWriteApiMaxRequestCallbackWaitTimeSec;
14991500
private final BigQueryIOMetadata bqIOMetadata;
15001501
private final PipelineOptions options;
15011502

@@ -1506,6 +1507,8 @@ public static class WriteStreamServiceImpl implements WriteStreamService {
15061507
this.options = options;
15071508
this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests();
15081509
this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes();
1510+
this.storageWriteApiMaxRequestCallbackWaitTimeSec =
1511+
bqOptions.getStorageWriteApiMaxRequestCallbackWaitTimeSec();
15091512
this.bqIOMetadata = BigQueryIOMetadata.create();
15101513
}
15111514

@@ -1514,6 +1517,8 @@ public WriteStreamServiceImpl(BigQueryOptions bqOptions) {
15141517
this.options = bqOptions;
15151518
this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests();
15161519
this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes();
1520+
this.storageWriteApiMaxRequestCallbackWaitTimeSec =
1521+
bqOptions.getStorageWriteApiMaxRequestCallbackWaitTimeSec();
15171522
this.bqIOMetadata = BigQueryIOMetadata.create();
15181523
}
15191524

@@ -1578,6 +1583,11 @@ public StreamAppendClient getStreamAppendClient(
15781583
options.as(BigQueryOptions.class).getMaxConnectionPoolConnections())
15791584
.build());
15801585

1586+
if (storageWriteApiMaxRequestCallbackWaitTimeSec != null) {
1587+
StreamWriter.setMaxRequestCallbackWaitTime(
1588+
java.time.Duration.ofSeconds(storageWriteApiMaxRequestCallbackWaitTimeSec));
1589+
}
1590+
15811591
StreamWriter streamWriter =
15821592
StreamWriter.newBuilder(streamName, newWriteClient)
15831593
.setExecutorProvider(

0 commit comments

Comments
 (0)