Skip to content

Commit c006ae1

Browse files
committed
Add BigQuery project detection and test
1 parent af00254 commit c006ae1

2 files changed

Lines changed: 79 additions & 5 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,27 @@ public List<BigQueryStorageStreamSource<T>> split(
156156
streamCount = Math.max(streamCount, MIN_SPLIT_COUNT);
157157
}
158158

159+
String project =
160+
bqOptions.getBigQueryProject() == null
161+
? bqOptions.getProject()
162+
: bqOptions.getBigQueryProject();
163+
if (project == null) {
164+
if (targetTable != null
165+
&& targetTable.getTableReference() != null
166+
&& targetTable.getTableReference().getProjectId() != null) {
167+
project = targetTable.getTableReference().getProjectId();
168+
} else {
169+
@Nullable String tableReferenceId = getTargetTableId(bqOptions);
170+
if (tableReferenceId != null) {
171+
TableReference tableReference = BigQueryHelpers.parseTableUrn(tableReferenceId);
172+
project = tableReference.getProjectId();
173+
}
174+
}
175+
}
176+
159177
CreateReadSessionRequest createReadSessionRequest =
160178
CreateReadSessionRequest.newBuilder()
161-
.setParent(
162-
BigQueryHelpers.toProjectResourceName(
163-
bqOptions.getBigQueryProject() == null
164-
? bqOptions.getProject()
165-
: bqOptions.getBigQueryProject()))
179+
.setParent(BigQueryHelpers.toProjectResourceName(project))
166180
.setReadSession(readSessionBuilder)
167181
.setMaxStreamCount(streamCount)
168182
.build();

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1600,6 +1600,66 @@ public void testReadFromBigQueryIO() throws Exception {
16001600
p.run();
16011601
}
16021602

1603+
@Test
1604+
public void testReadFromBigQueryIOWithFallbackProject() throws Exception {
1605+
fakeDatasetService.createDataset("fallback-project", "dataset", "", "", null);
1606+
TableReference tableRef = BigQueryHelpers.parseTableSpec("fallback-project:dataset.table");
1607+
Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
1608+
fakeDatasetService.createTable(table);
1609+
1610+
CreateReadSessionRequest expectedCreateReadSessionRequest =
1611+
CreateReadSessionRequest.newBuilder()
1612+
.setParent("projects/fallback-project")
1613+
.setReadSession(
1614+
ReadSession.newBuilder()
1615+
.setTable("projects/fallback-project/datasets/dataset/tables/table")
1616+
.setDataFormat(DataFormat.AVRO)
1617+
.setReadOptions(ReadSession.TableReadOptions.newBuilder()))
1618+
.setMaxStreamCount(10)
1619+
.build();
1620+
1621+
ReadSession readSession =
1622+
ReadSession.newBuilder()
1623+
.setName("readSessionName")
1624+
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
1625+
.addStreams(ReadStream.newBuilder().setName("streamName"))
1626+
.setDataFormat(DataFormat.AVRO)
1627+
.build();
1628+
1629+
ReadRowsRequest expectedReadRowsRequest =
1630+
ReadRowsRequest.newBuilder().setReadStream("streamName").build();
1631+
1632+
List<GenericRecord> records = Lists.newArrayList(createRecord("A", 1, AVRO_SCHEMA));
1633+
1634+
List<ReadRowsResponse> readRowsResponses =
1635+
Lists.newArrayList(createResponse(AVRO_SCHEMA, records.subList(0, 1), 0.0, 1.0));
1636+
1637+
StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
1638+
when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
1639+
.thenReturn(readSession);
1640+
when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
1641+
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
1642+
1643+
// Explicitly set the pipeline's project option to null to simulate missing
1644+
// cross-language parameters, and verify it uses the project from the TableReference.
1645+
options.as(BigQueryOptions.class).setProject(null);
1646+
1647+
PCollection<KV<String, Long>> output =
1648+
p.apply(
1649+
BigQueryIO.read(new ParseKeyValue())
1650+
.from("fallback-project:dataset.table")
1651+
.withMethod(Method.DIRECT_READ)
1652+
.withFormat(DataFormat.AVRO)
1653+
.withTestServices(
1654+
new FakeBigQueryServices()
1655+
.withDatasetService(fakeDatasetService)
1656+
.withStorageClient(fakeStorageClient)));
1657+
1658+
PAssert.that(output).containsInAnyOrder(ImmutableList.of(KV.of("A", 1L)));
1659+
1660+
p.run();
1661+
}
1662+
16031663
@Test
16041664
public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception {
16051665
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);

0 commit comments

Comments (0)