Skip to content

Commit 7c2b57e

Browse files
authored
Adding GCP Spanner Change Stream support for Python (#35453)
* Adding Spanner Change Stream support to python. * Updating CHANGES.md * Fixing python format * Modify CHANGES.md * Restarting Tests * Retesting
1 parent adbb9e4 commit 7c2b57e

5 files changed

Lines changed: 391 additions & 3 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@
180180
* [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)).
181181
* [YAML] WriteToTFRecord and ReadFromTFRecord Beam YAML support
182182
* Python: Added JupyterLab 4.x extension compatibility for enhanced notebook integration ([#34495](https://github.com/apache/beam/pull/34495)).
183+
* [Python] Adding GCP Spanner Change Stream support for Python (apache_beam.io.gcp.spanner). ([#24103] https://github.com/apache/beam/issues/24103).
183184

184185
## Breaking Changes
185186

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import com.google.cloud.spanner.Statement;
5353
import com.google.cloud.spanner.Struct;
5454
import com.google.cloud.spanner.TimestampBound;
55+
import com.google.gson.Gson;
56+
import com.google.gson.GsonBuilder;
5557
import java.io.ByteArrayInputStream;
5658
import java.io.ByteArrayOutputStream;
5759
import java.io.IOException;
@@ -1015,6 +1017,32 @@ public PCollection<Row> expand(PBegin input) {
10151017
}
10161018
}
10171019

1020+
static class ChangeStreamRead extends PTransform<PBegin, PCollection<String>> {
1021+
1022+
ReadChangeStream readChangeStream;
1023+
1024+
public ChangeStreamRead(ReadChangeStream readChangeStream) {
1025+
this.readChangeStream = readChangeStream;
1026+
}
1027+
1028+
@Override
1029+
public PCollection<String> expand(PBegin input) {
1030+
return input
1031+
.apply(readChangeStream)
1032+
.apply("DataChangeRecordToStringJSON", ParDo.of(new DataChangeRecordToJsonFn()));
1033+
}
1034+
}
1035+
1036+
private static class DataChangeRecordToJsonFn extends DoFn<DataChangeRecord, String> {
1037+
private static Gson gson = new GsonBuilder().disableHtmlEscaping().create();
1038+
1039+
@ProcessElement
1040+
public void process(@Element DataChangeRecord input, OutputReceiver<String> receiver) {
1041+
String modJsonString = gson.toJson(input, DataChangeRecord.class);
1042+
receiver.output(modJsonString);
1043+
}
1044+
}
1045+
10181046
/**
10191047
* A {@link PTransform} that create a transaction. If applied to a {@link PCollection}, it will
10201048
* create a transaction after the {@link PCollection} is closed.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.auto.service.AutoService;
2424
import com.google.cloud.Timestamp;
2525
import com.google.cloud.spanner.Mutation;
26+
import com.google.cloud.spanner.Options.RpcPriority;
2627
import com.google.cloud.spanner.TimestampBound;
2728
import java.util.Map;
2829
import java.util.concurrent.TimeUnit;
@@ -43,8 +44,8 @@
4344
import org.joda.time.Duration;
4445

4546
/**
46-
* Exposes {@link SpannerIO.WriteRows} and {@link SpannerIO.ReadRows} as an external transform for
47-
* cross-language usage.
47+
* Exposes {@link SpannerIO.WriteRows}, {@link SpannerIO.ReadRows} and {@link
48+
* SpannerIO.ChangeStreamRead} as an external transform for cross-language usage.
4849
*/
4950
@AutoService(ExternalTransformRegistrar.class)
5051
public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
@@ -55,6 +56,8 @@ public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
5556
"beam:transform:org.apache.beam:spanner_insert_or_update:v1";
5657
public static final String DELETE_URN = "beam:transform:org.apache.beam:spanner_delete:v1";
5758
public static final String READ_URN = "beam:transform:org.apache.beam:spanner_read:v1";
59+
public static final String READ_CHANGE_STREAM_URN =
60+
"beam:transform:org.apache.beam:spanner_change_stream_reader:v1";
5861

5962
@Override
6063
@NonNull
@@ -66,6 +69,7 @@ public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
6669
.put(INSERT_OR_UPDATE_URN, new InsertOrUpdateBuilder())
6770
.put(DELETE_URN, new DeleteBuilder())
6871
.put(READ_URN, new ReadBuilder())
72+
.put(READ_CHANGE_STREAM_URN, new ChangeStreamReaderBuilder())
6973
.build();
7074
}
7175

@@ -382,4 +386,113 @@ public PTransform<PCollection<Row>, PDone> buildExternal(
382386
return SpannerIO.WriteRows.of(writeTransform, operation, configuration.table);
383387
}
384388
}
389+
390+
public static class ChangeStreamReaderBuilder
391+
implements ExternalTransformBuilder<
392+
ChangeStreamReaderBuilder.Configuration, PBegin, PCollection<String>> {
393+
394+
public static class Configuration extends CrossLanguageConfiguration {
395+
private String changeStreamName = "";
396+
private String metadataDatabase = "";
397+
private String metadataInstance = "";
398+
private @Nullable Timestamp inclusiveStartAt;
399+
private @Nullable Timestamp inclusiveEndAt;
400+
private @Nullable String metadataTable;
401+
private @Nullable RpcPriority rpcPriority;
402+
private @Nullable Duration watermarkRefreshRate;
403+
404+
public void setChangeStreamName(String changeStreamName) {
405+
this.changeStreamName = changeStreamName;
406+
}
407+
408+
public void setInclusiveStartAt(@Nullable String inclusiveStartAtString) {
409+
if (inclusiveStartAtString != null) {
410+
this.inclusiveStartAt = Timestamp.parseTimestamp(inclusiveStartAtString);
411+
}
412+
}
413+
414+
public void setInclusiveEndAt(@Nullable String inclusiveEndAtString) {
415+
if (inclusiveEndAtString != null) {
416+
this.inclusiveEndAt = Timestamp.parseTimestamp(inclusiveEndAtString);
417+
}
418+
}
419+
420+
public void setMetadataDatabase(String metadataDatabase) {
421+
this.metadataDatabase = metadataDatabase;
422+
}
423+
424+
public void setMetadataInstance(String metadataInstance) {
425+
this.metadataInstance = metadataInstance;
426+
}
427+
428+
public void setMetadataTable(@Nullable String metadataTable) {
429+
this.metadataTable = metadataTable;
430+
}
431+
432+
public void setRpcPriority(@Nullable String rpcPriorityString) {
433+
if (rpcPriorityString != null) {
434+
this.rpcPriority = RpcPriority.valueOf(rpcPriorityString);
435+
}
436+
}
437+
438+
public void setWatermarkRefreshRate(@Nullable String watermarkRefreshRateString) {
439+
if (watermarkRefreshRateString != null) {
440+
this.watermarkRefreshRate = Duration.parse(watermarkRefreshRateString);
441+
}
442+
}
443+
}
444+
445+
@Override
446+
@NonNull
447+
public PTransform<PBegin, PCollection<String>> buildExternal(
448+
ChangeStreamReaderBuilder.Configuration configuration) {
449+
450+
configuration.checkMandatoryFields();
451+
452+
if (configuration.changeStreamName.isEmpty()) {
453+
throw new IllegalArgumentException("ChangeStreamName can't be empty");
454+
}
455+
456+
if (configuration.metadataInstance.isEmpty()) {
457+
throw new IllegalArgumentException("MetadataInstance can't be empty");
458+
}
459+
460+
if (configuration.metadataDatabase.isEmpty()) {
461+
throw new IllegalArgumentException("MetadataDatabase can't be empty");
462+
}
463+
464+
SpannerIO.ReadChangeStream readChangeStream =
465+
SpannerIO.readChangeStream()
466+
.withProjectId(configuration.projectId)
467+
.withInstanceId(configuration.instanceId)
468+
.withDatabaseId(configuration.databaseId)
469+
.withChangeStreamName(configuration.changeStreamName)
470+
.withMetadataInstance(configuration.metadataInstance)
471+
.withMetadataDatabase(configuration.metadataDatabase);
472+
473+
if (configuration.inclusiveStartAt != null) {
474+
readChangeStream = readChangeStream.withInclusiveStartAt(configuration.inclusiveStartAt);
475+
}
476+
477+
if (configuration.inclusiveEndAt != null) {
478+
readChangeStream = readChangeStream.withInclusiveEndAt(configuration.inclusiveEndAt);
479+
}
480+
481+
if (configuration.metadataTable != null) {
482+
readChangeStream = readChangeStream.withMetadataTable(configuration.metadataTable);
483+
}
484+
485+
if (configuration.rpcPriority != null) {
486+
487+
readChangeStream = readChangeStream.withRpcPriority(configuration.rpcPriority);
488+
}
489+
490+
if (configuration.watermarkRefreshRate != null) {
491+
readChangeStream =
492+
readChangeStream.withWatermarkRefreshRate(configuration.watermarkRefreshRate);
493+
}
494+
495+
return new SpannerIO.ChangeStreamRead(readChangeStream);
496+
}
497+
}
385498
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrarTest.java

Lines changed: 144 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Map;
2727
import java.util.concurrent.TimeUnit;
2828
import java.util.stream.Stream;
29+
import org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.ChangeStreamReaderBuilder;
2930
import org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.InsertBuilder;
3031
import org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.ReadBuilder;
3132
import org.apache.beam.sdk.schemas.Schema;
@@ -48,22 +49,29 @@ public class SpannerTransformRegistrarTest {
4849
public static final String SPANNER_PROJECT = "spanner-project";
4950
public static final String SPANNER_TABLE = "spanner-table";
5051
public static final String SPANNER_SQL_QUERY = "SELECT * from spanner_table;";
52+
public static final String SPANNER_CHANGE_STREAM_NAME = "spanner-change-stream-name";
53+
public static final String SPANNER_CHANGE_STREAM_METADATA_INSTANCE =
54+
"spanner-change-stream-instance";
55+
public static final String SPANNER_CHANGE_STREAM_METADATA_DATABASE =
56+
"spanner-change-stream-database";
5157
private SpannerTransformRegistrar spannerTransformRegistrar;
5258
private ReadBuilder readBuilder;
5359
private InsertBuilder writeBuilder;
60+
private ChangeStreamReaderBuilder changeStreamReaderBuilder;
5461

5562
@Before
5663
public void setup() {
5764
spannerTransformRegistrar = new SpannerTransformRegistrar();
5865
readBuilder = new ReadBuilder();
5966
writeBuilder = new InsertBuilder();
67+
changeStreamReaderBuilder = new ChangeStreamReaderBuilder();
6068
}
6169

6270
@Test
6371
public void testKnownBuilderInstances() {
6472
Map<String, ExternalTransformBuilder<?, ?, ?>> builderInstancesMap =
6573
spannerTransformRegistrar.knownBuilderInstances();
66-
assertEquals(6, builderInstancesMap.size());
74+
assertEquals(7, builderInstancesMap.size());
6775
assertThat(builderInstancesMap, IsMapContaining.hasKey(SpannerTransformRegistrar.INSERT_URN));
6876
assertThat(builderInstancesMap, IsMapContaining.hasKey(SpannerTransformRegistrar.UPDATE_URN));
6977
assertThat(builderInstancesMap, IsMapContaining.hasKey(SpannerTransformRegistrar.REPLACE_URN));
@@ -72,6 +80,9 @@ public void testKnownBuilderInstances() {
7280
IsMapContaining.hasKey(SpannerTransformRegistrar.INSERT_OR_UPDATE_URN));
7381
assertThat(builderInstancesMap, IsMapContaining.hasKey(SpannerTransformRegistrar.DELETE_URN));
7482
assertThat(builderInstancesMap, IsMapContaining.hasKey(SpannerTransformRegistrar.READ_URN));
83+
assertThat(
84+
builderInstancesMap,
85+
IsMapContaining.hasKey(SpannerTransformRegistrar.READ_CHANGE_STREAM_URN));
7586
}
7687

7788
@Test(expected = IllegalArgumentException.class)
@@ -207,4 +218,136 @@ private InsertBuilder.Configuration getBasicWriteConfiguration() {
207218
configuration.setMaxCumulativeBackoff(100L);
208219
return configuration;
209220
}
221+
222+
@Test(expected = IllegalArgumentException.class)
223+
public void testChangeStreamReaderBuilderBuildExternalWithMissingMandatoryFields() {
224+
changeStreamReaderBuilder.buildExternal(new ChangeStreamReaderBuilder.Configuration());
225+
}
226+
227+
@Test(expected = IllegalArgumentException.class)
228+
public void testChangeStreamReaderBuilderBuildExternalWithMissingDatabaseId() {
229+
ChangeStreamReaderBuilder.Configuration configuration =
230+
new ChangeStreamReaderBuilder.Configuration();
231+
configuration.setProjectId(SPANNER_PROJECT);
232+
configuration.setInstanceId(SPANNER_INSTANCE);
233+
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
234+
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
235+
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
236+
changeStreamReaderBuilder.buildExternal(configuration);
237+
}
238+
239+
@Test(expected = IllegalArgumentException.class)
240+
public void testChangeStreamReaderBuilderBuildExternalWithMissingInstanceId() {
241+
ChangeStreamReaderBuilder.Configuration configuration =
242+
new ChangeStreamReaderBuilder.Configuration();
243+
configuration.setProjectId(SPANNER_PROJECT);
244+
configuration.setDatabaseId(SPANNER_DATABASE);
245+
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
246+
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
247+
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
248+
changeStreamReaderBuilder.buildExternal(configuration);
249+
}
250+
251+
@Test(expected = IllegalArgumentException.class)
252+
public void testChangeStreamReaderBuilderBuildExternalWithMissingChangeStreamName() {
253+
ChangeStreamReaderBuilder.Configuration configuration =
254+
new ChangeStreamReaderBuilder.Configuration();
255+
configuration.setProjectId(SPANNER_PROJECT);
256+
configuration.setDatabaseId(SPANNER_DATABASE);
257+
configuration.setInstanceId(SPANNER_INSTANCE);
258+
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
259+
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
260+
changeStreamReaderBuilder.buildExternal(configuration);
261+
}
262+
263+
@Test(expected = IllegalArgumentException.class)
264+
public void testChangeStreamReaderBuilderBuildExternalWithMissingMetadataInstance() {
265+
ChangeStreamReaderBuilder.Configuration configuration =
266+
new ChangeStreamReaderBuilder.Configuration();
267+
configuration.setProjectId(SPANNER_PROJECT);
268+
configuration.setDatabaseId(SPANNER_DATABASE);
269+
configuration.setInstanceId(SPANNER_INSTANCE);
270+
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
271+
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
272+
changeStreamReaderBuilder.buildExternal(configuration);
273+
}
274+
275+
@Test(expected = IllegalArgumentException.class)
276+
public void testChangeStreamReaderBuilderBuildExternalWithMissingMetadataDatabase() {
277+
ChangeStreamReaderBuilder.Configuration configuration =
278+
new ChangeStreamReaderBuilder.Configuration();
279+
configuration.setProjectId(SPANNER_PROJECT);
280+
configuration.setDatabaseId(SPANNER_DATABASE);
281+
configuration.setInstanceId(SPANNER_INSTANCE);
282+
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
283+
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
284+
changeStreamReaderBuilder.buildExternal(configuration);
285+
}
286+
287+
@Test
288+
public void testChangeStreamReaderBuilderBuildExternalWithRequiredFields() {
289+
ChangeStreamReaderBuilder.Configuration configuration =
290+
new ChangeStreamReaderBuilder.Configuration();
291+
292+
configuration.setProjectId(SPANNER_PROJECT);
293+
configuration.setDatabaseId(SPANNER_DATABASE);
294+
configuration.setInstanceId(SPANNER_INSTANCE);
295+
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
296+
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
297+
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
298+
299+
PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
300+
changeStreamReaderBuilder.buildExternal(configuration);
301+
assertNotNull(changeStreamReaderTransform);
302+
}
303+
304+
@Test
305+
public void testChangeStreamReaderBuilderBuildExternalWithAllFields() {
306+
String startAt = "2023-01-01T00:00:00Z";
307+
String endAt = "2023-01-02T00:00:00Z";
308+
String metadataTable = "meta-table";
309+
String rpcPriority = "HIGH";
310+
String refreshRate = "PT30S";
311+
312+
ChangeStreamReaderBuilder.Configuration configuration =
313+
new ChangeStreamReaderBuilder.Configuration();
314+
315+
configuration.setProjectId(SPANNER_PROJECT);
316+
configuration.setDatabaseId(SPANNER_DATABASE);
317+
configuration.setInstanceId(SPANNER_INSTANCE);
318+
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
319+
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
320+
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
321+
configuration.setInclusiveStartAt(startAt);
322+
configuration.setInclusiveEndAt(endAt);
323+
configuration.setMetadataTable(metadataTable);
324+
configuration.setRpcPriority(rpcPriority);
325+
configuration.setWatermarkRefreshRate(refreshRate);
326+
327+
PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
328+
changeStreamReaderBuilder.buildExternal(configuration);
329+
assertNotNull(changeStreamReaderTransform);
330+
}
331+
332+
@Test
333+
public void testChangeStreamReaderBuilderBuildExternalWithNullOptionalValues() {
334+
ChangeStreamReaderBuilder.Configuration configuration =
335+
new ChangeStreamReaderBuilder.Configuration();
336+
337+
configuration.setProjectId(SPANNER_PROJECT);
338+
configuration.setDatabaseId(SPANNER_DATABASE);
339+
configuration.setInstanceId(SPANNER_INSTANCE);
340+
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
341+
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
342+
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
343+
configuration.setInclusiveStartAt(null);
344+
configuration.setInclusiveEndAt(null);
345+
configuration.setMetadataTable(null);
346+
configuration.setRpcPriority(null);
347+
configuration.setWatermarkRefreshRate(null);
348+
349+
PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
350+
changeStreamReaderBuilder.buildExternal(configuration);
351+
assertNotNull(changeStreamReaderTransform);
352+
}
210353
}

0 commit comments

Comments
 (0)