Skip to content

Commit 39a8ae5

Browse files
[sourcedb-to-spanner] Fix GCS Avro Export flow (#3845)
* initial changes * null exception * review changes
1 parent 1c98e03 commit 39a8ae5

3 files changed

Lines changed: 100 additions & 4 deletions

File tree

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/AvroDestination.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
package com.google.cloud.teleport.v2.templates;
1717

1818
import java.util.Objects;
19+
import org.apache.avro.reflect.Nullable;
1920
import org.apache.beam.sdk.coders.DefaultCoder;
2021
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
2122

2223
@DefaultCoder(AvroCoder.class)
2324
public class AvroDestination {
2425
public String name;
2526
public String jsonSchema;
26-
public String shardId;
27+
@Nullable public String shardId;
2728

2829
// Needed for serialization
2930
public AvroDestination() {}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/MigrateTableTransform.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,7 @@ public WriteFilesResult<AvroDestination> writeToGCS(
162162
.by(
163163
(record) ->
164164
AvroDestination.of(
165-
record.shardId(),
166-
record.tableName(),
167-
record.getPayload().getSchema().toString()))
165+
record.shardId(), record.tableName(), record.gcsSchema().toString()))
168166
.via(
169167
Contextful.fn(
170168
record -> {

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MigrateTableTransformTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,21 @@
2222
import com.google.cloud.teleport.v2.options.SourceDbToSpannerOptions;
2323
import com.google.cloud.teleport.v2.source.reader.ReaderImpl;
2424
import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow;
25+
import com.google.cloud.teleport.v2.source.reader.io.schema.SchemaTestUtils;
26+
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema;
27+
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapper;
2528
import com.google.cloud.teleport.v2.source.reader.io.transform.ReaderTransform;
2629
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
2730
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaMapper;
31+
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType;
32+
import org.apache.avro.file.DataFileReader;
33+
import org.apache.avro.generic.GenericDatumReader;
34+
import org.apache.avro.generic.GenericRecord;
2835
import org.apache.beam.sdk.Pipeline;
2936
import org.apache.beam.sdk.io.Compression;
3037
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
3138
import org.apache.beam.sdk.options.PipelineOptionsFactory;
39+
import org.apache.beam.sdk.testing.TestPipeline;
3240
import org.apache.beam.sdk.transforms.Create;
3341
import org.apache.beam.sdk.transforms.PTransform;
3442
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -37,13 +45,16 @@
3745
import org.apache.beam.sdk.values.PCollection;
3846
import org.apache.beam.sdk.values.PCollectionTuple;
3947
import org.apache.beam.sdk.values.TupleTag;
48+
import org.junit.Rule;
4049
import org.junit.Test;
50+
import org.junit.rules.TemporaryFolder;
4151
import org.junit.runner.RunWith;
4252
import org.junit.runners.JUnit4;
4353

4454
/** Test class for {@link MigrateTableTransform}. */
4555
@RunWith(JUnit4.class)
4656
public class MigrateTableTransformTest {
57+
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
4758

4859
/** Tests that metric names are correctly generated, optionally including the shard ID. */
4960
@Test
@@ -132,4 +143,90 @@ public PCollectionTuple expand(PBegin input) {
132143
Pipeline p = Pipeline.create();
133144
migrateTableTransform.expand(PBegin.in(p));
134145
}
146+
147+
/**
148+
* Tests the writeToGCS method end-to-end to ensure that records are serialized correctly using
149+
* the wrapper schema in a non-sharded flow, where shardId is null.
150+
*/
151+
@Test
152+
public void testWriteToGCSForNonShardedFlow() throws Exception {
153+
runGcsWriteTest(null, "table1");
154+
}
155+
156+
/**
157+
* Tests the writeToGCS method end-to-end to ensure that records are serialized correctly using
158+
* the wrapper schema in a sharded flow, where shardId is present.
159+
*/
160+
@Test
161+
public void testWriteToGCSForShardedFlow() throws Exception {
162+
runGcsWriteTest("shard1", "table1/shard1");
163+
}
164+
165+
private void runGcsWriteTest(String shardId, String expectedRelativeSubDir) throws Exception {
166+
TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
167+
168+
final String testTable = "table1";
169+
final long testReadTime = 1712751118L;
170+
var schemaRef = SchemaTestUtils.generateSchemaReference("public", "mydb");
171+
var schema =
172+
SourceTableSchema.builder(UnifiedTypeMapper.MapperType.MYSQL)
173+
.setTableName(testTable)
174+
.addSourceColumnNameToSourceColumnType(
175+
"id", new SourceColumnType("INTEGER", new Long[] {}, null))
176+
.addSourceColumnNameToSourceColumnType(
177+
"firstName", new SourceColumnType("varchar", new Long[] {20L}, null))
178+
.build();
179+
SourceRow sourceRow =
180+
SourceRow.builder(schemaRef, schema, shardId, testReadTime)
181+
.setField("id", 123)
182+
.setField("firstName", "abc")
183+
.build();
184+
185+
PCollection<SourceRow> sourceRows = pipeline.apply(Create.of(sourceRow));
186+
String tempDirPath = tempFolder.getRoot().getAbsolutePath();
187+
188+
SourceDbToSpannerOptions options = PipelineOptionsFactory.as(SourceDbToSpannerOptions.class);
189+
options.setSourceDbDialect("MYSQL");
190+
options.setGcsOutputDirectory(tempDirPath);
191+
192+
SpannerConfig mockSpannerConfig = mock(SpannerConfig.class);
193+
Ddl mockDdl = mock(Ddl.class);
194+
ISchemaMapper mockSchemaMapper = mock(ISchemaMapper.class);
195+
ReaderImpl mockReader = mock(ReaderImpl.class);
196+
197+
MigrateTableTransform migrateTableTransform =
198+
new MigrateTableTransform(
199+
options, mockSpannerConfig, mockDdl, mockSchemaMapper, mockReader);
200+
201+
migrateTableTransform.writeToGCS(sourceRows, tempDirPath);
202+
203+
// Run the pipeline to execute the Avro write logic on actual data
204+
pipeline.run().waitUntilFinish();
205+
206+
// Verify Avro File was written and matches wrapper schema
207+
java.io.File avroSubDir = new java.io.File(tempDirPath, expectedRelativeSubDir);
208+
java.io.File[] avroFiles = avroSubDir.listFiles((dir, name) -> name.endsWith(".avro"));
209+
assertThat(avroFiles).isNotNull();
210+
assertThat(avroFiles).hasLength(1);
211+
java.io.File avroFile = avroFiles[0];
212+
213+
try (DataFileReader<GenericRecord> fileReader =
214+
new DataFileReader<>(avroFile, new GenericDatumReader<>())) {
215+
assertThat(fileReader.hasNext()).isTrue();
216+
GenericRecord record = fileReader.next();
217+
218+
assertThat(record.get("tableName").toString()).isEqualTo("table1");
219+
if (shardId == null) {
220+
assertThat(record.get("shardId")).isNull();
221+
} else {
222+
assertThat(record.get("shardId").toString()).isEqualTo(shardId);
223+
}
224+
225+
GenericRecord payloadRecord = (GenericRecord) record.get("payload");
226+
assertThat(payloadRecord.get("id")).isEqualTo(123);
227+
assertThat(payloadRecord.get("firstName").toString()).isEqualTo("abc");
228+
229+
assertThat(fileReader.hasNext()).isFalse();
230+
}
231+
}
135232
}

0 commit comments

Comments
 (0)