Skip to content

Commit 270feb0

Browse files
author
Aditya Bharadwaj
committed
Refactor: Decouple Datastream template from source engines (PR 2)
Introduce a pluggable ISourceConnector interface to encapsulate all source-specific logic for MySQL, PostgreSQL, and Oracle databases within the datastream-to-spanner template. This replaces inline source-specific logic with a clean, decoupled design: - Created ISourceConnector interface. - Created MySqlSourceConnector, PostgresqlSourceConnector, and OracleSourceConnector implementing the interface. - Refactored SourceConnectorRegistry to manually register and manage these connectors (no AutoService/SPI dependency). - Updated SpannerTransactionWriterDoFn and ShadowTableCreator to interact with sources purely via the ISourceConnector interface. - Deleted obsolete ChangeEventContextFactory and ChangeEventSequenceFactory classes, delegating context and sequence creation to the connectors. - Kept all source-specific constants in DatastreamConstants (like in main) to preserve compatibility. - Removed auto-service dependency from pom.xml. - All changes are strictly restricted to the datastream-to-spanner template. TAG=agy CONV=b7fec87d-3652-43db-a0d1-64773a054415
1 parent 1d67a0c commit 270feb0

30 files changed

Lines changed: 1384 additions & 1197 deletions

v2/datastream-to-spanner/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
<artifactId>google-cloud-compute</artifactId>
110110
<scope>test</scope>
111111
</dependency>
112+
112113
</dependencies>
113114

114115
<profiles>

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
import com.google.cloud.teleport.v2.spanner.migrations.utils.TransformationContextReader;
4646
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner.Options;
4747
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
48-
import com.google.cloud.teleport.v2.templates.datastream.DatastreamConstants;
48+
import com.google.cloud.teleport.v2.templates.datastream.source.SourceConnectorRegistry;
4949
import com.google.cloud.teleport.v2.templates.spanner.ProcessInformationSchema;
5050
import com.google.cloud.teleport.v2.templates.transform.ChangeEventTransformerDoFn;
5151
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
@@ -589,12 +589,12 @@ static void validateSourceType(Options options) {
589589
return;
590590
}
591591
String sourceType = getSourceType(options);
592-
if (!DatastreamConstants.SUPPORTED_DATASTREAM_SOURCES.contains(sourceType)) {
592+
if (!SourceConnectorRegistry.getSupportedSourceTypes().contains(sourceType)) {
593593
throw new IllegalArgumentException(
594594
"Unsupported source type found: "
595595
+ sourceType
596596
+ ". Specify one of the following source types: "
597-
+ DatastreamConstants.SUPPORTED_DATASTREAM_SOURCES);
597+
+ SourceConnectorRegistry.getSupportedSourceTypes());
598598
}
599599
options.setDatastreamSourceType(sourceType);
600600
}
@@ -620,15 +620,12 @@ static String getSourceType(Options options) {
620620
}
621621

622622
static String getSourceTypeFromConfig(SourceConfig sourceConfig) {
623-
if (sourceConfig.getMysqlSourceConfig() != null) {
624-
return DatastreamConstants.MYSQL_SOURCE_TYPE;
625-
} else if (sourceConfig.getOracleSourceConfig() != null) {
626-
return DatastreamConstants.ORACLE_SOURCE_TYPE;
627-
} else if (sourceConfig.getPostgresqlSourceConfig() != null) {
628-
return DatastreamConstants.POSTGRES_SOURCE_TYPE;
623+
try {
624+
return SourceConnectorRegistry.getSourceTypeFromConfig(sourceConfig);
625+
} catch (IllegalArgumentException e) {
626+
LOG.error("Source Connection Profile Type Not Supported", e);
627+
throw e;
629628
}
630-
LOG.error("Source Connection Profile Type Not Supported");
631-
throw new IllegalArgumentException("Unsupported source connection profile type in Datastream");
632629
}
633630

634631
/**

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@
3838
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.SpannerMigrationException;
3939
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
4040
import com.google.cloud.teleport.v2.templates.datastream.ChangeEventContext;
41-
import com.google.cloud.teleport.v2.templates.datastream.ChangeEventContextFactory;
4241
import com.google.cloud.teleport.v2.templates.datastream.ChangeEventSequence;
43-
import com.google.cloud.teleport.v2.templates.datastream.ChangeEventSequenceFactory;
42+
import com.google.cloud.teleport.v2.templates.datastream.DatastreamConstants;
43+
import com.google.cloud.teleport.v2.templates.datastream.source.ISourceConnector;
44+
import com.google.cloud.teleport.v2.templates.datastream.source.SourceConnectorRegistry;
4445
import com.google.cloud.teleport.v2.templates.spanner.DatastreamToSpannerExceptionClassifier;
4546
import com.google.cloud.teleport.v2.templates.spanner.DatastreamToSpannerExceptionClassifier.ErrorTag;
4647
import com.google.cloud.teleport.v2.templates.utils.WatchdogRunnable;
@@ -103,6 +104,8 @@ class SpannerTransactionWriterDoFn
103104

104105
/* SpannerAccessor must be transient so that its value is not serialized at runtime. */
105106
private transient SpannerAccessor spannerAccessor;
107+
108+
private transient ISourceConnector sourceConnector;
106109
/* SpannerAccessor for shadow table database must be transient so that its value is not serialized at runtime. */
107110
private transient SpannerAccessor shadowTableSpannerAccessor;
108111

@@ -216,6 +219,7 @@ public void setup() {
216219
: spannerAccessor;
217220
mapper = new ObjectMapper();
218221
mapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
222+
sourceConnector = SourceConnectorRegistry.getSourceConnector(sourceType);
219223
// Setup and start the watchdog thread.
220224
transactionAttemptCount = new AtomicLong(0);
221225
isInTransaction = new AtomicBoolean(false);
@@ -256,6 +260,19 @@ public void processElement(ProcessContext c) {
256260
try {
257261

258262
JsonNode changeEvent = mapper.readTree(msg.getPayload());
263+
264+
// Validate source type
265+
JsonNode eventSourceTypeNode = changeEvent.get(DatastreamConstants.EVENT_SOURCE_TYPE_KEY);
266+
String eventSourceType = eventSourceTypeNode != null ? eventSourceTypeNode.asText() : "";
267+
if (!eventSourceType.equalsIgnoreCase(sourceType)) {
268+
throw new InvalidChangeEventException(
269+
"Change event with invalid source. Actual("
270+
+ eventSourceType
271+
+ "), Expected("
272+
+ sourceType
273+
+ ")");
274+
}
275+
259276
migrationShardId =
260277
Optional.ofNullable(changeEvent.get(SHARD_ID_COLUMN_NAME))
261278
.map(shardIdNode -> changeEvent.get(shardIdNode.asText()).asText())
@@ -267,13 +284,12 @@ public void processElement(ProcessContext c) {
267284
isRetryRecord = true;
268285
}
269286
ChangeEventContext changeEventContext =
270-
ChangeEventContextFactory.createChangeEventContext(
271-
changeEvent, ddl, shadowTableDdl, shadowTablePrefix, sourceType);
287+
getSourceConnector()
288+
.createChangeEventContext(changeEvent, ddl, shadowTableDdl, shadowTablePrefix);
272289

273290
// Sequence information for the current change event.
274291
ChangeEventSequence currentChangeEventSequence =
275-
ChangeEventSequenceFactory.createChangeEventSequenceFromChangeEventContext(
276-
changeEventContext);
292+
getSourceConnector().createChangeEventSequenceFromChangeEventContext(changeEventContext);
277293

278294
if (usesSeparateShadowTableDb) {
279295
processCrossDatabaseTransaction(
@@ -395,8 +411,9 @@ private void processSingleDatabaseTransaction(
395411
transactionAttemptCount.incrementAndGet();
396412
// Sequence information for the last change event.
397413
ChangeEventSequence previousChangeEventSequence =
398-
ChangeEventSequenceFactory.createChangeEventSequenceFromShadowTable(
399-
transaction, changeEventContext, shadowDdl, false);
414+
getSourceConnector()
415+
.createChangeEventSequenceFromShadowTable(
416+
transaction, changeEventContext, shadowDdl, false);
400417
/* There was a previous event recorded with a greater sequence information
401418
* than current. Hence, skip the current event.
402419
*/
@@ -463,8 +480,12 @@ void processCrossDatabaseTransaction(
463480

464481
// Build lock query based on source type
465482
ChangeEventSequence previousChangeEventSequence =
466-
ChangeEventSequenceFactory.createChangeEventSequenceFromShadowTable(
467-
shadowTxn, changeEventContext, shadowDdl, /* useSqlStatments= */ true);
483+
getSourceConnector()
484+
.createChangeEventSequenceFromShadowTable(
485+
shadowTxn,
486+
changeEventContext,
487+
shadowDdl,
488+
/* useSqlStatments= */ true);
468489

469490
if (previousChangeEventSequence != null
470491
&& previousChangeEventSequence.compareTo(currentChangeEventSequence) >= 0) {
@@ -493,7 +514,7 @@ void processCrossDatabaseTransaction(
493514
// this
494515
// thread gets killed.
495516
ChangeEventSequence validationSequence =
496-
ChangeEventSequenceFactory
517+
getSourceConnector()
497518
.createChangeEventSequenceFromShadowTable(
498519
shadowTxn,
499520
changeEventContext,
@@ -577,6 +598,17 @@ String getTxnTag(PipelineOptions options) {
577598
return txnTag;
578599
}
579600

601+
private ISourceConnector getSourceConnector() {
602+
if (sourceConnector == null) {
603+
sourceConnector = SourceConnectorRegistry.getSourceConnector(sourceType);
604+
}
605+
return sourceConnector;
606+
}
607+
608+
void setSourceConnector(ISourceConnector sourceConnector) {
609+
this.sourceConnector = sourceConnector;
610+
}
611+
580612
public void setMapper(ObjectMapper mapper) {
581613
this.mapper = mapper;
582614
}

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContextFactory.java

Lines changed: 0 additions & 76 deletions
This file was deleted.

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactory.java

Lines changed: 0 additions & 89 deletions
This file was deleted.

0 commit comments

Comments
 (0)