Skip to content

Commit d32b56f

Browse files
geruhdevendra-nr
authored andcommitted
Spark 3.4: Backport apache#12836 for row lineage support in spark parquet reader (apache#13353)
1 parent cf844c5 commit d32b56f

6 files changed

Lines changed: 163 additions & 57 deletions

File tree

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Arrays;
2626
import java.util.List;
2727
import java.util.Map;
28-
import org.apache.iceberg.MetadataColumns;
2928
import org.apache.iceberg.Schema;
3029
import org.apache.iceberg.parquet.ParquetSchemaUtil;
3130
import org.apache.iceberg.parquet.ParquetUtil;
@@ -137,61 +136,52 @@ public ParquetValueReader<?> message(
137136
@Override
138137
public ParquetValueReader<?> struct(
139138
Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
139+
if (null == expected) {
140+
return new InternalRowReader(ImmutableList.of());
141+
}
142+
140143
// match the expected struct's order
141144
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
142-
Map<Integer, Type> typesById = Maps.newHashMap();
143-
Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
144145
List<Type> fields = struct.getFields();
145146
for (int i = 0; i < fields.size(); i += 1) {
146147
Type fieldType = fields.get(i);
147148
int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
148149
if (fieldType.getId() != null) {
149150
int id = fieldType.getId().intValue();
150151
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
151-
typesById.put(id, fieldType);
152-
if (idToConstant.containsKey(id)) {
153-
maxDefinitionLevelsById.put(id, fieldD);
154-
}
155152
}
156153
}
157154

158-
List<Types.NestedField> expectedFields =
159-
expected != null ? expected.fields() : ImmutableList.of();
155+
int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
156+
List<Types.NestedField> expectedFields = expected.fields();
160157
List<ParquetValueReader<?>> reorderedFields =
161158
Lists.newArrayListWithExpectedSize(expectedFields.size());
162-
// Defaulting to parent max definition level
163-
int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
159+
164160
for (Types.NestedField field : expectedFields) {
165161
int id = field.fieldId();
166-
ParquetValueReader<?> reader = readersById.get(id);
167-
if (idToConstant.containsKey(id)) {
168-
// containsKey is used because the constant may be null
169-
int fieldMaxDefinitionLevel =
170-
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
171-
reorderedFields.add(
172-
ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
173-
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
174-
reorderedFields.add(ParquetValueReaders.position());
175-
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
176-
reorderedFields.add(ParquetValueReaders.constant(false));
177-
} else if (reader != null) {
178-
reorderedFields.add(reader);
179-
} else if (field.initialDefault() != null) {
180-
reorderedFields.add(
181-
ParquetValueReaders.constant(
182-
SparkUtil.internalToSpark(field.type(), field.initialDefault()),
183-
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
184-
} else if (field.isOptional()) {
185-
reorderedFields.add(ParquetValueReaders.nulls());
186-
} else {
187-
throw new IllegalArgumentException(
188-
String.format("Missing required field: %s", field.name()));
189-
}
162+
ParquetValueReader<?> reader =
163+
ParquetValueReaders.replaceWithMetadataReader(
164+
id, readersById.get(id), idToConstant, constantDefinitionLevel);
165+
reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel));
190166
}
191167

192168
return new InternalRowReader(reorderedFields);
193169
}
194170

171+
private ParquetValueReader<?> defaultReader(
172+
Types.NestedField field, ParquetValueReader<?> reader, int constantDL) {
173+
if (reader != null) {
174+
return reader;
175+
} else if (field.initialDefault() != null) {
176+
return ParquetValueReaders.constant(
177+
SparkUtil.internalToSpark(field.type(), field.initialDefault()), constantDL);
178+
} else if (field.isOptional()) {
179+
return ParquetValueReaders.nulls();
180+
}
181+
182+
throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
183+
}
184+
195185
@Override
196186
public ParquetValueReader<?> list(
197187
Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,10 @@ private boolean useCometBatchReads() {
180180
}
181181

182182
private boolean supportsCometBatchReads(Types.NestedField field) {
183-
return field.type().isPrimitiveType() && !field.type().typeId().equals(Type.TypeID.UUID);
183+
return field.type().isPrimitiveType()
184+
&& !field.type().typeId().equals(Type.TypeID.UUID)
185+
&& field.fieldId() != MetadataColumns.ROW_ID.fieldId()
186+
&& field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
184187
}
185188

186189
// conditions for using ORC batch reads:

spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ protected void assertEquals(
6464
Object[] actual = actualRows.get(row);
6565
assertThat(actual).as("Number of columns should match").hasSameSizeAs(expected);
6666
for (int col = 0; col < actualRows.get(row).length; col += 1) {
67-
String newContext = String.format("%s: row %d col %d", context, row + 1, col + 1);
68-
assertEquals(newContext, expected, actual);
67+
assertEquals(context + ": row " + (row + 1), expected, actual);
6968
}
7069
}
7170
}
@@ -83,7 +82,9 @@ protected void assertEquals(String context, Object[] expectedRow, Object[] actua
8382
assertEquals(newContext, (Object[]) expectedValue, (Object[]) actualValue);
8483
}
8584
} else if (expectedValue != ANY) {
86-
assertThat(actualValue).as("%s contents should match", context).isEqualTo(expectedValue);
85+
assertThat(actualValue)
86+
.as(context + " col " + (col + 1) + " contents should match")
87+
.isEqualTo(expectedValue);
8788
}
8889
}
8990
}

spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,15 @@
2727
import java.math.BigDecimal;
2828
import java.nio.ByteBuffer;
2929
import java.nio.file.Path;
30+
import java.util.List;
31+
import java.util.Map;
3032
import java.util.UUID;
3133
import java.util.concurrent.atomic.AtomicInteger;
3234
import java.util.stream.Stream;
35+
import org.apache.iceberg.MetadataColumns;
3336
import org.apache.iceberg.Schema;
37+
import org.apache.iceberg.data.GenericRecord;
38+
import org.apache.iceberg.data.Record;
3439
import org.apache.iceberg.expressions.Literal;
3540
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
3641
import org.apache.iceberg.types.Type;
@@ -51,13 +56,27 @@
5156

5257
public abstract class AvroDataTest {
5358

59+
private static final long FIRST_ROW_ID = 2_000L;
60+
protected static final Map<Integer, Object> ID_TO_CONSTANT =
61+
Map.of(
62+
MetadataColumns.ROW_ID.fieldId(),
63+
FIRST_ROW_ID,
64+
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(),
65+
34L);
66+
5467
protected abstract void writeAndValidate(Schema schema) throws IOException;
5568

5669
protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
5770
throw new UnsupportedEncodingException(
5871
"Cannot run test, writeAndValidate(Schema, Schema) is not implemented");
5972
}
6073

74+
protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> records)
75+
throws IOException {
76+
throw new UnsupportedEncodingException(
77+
"Cannot run test, writeAndValidate(Schema, Schema, List<Record>) is not implemented");
78+
}
79+
6180
protected boolean supportsDefaultValues() {
6281
return false;
6382
}
@@ -66,6 +85,10 @@ protected boolean supportsNestedTypes() {
6685
return true;
6786
}
6887

88+
protected boolean supportsRowLineage() {
89+
return false;
90+
}
91+
6992
protected static final StructType SUPPORTED_PRIMITIVES =
7093
StructType.of(
7194
required(100, "id", LongType.get()),
@@ -547,4 +570,39 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Literal<?> d
547570

548571
writeAndValidate(writeSchema, readSchema);
549572
}
573+
574+
@Test
575+
public void testRowLineage() throws Exception {
576+
Assumptions.assumeThat(supportsRowLineage())
577+
.as("Row lineage support is not implemented")
578+
.isTrue();
579+
580+
Schema schema =
581+
new Schema(
582+
required(1, "id", LongType.get()),
583+
required(2, "data", Types.StringType.get()),
584+
MetadataColumns.ROW_ID,
585+
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
586+
587+
GenericRecord record = GenericRecord.create(schema);
588+
589+
writeAndValidate(
590+
schema,
591+
schema,
592+
List.of(
593+
record.copy(Map.of("id", 1L, "data", "a")),
594+
record.copy(Map.of("id", 2L, "data", "b")),
595+
record.copy(
596+
Map.of(
597+
"id",
598+
3L,
599+
"data",
600+
"c",
601+
"_row_id",
602+
1_000L,
603+
"_last_updated_sequence_number",
604+
33L)),
605+
record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)),
606+
record.copy(Map.of("id", 5L, "data", "e"))));
607+
}
550608
}

spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.util.List;
3939
import java.util.Map;
4040
import java.util.UUID;
41+
import org.apache.iceberg.MetadataColumns;
42+
import org.apache.iceberg.data.GenericDataUtil;
4143
import org.apache.iceberg.data.Record;
4244
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
4345
import org.apache.iceberg.types.Type;
@@ -198,12 +200,47 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual)
198200

199201
public static void assertEqualsUnsafe(
200202
Types.StructType struct, Record expected, InternalRow actual) {
203+
assertEqualsUnsafe(struct, expected, actual, null, -1);
204+
}
205+
206+
public static void assertEqualsUnsafe(
207+
Types.StructType struct,
208+
Record expected,
209+
InternalRow actual,
210+
Map<Integer, Object> idToConstant,
211+
int pos) {
212+
Types.StructType expectedType = expected.struct();
201213
List<Types.NestedField> fields = struct.fields();
202-
for (int i = 0; i < fields.size(); i += 1) {
203-
Type fieldType = fields.get(i).type();
214+
for (int readPos = 0; readPos < fields.size(); readPos += 1) {
215+
Types.NestedField field = fields.get(readPos);
216+
Types.NestedField expectedField = expectedType.field(field.fieldId());
204217

205-
Object expectedValue = expected.get(i);
206-
Object actualValue = actual.get(i, convert(fieldType));
218+
Type fieldType = field.type();
219+
Object actualValue =
220+
actual.isNullAt(readPos) ? null : actual.get(readPos, convert(fieldType));
221+
222+
Object expectedValue;
223+
if (expectedField != null) {
224+
int id = expectedField.fieldId();
225+
if (id == MetadataColumns.ROW_ID.fieldId()) {
226+
expectedValue = expected.getField(expectedField.name());
227+
if (expectedValue == null && idToConstant != null) {
228+
expectedValue = (Long) idToConstant.get(id) + pos;
229+
}
230+
231+
} else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) {
232+
expectedValue = expected.getField(expectedField.name());
233+
if (expectedValue == null && idToConstant != null) {
234+
expectedValue = idToConstant.get(id);
235+
}
236+
237+
} else {
238+
expectedValue = expected.getField(expectedField.name());
239+
}
240+
} else {
241+
// comparison expects Iceberg's generic representation
242+
expectedValue = GenericDataUtil.internalToGeneric(field.type(), field.initialDefault());
243+
}
207244

208245
assertEqualsUnsafe(fieldType, expectedValue, actualValue);
209246
}

spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.iceberg.spark.data;
2020

21-
import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
2221
import static org.apache.iceberg.types.Types.NestedField.required;
2322
import static org.assertj.core.api.Assertions.assertThat;
2423
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -28,7 +27,6 @@
2827
import java.util.Iterator;
2928
import java.util.List;
3029
import java.util.Map;
31-
import org.apache.avro.generic.GenericData;
3230
import org.apache.hadoop.conf.Configuration;
3331
import org.apache.iceberg.DataFiles;
3432
import org.apache.iceberg.FileFormat;
@@ -38,7 +36,9 @@
3836
import org.apache.iceberg.Schema;
3937
import org.apache.iceberg.Table;
4038
import org.apache.iceberg.data.IcebergGenerics;
39+
import org.apache.iceberg.data.RandomGenericData;
4140
import org.apache.iceberg.data.Record;
41+
import org.apache.iceberg.data.parquet.GenericParquetWriter;
4242
import org.apache.iceberg.hadoop.HadoopTables;
4343
import org.apache.iceberg.inmemory.InMemoryOutputFile;
4444
import org.apache.iceberg.io.CloseableIterable;
@@ -70,6 +70,13 @@ protected void writeAndValidate(Schema schema) throws IOException {
7070

7171
@Override
7272
protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
73+
List<Record> expected = RandomGenericData.generate(writeSchema, 100, 0L);
74+
writeAndValidate(writeSchema, expectedSchema, expected);
75+
}
76+
77+
@Override
78+
protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> expected)
79+
throws IOException {
7380
assumeThat(
7481
null
7582
== TypeUtil.find(
@@ -79,29 +86,39 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw
7986
.as("Parquet Avro cannot write non-string map keys")
8087
.isTrue();
8188

82-
List<GenericData.Record> expected = RandomData.generateList(writeSchema, 100, 0L);
83-
84-
OutputFile outputFile = new InMemoryOutputFile();
85-
86-
try (FileAppender<GenericData.Record> writer =
87-
Parquet.write(outputFile).schema(writeSchema).named("test").build()) {
89+
OutputFile output = new InMemoryOutputFile();
90+
try (FileAppender<Record> writer =
91+
Parquet.write(output)
92+
.schema(writeSchema)
93+
.createWriterFunc(GenericParquetWriter::create)
94+
.named("test")
95+
.build()) {
8896
writer.addAll(expected);
8997
}
9098

9199
try (CloseableIterable<InternalRow> reader =
92-
Parquet.read(outputFile.toInputFile())
100+
Parquet.read(output.toInputFile())
93101
.project(expectedSchema)
94-
.createReaderFunc(type -> SparkParquetReaders.buildReader(expectedSchema, type))
102+
.createReaderFunc(
103+
type -> SparkParquetReaders.buildReader(expectedSchema, type, ID_TO_CONSTANT))
95104
.build()) {
96105
Iterator<InternalRow> rows = reader.iterator();
97-
for (GenericData.Record record : expected) {
98-
assertThat(rows.hasNext()).as("Should have expected number of rows").isTrue();
99-
assertEqualsUnsafe(expectedSchema.asStruct(), record, rows.next());
106+
int pos = 0;
107+
for (Record record : expected) {
108+
assertThat(rows).as("Should have expected number of rows").hasNext();
109+
GenericsHelpers.assertEqualsUnsafe(
110+
expectedSchema.asStruct(), record, rows.next(), ID_TO_CONSTANT, pos);
111+
pos += 1;
100112
}
101-
assertThat(rows.hasNext()).as("Should not have extra rows").isFalse();
113+
assertThat(rows).as("Should not have extra rows").isExhausted();
102114
}
103115
}
104116

117+
@Override
118+
protected boolean supportsRowLineage() {
119+
return true;
120+
}
121+
105122
@Override
106123
protected boolean supportsDefaultValues() {
107124
return true;

0 commit comments

Comments
 (0)