Skip to content

Commit 9f70011

Browse files
committed
AVRO-4238: [java] Fix adding union fields with array default value
When FastReader was enabled and the reader schema added a field of type Union<Array, ...> whose default value was an empty array, an AvroRuntimeException occurred during schema evolution. This change fixes the bug in FastReaderBuilder.java by selecting the array branch of the union before creating the default array, allowing the schema migration to succeed as specified.
1 parent 7473a6e commit 9f70011

2 files changed

Lines changed: 42 additions & 20 deletions

File tree

lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,16 @@ private ExecutionStep getDefaultingStep(Schema.Field field) throws IOException {
197197
} else if (defaultValue instanceof Utf8) {
198198
return createFieldSetter(field, reusingReader((old, d) -> readUtf8(old, (Utf8) defaultValue)));
199199
} else if (defaultValue instanceof List && ((List<?>) defaultValue).isEmpty()) {
200-
return createFieldSetter(field, reusingReader((old, d) -> data.newArray(old, 0, field.schema())));
200+
Schema arraySchema = field.schema();
201+
if (arraySchema.getType() == Schema.Type.UNION) {
202+
arraySchema = arraySchema.getTypes().stream()
203+
.filter(nestedSchema -> nestedSchema.getType() == Schema.Type.ARRAY).findFirst()
204+
.orElseThrow(() -> new AvroTypeException(String.format(
205+
"Union schema %s has a default value of type Array, but none of the union types is of type Array",
206+
field.schema().toString())));
207+
}
208+
final Schema schema = arraySchema;
209+
return createFieldSetter(field, reusingReader((old, d) -> data.newArray(old, 0, schema)));
201210
} else if (defaultValue instanceof Map && ((Map<?, ?>) defaultValue).isEmpty()) {
202211
return createFieldSetter(field, reusingReader((old, d) -> data.newMap(old, 0)));
203212
} else {

lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,14 @@
1717
*/
1818
package org.apache.avro;
1919

20-
import static org.junit.Assert.assertArrayEquals;
21-
import static org.junit.Assert.assertEquals;
22-
2320
import java.io.ByteArrayInputStream;
2421
import java.io.ByteArrayOutputStream;
2522
import java.io.IOException;
2623
import java.nio.ByteBuffer;
2724
import java.nio.charset.StandardCharsets;
28-
import java.util.Arrays;
29-
import java.util.Collection;
30-
import java.util.stream.Stream;
25+
26+
import static java.util.Collections.emptyList;
27+
import static org.junit.jupiter.api.Assertions.*;
3128

3229
import org.apache.avro.generic.GenericData;
3330
import org.apache.avro.generic.GenericData.EnumSymbol;
@@ -41,7 +38,6 @@
4138
import org.apache.avro.io.Encoder;
4239
import org.apache.avro.io.EncoderFactory;
4340

44-
import org.junit.jupiter.api.Assertions;
4541
import org.junit.jupiter.params.ParameterizedTest;
4642
import org.junit.jupiter.params.provider.EnumSource;
4743

@@ -122,6 +118,11 @@ public class TestReadingWritingDataInEvolvedSchemas {
122118
.name(FIELD_A).type().unionOf().floatType().and().doubleType().endUnion().noDefault() //
123119
.endRecord();
124120

121+
private static final Schema UNION_WITH_EMPTY_ARRAY_DEFAULT_RECORD = SchemaBuilder.record(RECORD_A) //
122+
.fields() //
123+
.name(FIELD_A).type().unionOf().array().items(INT_RECORD).and().nullType().endUnion().arrayDefault(emptyList()) //
124+
.endRecord();
125+
125126
enum EncoderType {
126127
BINARY, JSON
127128
}
@@ -232,9 +233,9 @@ void doubleWrittenWithUnionSchemaIsNotConvertedToFloatSchema(EncoderType encoder
232233
Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
233234
Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0);
234235
byte[] encoded = encodeGenericBlob(record, encoderType);
235-
AvroTypeException exception = Assertions.assertThrows(AvroTypeException.class,
236+
AvroTypeException exception = assertThrows(AvroTypeException.class,
236237
() -> decodeGenericBlob(FLOAT_RECORD, writer, encoded, encoderType));
237-
Assertions.assertEquals("Found double, expecting float", exception.getMessage());
238+
assertEquals("Found double, expecting float", exception.getMessage());
238239
}
239240

240241
@ParameterizedTest
@@ -243,9 +244,9 @@ void floatWrittenWithUnionSchemaIsNotConvertedToLongSchema(EncoderType encoderTy
243244
Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
244245
Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0f);
245246
byte[] encoded = encodeGenericBlob(record, encoderType);
246-
AvroTypeException exception = Assertions.assertThrows(AvroTypeException.class,
247+
AvroTypeException exception = assertThrows(AvroTypeException.class,
247248
() -> decodeGenericBlob(LONG_RECORD, writer, encoded, encoderType));
248-
Assertions.assertEquals("Found float, expecting long", exception.getMessage());
249+
assertEquals("Found float, expecting long", exception.getMessage());
249250
}
250251

251252
@ParameterizedTest
@@ -254,9 +255,9 @@ void longWrittenWithUnionSchemaIsNotConvertedToIntSchema(EncoderType encoderType
254255
Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
255256
Record record = defaultRecordWithSchema(writer, FIELD_A, 42L);
256257
byte[] encoded = encodeGenericBlob(record, encoderType);
257-
AvroTypeException exception = Assertions.assertThrows(AvroTypeException.class,
258+
AvroTypeException exception = assertThrows(AvroTypeException.class,
258259
() -> decodeGenericBlob(INT_RECORD, writer, encoded, encoderType));
259-
Assertions.assertEquals("Found long, expecting int", exception.getMessage());
260+
assertEquals("Found long, expecting int", exception.getMessage());
260261
}
261262

262263
@ParameterizedTest
@@ -342,9 +343,9 @@ void enumRecordWithExtendedSchemaCanNotBeReadIfNewValuesAreUsed(EncoderType enco
342343
Record record = defaultRecordWithSchema(writer, FIELD_A, new EnumSymbol(ENUM_ABC, "C"));
343344
byte[] encoded = encodeGenericBlob(record, encoderType);
344345

345-
AvroTypeException exception = Assertions.assertThrows(AvroTypeException.class,
346+
AvroTypeException exception = assertThrows(AvroTypeException.class,
346347
() -> decodeGenericBlob(ENUM_AB_RECORD, writer, encoded, encoderType));
347-
Assertions.assertEquals("No match for C", exception.getMessage());
348+
assertEquals("No match for C", exception.getMessage());
348349
}
349350

350351
@ParameterizedTest
@@ -363,9 +364,9 @@ void recordWrittenWithExtendedSchemaCanBeReadWithOriginalSchemaButLossOfData(Enc
363364
assertEquals(42, decoded.get(FIELD_A));
364365
try {
365366
decoded.get("newTopField");
366-
Assertions.fail("get should throw a exception");
367+
fail("get should throw a exception");
367368
} catch (AvroRuntimeException ex) {
368-
Assertions.assertEquals("Not a valid schema field: newTopField", ex.getMessage());
369+
assertEquals("Not a valid schema field: newTopField", ex.getMessage());
369370
}
370371
}
371372

@@ -379,9 +380,9 @@ void readerWithoutDefaultValueThrowsException(EncoderType encoderType) throws Ex
379380
.endRecord();
380381
Record record = defaultRecordWithSchema(INT_RECORD, FIELD_A, 42);
381382
byte[] encoded = encodeGenericBlob(record, encoderType);
382-
AvroTypeException exception = Assertions.assertThrows(AvroTypeException.class,
383+
AvroTypeException exception = assertThrows(AvroTypeException.class,
383384
() -> decodeGenericBlob(reader, INT_RECORD, encoded, encoderType));
384-
Assertions.assertTrue(exception.getMessage().contains("missing required field newField"), exception.getMessage());
385+
assertTrue(exception.getMessage().contains("missing required field newField"), exception.getMessage());
385386
}
386387

387388
@ParameterizedTest
@@ -399,6 +400,18 @@ void readerWithDefaultValueIsApplied(EncoderType encoderType) throws Exception {
399400
assertEquals(314, decoded.get("newFieldWithDefault"));
400401
}
401402

403+
@ParameterizedTest
404+
@EnumSource(EncoderType.class)
405+
void readerWithEmptyListAsDefaultValueForUnionFieldIsApplied(EncoderType encoderType) throws Exception {
406+
Schema writer = SchemaBuilder.record(RECORD_A) //
407+
.fields() //
408+
.endRecord();
409+
Record record = new GenericData.Record(writer);
410+
byte[] encoded = encodeGenericBlob(record, encoderType);
411+
Record decoded = decodeGenericBlob(UNION_WITH_EMPTY_ARRAY_DEFAULT_RECORD, writer, encoded, encoderType);
412+
assertEquals(emptyList(), decoded.get(FIELD_A));
413+
}
414+
402415
@ParameterizedTest
403416
@EnumSource(EncoderType.class)
404417
void aliasesInSchema(EncoderType encoderType) throws Exception {

0 commit comments

Comments
 (0)