Skip to content

Commit b7c87f2

Browse files
authored
Merge pull request #38009 from stankiewicz/fix_bq_empty_table
[BigQueryIO] Support reading from empty tables using Storage Read API in streaming pipelines
2 parents 9ea8612 + bfdac1a commit b7c87f2

2 files changed

Lines changed: 92 additions & 0 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.cloud.bigquery.storage.v1.ReadStream;
3131
import java.io.IOException;
3232
import java.util.List;
33+
import java.util.NoSuchElementException;
3334
import org.apache.beam.sdk.coders.Coder;
3435
import org.apache.beam.sdk.io.BoundedSource;
3536
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
@@ -41,6 +42,7 @@
4142
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
4243
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
4344
import org.checkerframework.checker.nullness.qual.Nullable;
45+
import org.joda.time.Instant;
4446
import org.slf4j.Logger;
4547
import org.slf4j.LoggerFactory;
4648

@@ -72,6 +74,7 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
7274
protected final Coder<T> outputCoder;
7375
protected final BigQueryServices bqServices;
7476
private final @Nullable TimestampPrecision picosTimestampPrecision;
77+
private boolean emptyOrPruned = false;
7578

7679
BigQueryStorageSourceBase(
7780
@Nullable DataFormat format,
@@ -193,8 +196,10 @@ public List<BigQueryStorageStreamSource<T>> split(
193196
if (readSession.getStreamsList().isEmpty()) {
194197
LOG.info(
195198
"Returned stream list is empty. The underlying table is empty or all rows have been pruned.");
199+
emptyOrPruned = true;
196200
return ImmutableList.of();
197201
} else {
202+
emptyOrPruned = false;
198203
LOG.info("Read session returned {} streams", readSession.getStreamsList().size());
199204
}
200205

@@ -217,9 +222,51 @@ public List<BigQueryStorageStreamSource<T>> split(
217222

218223
@Override
219224
public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
225+
if (emptyOrPruned) {
226+
// When split() returns an empty list, UnboundedReadFromBoundedSource falls back to wrapping
227+
// the original unsplit source directly (ImmutableList.of(bigQuerySotrageSourceBase)) so we
228+
// need to return empty reader.
229+
return new EmptyReader<>(this);
230+
}
220231
throw new UnsupportedOperationException("BigQuery storage source must be split before reading");
221232
}
222233

234+
private static class EmptyReader<T> extends BoundedReader<T> {
235+
private final BigQueryStorageSourceBase<T> source;
236+
237+
EmptyReader(BigQueryStorageSourceBase<T> source) {
238+
this.source = source;
239+
}
240+
241+
@Override
242+
public boolean start() throws IOException {
243+
return false;
244+
}
245+
246+
@Override
247+
public boolean advance() throws IOException {
248+
return false;
249+
}
250+
251+
@Override
252+
public T getCurrent() throws NoSuchElementException {
253+
throw new NoSuchElementException();
254+
}
255+
256+
@Override
257+
public Instant getCurrentTimestamp() throws NoSuchElementException {
258+
throw new NoSuchElementException();
259+
}
260+
261+
@Override
262+
public void close() throws IOException {}
263+
264+
@Override
265+
public BoundedSource<T> getCurrentSource() {
266+
return source;
267+
}
268+
}
269+
223270
private void setPicosTimestampPrecision(
224271
ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, DataFormat dataFormat) {
225272
if (picosTimestampPrecision == null) {

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
100100
import org.apache.beam.sdk.io.BoundedSource;
101101
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
102+
import org.apache.beam.sdk.io.UnboundedSource;
102103
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
103104
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
104105
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
@@ -120,6 +121,7 @@
120121
import org.apache.beam.sdk.transforms.PTransform;
121122
import org.apache.beam.sdk.transforms.SerializableFunction;
122123
import org.apache.beam.sdk.transforms.display.DisplayData;
124+
import org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource;
123125
import org.apache.beam.sdk.values.KV;
124126
import org.apache.beam.sdk.values.PBegin;
125127
import org.apache.beam.sdk.values.PCollection;
@@ -705,6 +707,49 @@ public void testTableSourceCreateReader() throws Exception {
705707
tableSource.createReader(options);
706708
}
707709

710+
@Test
711+
public void testUnboundedReadFromBoundedSourceWithEmptyTable() throws Exception {
712+
fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
713+
TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table");
714+
715+
Table table =
716+
new Table().setTableReference(tableRef).setNumBytes(0L).setSchema(new TableSchema());
717+
718+
fakeDatasetService.createTable(table);
719+
720+
ReadSession emptyReadSession = ReadSession.newBuilder().build();
721+
StorageClient fakeStorageClient = mock(StorageClient.class);
722+
when(fakeStorageClient.createReadSession(any())).thenReturn(emptyReadSession);
723+
724+
BigQueryStorageTableSource<TableRow> tableSource =
725+
BigQueryStorageTableSource.create(
726+
ValueProvider.StaticValueProvider.of(tableRef),
727+
null,
728+
null,
729+
new TableRowParser(),
730+
TableRowJsonCoder.of(),
731+
new FakeBigQueryServices()
732+
.withDatasetService(fakeDatasetService)
733+
.withStorageClient(fakeStorageClient));
734+
735+
// This simulates what happens in a streaming pipeline when BoundedSource is used
736+
UnboundedSource<TableRow, ?> unboundedSource =
737+
new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(tableSource);
738+
739+
// Initial split
740+
List<? extends UnboundedSource<TableRow, ?>> splits = unboundedSource.split(1, options);
741+
// Because tableSource.split returns empty list, BoundedToUnboundedSourceAdapter falls back to
742+
// returning itself
743+
assertEquals(1, splits.size());
744+
UnboundedSource<TableRow, ?> splitSource = splits.get(0);
745+
746+
// Create reader
747+
UnboundedSource.UnboundedReader<TableRow> reader = splitSource.createReader(options, null);
748+
749+
// This should NOT throw UnsupportedOperationException
750+
assertFalse(reader.start());
751+
}
752+
708753
private static GenericRecord createRecord(String name, Schema schema) {
709754
GenericRecord genericRecord = new Record(schema);
710755
genericRecord.put("name", name);

0 commit comments

Comments
 (0)