Skip to content

Commit 7f09169

Browse files
authored
[IcebergIO] Fix conversion logic for arrays of structs and maps of structs; fix output Schema resolution with column pruning (#35230)
* fix complex types; fix filter schema logic * update Changes * add missing changes from other PRs * trigger ITs * make separate impl for iterable * fix
1 parent 4b8ae10 commit 7f09169

12 files changed

Lines changed: 393 additions & 44 deletions

File tree

CHANGES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@
7575
## I/Os
7676

7777
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
78+
* [IcebergIO] Support reading with column pruning ([#34856](https://github.com/apache/beam/pull/34856))
79+
* [IcebergIO] Support reading with pushdown filtering ([#34827](https://github.com/apache/beam/pull/34827))
7880

7981
## New Features / Improvements
8082
* Adding Google Storage Requests Pays feature (Golang)([#30747](https://github.com/apache/beam/issues/30747)).
@@ -106,6 +108,7 @@
106108
* [Python] Fixed vLLM server recovery mechanism in the event of a process termination ([#35234](https://github.com/apache/beam/pull/35234)).
107109
* (Python) Fixed cloudpickle overwriting class states every time loading a same object of dynamic class ([#35062](https://github.com/apache/beam/issues/35062)).
108110
* [Python] Fixed pip install apache-beam[interactive] causes crash on google colab ([#35148](https://github.com/apache/beam/pull/35148)).
111+
* [IcebergIO] Fixed Beam <-> Iceberg conversion logic for arrays of structs and maps of structs ([#35230](https://github.com/apache/beam/pull/35230)).
109112

110113
## Security Fixes
111114
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import java.time.LocalDate;
2525
import java.time.LocalDateTime;
2626
import java.time.LocalTime;
27+
import java.util.HashSet;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.Objects;
31+
import java.util.Set;
3032
import java.util.function.BiFunction;
3133
import java.util.stream.Collectors;
3234
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlBasicCall;
@@ -72,6 +74,55 @@ class FilterUtils {
7274
.put(SqlKind.OR, Operation.OR)
7375
.build();
7476

77+
/**
78+
* Parses a SQL filter expression string and returns a set of all field names referenced within
79+
* it.
80+
*/
81+
static Set<String> getReferencedFieldNames(@Nullable String filter) {
82+
if (filter == null || filter.trim().isEmpty()) {
83+
return new HashSet<>();
84+
}
85+
86+
SqlParser parser = SqlParser.create(filter);
87+
try {
88+
SqlNode expression = parser.parseExpression();
89+
Set<String> fieldNames = new HashSet<>();
90+
extractFieldNames(expression, fieldNames);
91+
return fieldNames;
92+
} catch (Exception exception) {
93+
throw new RuntimeException(
94+
String.format("Encountered an error when parsing filter: '%s'", filter), exception);
95+
}
96+
}
97+
98+
private static void extractFieldNames(SqlNode node, Set<String> fieldNames) {
99+
if (node instanceof SqlIdentifier) {
100+
fieldNames.add(((SqlIdentifier) node).getSimple());
101+
} else if (node instanceof SqlBasicCall) {
102+
// recursively check operands
103+
SqlBasicCall call = (SqlBasicCall) node;
104+
for (SqlNode operand : call.getOperandList()) {
105+
extractFieldNames(operand, fieldNames);
106+
}
107+
} else if (node instanceof SqlNodeList) {
108+
// For IN clauses, the right-hand side is a SqlNodeList, so iterate through its elements
109+
SqlNodeList nodeList = (SqlNodeList) node;
110+
for (SqlNode element : nodeList.getList()) {
111+
if (element != null) {
112+
extractFieldNames(element, fieldNames);
113+
}
114+
}
115+
}
116+
// SqlLiteral nodes do not contain field names, so we can ignore them.
117+
}
118+
119+
/**
120+
* Parses a SQL filter expression string into an Iceberg {@link Expression} that can be used for
121+
* data pruning.
122+
*
123+
* <p>Note: This utility currently supports only top-level fields within the filter expression.
124+
* Nested field references are not supported.
125+
*/
75126
static Expression convert(@Nullable String filter, Schema schema) {
76127
if (filter == null) {
77128
return Expressions.alwaysTrue();

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ public PCollection<Row> expand(PBegin input) {
603603
.setUseCdc(getUseCdc())
604604
.setKeepFields(getKeep())
605605
.setDropFields(getDrop())
606-
.setFilter(FilterUtils.convert(getFilter(), table.schema()))
606+
.setFilterString(getFilter())
607607
.build();
608608
scanConfig.validate(table);
609609

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.beam.sdk.schemas.Schema;
3232
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
3333
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
34+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
3435
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
3536
import org.apache.iceberg.Table;
3637
import org.apache.iceberg.catalog.TableIdentifier;
@@ -45,6 +46,10 @@
4546
@AutoValue
4647
public abstract class IcebergScanConfig implements Serializable {
4748
private transient @MonotonicNonNull Table cachedTable;
49+
private transient org.apache.iceberg.@MonotonicNonNull Schema cachedProjectedSchema;
50+
private transient org.apache.iceberg.@MonotonicNonNull Schema cachedRequiredSchema;
51+
private transient @MonotonicNonNull Evaluator cachedEvaluator;
52+
private transient @MonotonicNonNull Expression cachedFilter;
4853

4954
public enum ScanType {
5055
TABLE,
@@ -75,19 +80,59 @@ public Table getTable() {
7580
@VisibleForTesting
7681
static org.apache.iceberg.Schema resolveSchema(
7782
org.apache.iceberg.Schema schema, @Nullable List<String> keep, @Nullable List<String> drop) {
83+
return resolveSchema(schema, keep, drop, null);
84+
}
85+
86+
@VisibleForTesting
87+
static org.apache.iceberg.Schema resolveSchema(
88+
org.apache.iceberg.Schema schema,
89+
@Nullable List<String> keep,
90+
@Nullable List<String> drop,
91+
@Nullable Set<String> fieldsInFilter) {
92+
ImmutableList.Builder<String> selectedFieldsBuilder = ImmutableList.builder();
7893
if (keep != null && !keep.isEmpty()) {
79-
schema = schema.select(keep);
94+
selectedFieldsBuilder.addAll(keep);
8095
} else if (drop != null && !drop.isEmpty()) {
8196
Set<String> fields =
8297
schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
8398
drop.forEach(fields::remove);
84-
schema = schema.select(fields);
99+
selectedFieldsBuilder.addAll(fields);
100+
} else {
101+
// default: include all columns
102+
return schema;
85103
}
86-
return schema;
104+
105+
if (fieldsInFilter != null && !fieldsInFilter.isEmpty()) {
106+
fieldsInFilter.stream()
107+
.map(f -> schema.caseInsensitiveFindField(f).name())
108+
.forEach(selectedFieldsBuilder::add);
109+
}
110+
ImmutableList<String> selectedFields = selectedFieldsBuilder.build();
111+
return selectedFields.isEmpty() ? schema : schema.select(selectedFields);
87112
}
88113

114+
/** Returns the projected Schema after applying column pruning. */
89115
public org.apache.iceberg.Schema getProjectedSchema() {
90-
return resolveSchema(getTable().schema(), getKeepFields(), getDropFields());
116+
if (cachedProjectedSchema == null) {
117+
cachedProjectedSchema = resolveSchema(getTable().schema(), getKeepFields(), getDropFields());
118+
}
119+
return cachedProjectedSchema;
120+
}
121+
122+
/**
123+
* Returns a Schema that includes all the fields required for a successful read. This includes
124+
* explicitly selected fields and fields referenced in the filter statement.
125+
*/
126+
public org.apache.iceberg.Schema getRequiredSchema() {
127+
if (cachedRequiredSchema == null) {
128+
cachedRequiredSchema =
129+
resolveSchema(
130+
getTable().schema(),
131+
getKeepFields(),
132+
getDropFields(),
133+
FilterUtils.getReferencedFieldNames(getFilterString()));
134+
}
135+
return cachedRequiredSchema;
91136
}
92137

93138
@Pure
@@ -98,15 +143,22 @@ public Evaluator getEvaluator() {
98143
return null;
99144
}
100145
if (cachedEvaluator == null) {
101-
cachedEvaluator = new Evaluator(getProjectedSchema().asStruct(), filter);
146+
cachedEvaluator = new Evaluator(getRequiredSchema().asStruct(), filter);
102147
}
103148
return cachedEvaluator;
104149
}
105150

106-
private transient @Nullable Evaluator cachedEvaluator;
151+
@Pure
152+
@Nullable
153+
public Expression getFilter() {
154+
if (cachedFilter == null) {
155+
cachedFilter = FilterUtils.convert(getFilterString(), getTable().schema());
156+
}
157+
return cachedFilter;
158+
}
107159

108160
@Pure
109-
public abstract @Nullable Expression getFilter();
161+
public abstract @Nullable String getFilterString();
110162

111163
@Pure
112164
public abstract @Nullable Boolean getCaseSensitive();
@@ -172,7 +224,7 @@ public Evaluator getEvaluator() {
172224
public static Builder builder() {
173225
return new AutoValue_IcebergScanConfig.Builder()
174226
.setScanType(ScanType.TABLE)
175-
.setFilter(null)
227+
.setFilterString(null)
176228
.setCaseSensitive(null)
177229
.setOptions(ImmutableMap.of())
178230
.setSnapshot(null)
@@ -211,7 +263,7 @@ public Builder setTableIdentifier(String... names) {
211263

212264
public abstract Builder setSchema(Schema schema);
213265

214-
public abstract Builder setFilter(@Nullable Expression filter);
266+
public abstract Builder setFilterString(@Nullable String filter);
215267

216268
public abstract Builder setCaseSensitive(@Nullable Boolean caseSensitive);
217269

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.beam.sdk.io.iceberg;
1919

2020
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
21+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
22+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2123

2224
import java.nio.ByteBuffer;
2325
import java.time.LocalDate;
@@ -30,17 +32,20 @@
3032
import java.util.Map;
3133
import java.util.Optional;
3234
import java.util.UUID;
35+
import java.util.stream.Collectors;
3336
import org.apache.beam.sdk.schemas.Schema;
3437
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
3538
import org.apache.beam.sdk.util.Preconditions;
3639
import org.apache.beam.sdk.values.Row;
3740
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
41+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
3842
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
3943
import org.apache.iceberg.data.GenericRecord;
4044
import org.apache.iceberg.data.Record;
4145
import org.apache.iceberg.types.Type;
4246
import org.apache.iceberg.types.Types;
4347
import org.apache.iceberg.util.DateTimeUtil;
48+
import org.checkerframework.checker.nullness.qual.NonNull;
4449
import org.checkerframework.checker.nullness.qual.Nullable;
4550
import org.joda.time.DateTime;
4651
import org.joda.time.Instant;
@@ -345,10 +350,34 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
345350
copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row)));
346351
break;
347352
case LIST:
348-
Optional.ofNullable(value.getArray(name)).ifPresent(list -> rec.setField(name, list));
353+
Iterable<@NonNull ?> icebergList = value.getIterable(name);
354+
Type collectionType = ((Types.ListType) field.type()).elementType();
355+
356+
if (collectionType.isStructType() && icebergList != null) {
357+
org.apache.iceberg.Schema innerSchema = collectionType.asStructType().asSchema();
358+
ImmutableList.Builder<Record> builder = ImmutableList.builder();
359+
for (Row v : (Iterable<Row>) icebergList) {
360+
builder.add(beamRowToIcebergRecord(innerSchema, v));
361+
}
362+
icebergList = builder.build();
363+
}
364+
Optional.ofNullable(icebergList).ifPresent(list -> rec.setField(name, list));
349365
break;
350366
case MAP:
351-
Optional.ofNullable(value.getMap(name)).ifPresent(v -> rec.setField(name, v));
367+
Map<?, ?> icebergMap = value.getMap(name);
368+
Type valueType = ((Types.MapType) field.type()).valueType();
369+
// recurse on struct types
370+
if (valueType.isStructType() && icebergMap != null) {
371+
org.apache.iceberg.Schema innerSchema = valueType.asStructType().asSchema();
372+
373+
ImmutableMap.Builder<Object, Record> newMap = ImmutableMap.builder();
374+
for (Map.Entry<?, ?> entry : icebergMap.entrySet()) {
375+
Row row = checkStateNotNull(((Row) entry.getValue()));
376+
newMap.put(checkStateNotNull(entry.getKey()), beamRowToIcebergRecord(innerSchema, row));
377+
}
378+
icebergMap = newMap.build();
379+
}
380+
Optional.ofNullable(icebergMap).ifPresent(v -> rec.setField(name, v));
352381
break;
353382
}
354383
}
@@ -426,10 +455,68 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
426455
case DOUBLE: // Iceberg and Beam both use double
427456
case STRING: // Iceberg and Beam both use String
428457
case BOOLEAN: // Iceberg and Beam both use boolean
458+
rowBuilder.addValue(icebergValue);
459+
break;
429460
case ARRAY:
461+
checkState(
462+
icebergValue instanceof List,
463+
"Expected List type for field '%s' but received %s",
464+
field.getName(),
465+
icebergValue.getClass());
466+
List<@NonNull ?> beamList = (List<@NonNull ?>) icebergValue;
467+
Schema.FieldType collectionType =
468+
checkStateNotNull(field.getType().getCollectionElementType());
469+
// recurse on struct types
470+
if (collectionType.getTypeName().isCompositeType()) {
471+
Schema innerSchema = checkStateNotNull(collectionType.getRowSchema());
472+
beamList =
473+
beamList.stream()
474+
.map(v -> icebergRecordToBeamRow(innerSchema, (Record) v))
475+
.collect(Collectors.toList());
476+
}
477+
rowBuilder.addValue(beamList);
478+
break;
430479
case ITERABLE:
480+
checkState(
481+
icebergValue instanceof Iterable,
482+
"Expected Iterable type for field '%s' but received %s",
483+
field.getName(),
484+
icebergValue.getClass());
485+
Iterable<@NonNull ?> beamIterable = (Iterable<@NonNull ?>) icebergValue;
486+
Schema.FieldType iterableCollectionType =
487+
checkStateNotNull(field.getType().getCollectionElementType());
488+
// recurse on struct types
489+
if (iterableCollectionType.getTypeName().isCompositeType()) {
490+
Schema innerSchema = checkStateNotNull(iterableCollectionType.getRowSchema());
491+
ImmutableList.Builder<Row> builder = ImmutableList.builder();
492+
for (Record v : (Iterable<@NonNull Record>) icebergValue) {
493+
builder.add(icebergRecordToBeamRow(innerSchema, v));
494+
}
495+
beamIterable = builder.build();
496+
}
497+
rowBuilder.addValue(beamIterable);
498+
break;
431499
case MAP:
432-
rowBuilder.addValue(icebergValue);
500+
checkState(
501+
icebergValue instanceof Map,
502+
"Expected Map type for field '%s' but received %s",
503+
field.getName(),
504+
icebergValue.getClass());
505+
Map<?, ?> beamMap = (Map<?, ?>) icebergValue;
506+
Schema.FieldType valueType = checkStateNotNull(field.getType().getMapValueType());
507+
// recurse on struct types
508+
if (valueType.getTypeName().isCompositeType()) {
509+
Schema innerSchema = checkStateNotNull(valueType.getRowSchema());
510+
ImmutableMap.Builder<Object, Row> newMap = ImmutableMap.builder();
511+
for (Map.Entry<?, ?> entry : ((Map<?, ?>) icebergValue).entrySet()) {
512+
Record rec = ((Record) entry.getValue());
513+
newMap.put(
514+
checkStateNotNull(entry.getKey()),
515+
icebergRecordToBeamRow(innerSchema, checkStateNotNull(rec)));
516+
}
517+
beamMap = newMap.build();
518+
}
519+
rowBuilder.addValue(beamMap);
433520
break;
434521
case DATETIME:
435522
// Iceberg uses a long for micros.

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,9 @@ public void process(
7474
return;
7575
}
7676
FileScanTask task = fileScanTasks.get((int) l);
77-
org.apache.iceberg.Schema projected = scanConfig.getProjectedSchema();
78-
Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(projected);
77+
Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema());
7978
try (CloseableIterable<Record> fullIterable =
80-
ReadUtils.createReader(task, table, projected)) {
79+
ReadUtils.createReader(task, table, scanConfig.getRequiredSchema())) {
8180
CloseableIterable<Record> reader = ReadUtils.maybeApplyFilter(fullIterable, scanConfig);
8281

8382
for (Record record : reader) {

0 commit comments

Comments
 (0)