From 1e84e2060c2525c031d171f8943485c61f2d7b5f Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 7 May 2026 11:09:08 -0400 Subject: [PATCH] Mitigate test broken after Avro Upgrade due to AVRO-4110 --- ...ReadSchemaTransformFormatProviderTest.java | 11 ++++ ...SchemaTransformFormatProviderTestData.java | 61 +++++++++++++++++++ ...ReadSchemaTransformFormatProviderTest.java | 11 ++++ 3 files changed, 83 insertions(+) diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java index 5725ceff3a12..bfba60fb0c13 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.fileschematransform; import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA; import static org.apache.beam.sdk.io.fileschematransform.FileReadSchemaTransformProvider.FILEPATTERN_ROW_FIELD_NAME; import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA; import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; @@ -215,6 +216,16 @@ public void testReadWithPCollectionOfFilepatterns() { readPipeline.run(); } + // TODO(AVRO-4110): remove this override when Beam upgraded Avro past 1.12.0 + @Override + public void testArrayPrimitiveDataTypes() { + Schema schema = ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA; + List rows = DATA.arrayPrimitiveDataTypesRowsAvro1120; + String filePath = 
getFilePath(); + + runWriteAndReadTest(schema, rows, filePath, null); + } + private static class TestDynamicDestinations extends DynamicAvroDestinations { final ResourceId baseDir; diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTestData.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTestData.java index 4f70dca71e38..d92acaa0ac6c 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTestData.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTestData.java @@ -35,6 +35,7 @@ import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingToRowFn; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -50,6 +51,7 @@ import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.SinglyNestedDataTypes; import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TimeContaining; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.joda.time.Instant; /** Shared {@link SchemaAwareJavaBeans} data to be used across various tests. */ @@ -60,6 +62,23 @@ class FileWriteSchemaTransformFormatProviderTestData { /* Prevent instantiation outside this class. 
*/ private FileWriteSchemaTransformFormatProviderTestData() {} + private static class ListPatcher { + private ArrayList list; + + ListPatcher(List list) { + this.list = Lists.newArrayList(list); + } + + ArrayList get() { + return list; + } + + ListPatcher patch(int index, T value) { + list.set(index, value); + return this; + } + } + final List allPrimitiveDataTypesList = Arrays.asList( allPrimitiveDataTypes(false, BigDecimal.valueOf(1L), 1.2345, 1.2345f, 1, 1L, "a"), @@ -188,11 +207,53 @@ private FileWriteSchemaTransformFormatProviderTestData() {} Collections.emptyList(), Collections.emptyList())); + // TODO(AVRO-4110): remove this workaround when Beam upgraded Avro past 1.12.0 + final List arrayPrimitiveDataTypesListAvro1120 = + new ListPatcher<>(arrayPrimitiveDataTypesList) + .patch( + 1, + arrayPrimitiveDataTypes( + Collections.emptyList(), + Collections.singletonList((double) Float.MAX_VALUE), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList())) + .patch( + 6, + arrayPrimitiveDataTypes( + Arrays.asList(false, true, false), + Arrays.asList((double) Float.MIN_VALUE, 0.0, (double) Float.MAX_VALUE), + Arrays.asList(Float.MIN_VALUE, 0.0f, Float.MAX_VALUE), + Arrays.asList(Integer.MIN_VALUE, 0, Integer.MAX_VALUE), + Arrays.asList(Long.MIN_VALUE, 0L, Long.MAX_VALUE), + Arrays.asList( + Stream.generate(() -> "🐤").limit(10).collect(Collectors.joining("")), + Stream.generate(() -> "🐥").limit(10).collect(Collectors.joining("")), + Stream.generate(() -> "🐣").limit(10).collect(Collectors.joining(""))))) + .patch( + 7, + arrayPrimitiveDataTypes( + Stream.generate(() -> true).limit(10).collect(Collectors.toList()), + Stream.generate(() -> (double) Float.MIN_VALUE) + .limit(10) + .collect(Collectors.toList()), + Stream.generate(() -> Float.MIN_VALUE).limit(10).collect(Collectors.toList()), + Stream.generate(() -> Integer.MIN_VALUE).limit(10).collect(Collectors.toList()), + Stream.generate(() -> 
Long.MIN_VALUE).limit(10).collect(Collectors.toList()), + Stream.generate(() -> "🐿").limit(10).collect(Collectors.toList()))) + .get(); + final List arrayPrimitiveDataTypesRows = arrayPrimitiveDataTypesList.stream() .map(arrayPrimitiveDataTypesToRowFn()::apply) .collect(Collectors.toList()); + final List arrayPrimitiveDataTypesRowsAvro1120 = + arrayPrimitiveDataTypesListAvro1120.stream() + .map(arrayPrimitiveDataTypesToRowFn()::apply) + .collect(Collectors.toList()); + final List singlyNestedDataTypesNoRepeat = allPrimitiveDataTypesList.stream() .map(SchemaAwareJavaBeans::singlyNestedDataTypes) diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java index b1d6bba06ea9..bbc33698c41f 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.fileschematransform; import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA; import static org.apache.beam.sdk.io.fileschematransform.FileReadSchemaTransformProvider.FILEPATTERN_ROW_FIELD_NAME; import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA; import static org.apache.beam.sdk.transforms.Contextful.fn; @@ -218,4 +219,14 @@ public void testReadWithPCollectionOfFilepatterns() { PAssert.that(output.get(FileReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(rows); readPipeline.run(); } + + // 
TODO(AVRO-4110): remove this override when Beam has upgraded Avro past 1.12.0 + @Override + public void testArrayPrimitiveDataTypes() { + Schema schema = ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA; + List rows = DATA.arrayPrimitiveDataTypesRowsAvro1120; + String filePath = getFilePath(); + + runWriteAndReadTest(schema, rows, filePath, null); + } }