Skip to content

Commit bfdac1a

Browse files
committed
add member variable to keep track of source split result
1 parent 3fa8b0c commit bfdac1a

1 file changed

Lines changed: 8 additions & 9 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
7474
protected final Coder<T> outputCoder;
7575
protected final BigQueryServices bqServices;
7676
private final @Nullable TimestampPrecision picosTimestampPrecision;
77+
private boolean emptyOrPruned = false;
7778

7879
BigQueryStorageSourceBase(
7980
@Nullable DataFormat format,
@@ -181,8 +182,10 @@ public List<BigQueryStorageStreamSource<T>> split(
181182
if (readSession.getStreamsList().isEmpty()) {
182183
LOG.info(
183184
"Returned stream list is empty. The underlying table is empty or all rows have been pruned.");
185+
emptyOrPruned = true;
184186
return ImmutableList.of();
185187
} else {
188+
emptyOrPruned = false;
186189
LOG.info("Read session returned {} streams", readSession.getStreamsList().size());
187190
}
188191

@@ -205,15 +208,11 @@ public List<BigQueryStorageStreamSource<T>> split(
205208

206209
@Override
207210
public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
208-
try {
209-
if (split(0, options).isEmpty()) {
210-
return new EmptyReader<>(this);
211-
}
212-
} catch (Exception e) {
213-
// If split fails, we can't be sure if it's empty or not.
214-
// For backwards compatibility with tests that don't mock everything,
215-
// we still throw UnsupportedOperationException.
216-
LOG.debug("Split failed during createReader emptiness check", e);
211+
if (emptyOrPruned) {
212+
// When split() returns an empty list, UnboundedReadFromBoundedSource falls back to wrapping
213+
// the original unsplit source directly (ImmutableList.of(bigQuerySotrageSourceBase)) so we
214+
// need to return empty reader.
215+
return new EmptyReader<>(this);
217216
}
218217
throw new UnsupportedOperationException("BigQuery storage source must be split before reading");
219218
}

0 commit comments

Comments
 (0)