
Commit ce9b82a

Spark implementation
1 parent 6595ccc commit ce9b82a

12 files changed

Lines changed: 397 additions & 176 deletions

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@
 import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
-import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.datafile.DeleteFilter;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.Pair;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java

Lines changed: 1 addition & 1 deletion
@@ -20,8 +20,8 @@
 
 import java.util.Arrays;
 import java.util.function.Predicate;
-import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.io.datafile.DeleteFilter;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnVector;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@
 import org.apache.comet.parquet.AbstractColumnReader;
 import org.apache.comet.parquet.BatchReader;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.datafile.DeleteFilter;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkSchemaUtil;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@
 import java.util.stream.IntStream;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.datafile.DeleteFilter;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@
 import org.apache.arrow.vector.NullCheckingForGet;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
-import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.datafile.DeleteFilter;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.spark.SparkUtil;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java

Lines changed: 95 additions & 66 deletions
@@ -19,25 +19,25 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.datafile.DataFileServiceRegistry;
+import org.apache.iceberg.io.datafile.DeleteFilter;
+import org.apache.iceberg.io.datafile.ReaderBuilder;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.OrcBatchReadConf;
 import org.apache.iceberg.spark.ParquetBatchReadConf;
 import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -65,76 +65,105 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
       Expression residual,
       Map<Integer, ?> idToConstant,
       SparkDeleteFilter deleteFilter) {
-    switch (format) {
-      case PARQUET:
-        return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
+    ReaderBuilder readerBuilder =
+        DataFileServiceRegistry.read(
+                format,
+                ColumnarBatch.class.getName(),
+                parquetConf != null ? parquetConf.readerType().name() : null,
+                inputFile,
+                expectedSchema(),
+                idToConstant,
+                deleteFilter)
+            .split(start, length)
+            .filter(residual)
+            .caseSensitive(caseSensitive())
+            // Spark eagerly consumes the batches. So the underlying memory allocated could be
+            // reused
+            // without worrying about subsequent reads clobbering over each other. This improves
+            // read performance as every batch read doesn't have to pay the cost of allocating
+            // memory.
+            .reuseContainers()
+            .withNameMapping(nameMapping());
+    if (parquetConf != null) {
+      readerBuilder = readerBuilder.recordsPerBatch(parquetConf.batchSize());
+    } else if (orcConf != null) {
+      readerBuilder = readerBuilder.recordsPerBatch(orcConf.batchSize());
+    }
 
-      case ORC:
-        return newOrcIterable(inputFile, start, length, residual, idToConstant);
+    return readerBuilder.build();
+  }
 
-      default:
-        throw new UnsupportedOperationException(
-            "Format: " + format + " not supported for batched reads");
+  public static class IcebergParquetReaderService implements DataFileServiceRegistry.ReaderService {
+    @Override
+    public DataFileServiceRegistry.Key key() {
+      return new DataFileServiceRegistry.Key(
+          FileFormat.PARQUET, ColumnarBatch.class.getName(), ParquetReaderType.ICEBERG.name());
+    }
+
+    @Override
+    public ReaderBuilder builder(
+        InputFile inputFile,
+        Schema readSchema,
+        Map<Integer, ?> idToConstant,
+        DeleteFilter<?> deleteFilter) {
+      // get required schema if there are deletes
+      Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : readSchema;
+      return Parquet.read(inputFile)
+          .project(requiredSchema)
+          .createBatchedReaderFunc(
+              fileSchema ->
+                  VectorizedSparkParquetReaders.buildReader(
+                      requiredSchema,
                      fileSchema,
                      idToConstant,
                      (DeleteFilter<InternalRow>) deleteFilter));
     }
   }
 
-  private CloseableIterable<ColumnarBatch> newParquetIterable(
-      InputFile inputFile,
-      long start,
-      long length,
-      Expression residual,
-      Map<Integer, ?> idToConstant,
-      SparkDeleteFilter deleteFilter) {
-    // get required schema if there are deletes
-    Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
+  public static class CometParquetReaderService implements DataFileServiceRegistry.ReaderService {
+    @Override
+    public DataFileServiceRegistry.Key key() {
+      return new DataFileServiceRegistry.Key(
+          FileFormat.PARQUET, ColumnarBatch.class.getName(), ParquetReaderType.COMET.name());
+    }
 
-    return Parquet.read(inputFile)
-        .project(requiredSchema)
-        .split(start, length)
-        .createBatchedReaderFunc(
-            fileSchema -> {
-              if (parquetConf.readerType() == ParquetReaderType.COMET) {
-                return VectorizedSparkParquetReaders.buildCometReader(
-                    requiredSchema, fileSchema, idToConstant, deleteFilter);
-              } else {
-                return VectorizedSparkParquetReaders.buildReader(
-                    requiredSchema, fileSchema, idToConstant, deleteFilter);
-              }
-            })
-        .recordsPerBatch(parquetConf.batchSize())
-        .filter(residual)
-        .caseSensitive(caseSensitive())
-        // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
-        // without worrying about subsequent reads clobbering over each other. This improves
-        // read performance as every batch read doesn't have to pay the cost of allocating memory.
-        .reuseContainers()
-        .withNameMapping(nameMapping())
-        .build();
+    @Override
+    public ReaderBuilder builder(
+        InputFile inputFile,
+        Schema readSchema,
+        Map<Integer, ?> idToConstant,
+        DeleteFilter<?> deleteFilter) {
+      // get required schema if there are deletes
+      Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : readSchema;
+      return Parquet.read(inputFile)
+          .project(requiredSchema)
+          .createBatchedReaderFunc(
+              fileSchema ->
+                  VectorizedSparkParquetReaders.buildCometReader(
+                      requiredSchema,
+                      fileSchema,
+                      idToConstant,
+                      (DeleteFilter<InternalRow>) deleteFilter));
+    }
   }
 
-  private CloseableIterable<ColumnarBatch> newOrcIterable(
-      InputFile inputFile,
-      long start,
-      long length,
-      Expression residual,
-      Map<Integer, ?> idToConstant) {
-    Set<Integer> constantFieldIds = idToConstant.keySet();
-    Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
-    Sets.SetView<Integer> constantAndMetadataFieldIds =
-        Sets.union(constantFieldIds, metadataFieldIds);
-    Schema schemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds);
+  public static class ORCReaderService implements DataFileServiceRegistry.ReaderService {
+    @Override
+    public DataFileServiceRegistry.Key key() {
+      return new DataFileServiceRegistry.Key(FileFormat.ORC, ColumnarBatch.class.getName());
    }
 
-    return ORC.read(inputFile)
-        .project(schemaWithoutConstantAndMetadataFields)
-        .split(start, length)
-        .createBatchedReaderFunc(
-            fileSchema ->
-                VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant))
-        .recordsPerBatch(orcConf.batchSize())
-        .filter(residual)
-        .caseSensitive(caseSensitive())
-        .withNameMapping(nameMapping())
-        .build();
+    @Override
+    public ReaderBuilder builder(
+        InputFile inputFile,
+        Schema readSchema,
+        Map<Integer, ?> idToConstant,
+        DeleteFilter<?> deleteFilter) {
+      return ORC.read(inputFile)
+          .project(ORC.schemaWithoutConstantAndMetadataFields(readSchema, idToConstant))
+          .createBatchedReaderFunc(
+              fileSchema ->
+                  VectorizedSparkOrcReaders.buildReader(readSchema, fileSchema, idToConstant));
+    }
   }
 }
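The change above replaces the hard-coded format switch with a registry lookup keyed by file format, returned row type, and (for Parquet) the reader variant, so the COMET-vs-ICEBERG branch disappears from the read path and each combination becomes a self-registering service. The commit does not show DataFileServiceRegistry itself (it is added in another file of this diff), so the following is only a minimal, self-contained sketch of the dispatch pattern, assuming ServiceLoader-based discovery; every name in it is illustrative, not the actual API.

// Sketch only: a simplified stand-in for the DataFileServiceRegistry dispatch
// used in newBatchIterable() above. The real registry is defined elsewhere in
// this commit; the string-based key and ServiceLoader discovery here are
// assumptions for illustration.
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

final class ReaderServiceRegistrySketch {

  // Mirrors the shape of DataFileServiceRegistry.ReaderService: a provider
  // announces which (format, return type, reader variant) combination it serves.
  public interface ReaderService {
    String key();
  }

  private static final Map<String, ReaderService> SERVICES = new HashMap<>();

  static {
    // Hypothetical discovery: implementations such as
    // BaseBatchReader.IcebergParquetReaderService would be listed under
    // META-INF/services and loaded once at startup.
    for (ReaderService service : ServiceLoader.load(ReaderService.class)) {
      SERVICES.put(service.key(), service);
    }
  }

  private ReaderServiceRegistrySketch() {}

  static ReaderService lookup(String format, String returnType, String readerType) {
    // The ORC service above registers without a reader variant, so a null
    // readerType falls back to the two-part key.
    String key =
        readerType != null
            ? format + "/" + returnType + "/" + readerType
            : format + "/" + returnType;
    ReaderService service = SERVICES.get(key);
    if (service == null) {
      // Plays the role of the old "Format: ... not supported" branch.
      throw new UnsupportedOperationException("No reader service registered for: " + key);
    }
    return service;
  }
}

Under a scheme like this, supporting a new format or reader variant means registering one more service rather than editing a switch statement in every reader class.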

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java

Lines changed: 58 additions & 59 deletions
@@ -20,7 +20,6 @@
 
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
@@ -29,13 +28,14 @@
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.datafile.DataFileServiceRegistry;
+import org.apache.iceberg.io.datafile.DeleteFilter;
+import org.apache.iceberg.io.datafile.ReaderBuilder;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.SparkOrcReader;
 import org.apache.iceberg.spark.data.SparkParquetReaders;
 import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
@@ -56,70 +56,69 @@ protected CloseableIterable<InternalRow> newIterable(
       Expression residual,
       Schema projection,
       Map<Integer, ?> idToConstant) {
-    switch (format) {
-      case PARQUET:
-        return newParquetIterable(file, start, length, residual, projection, idToConstant);
-
-      case AVRO:
-        return newAvroIterable(file, start, length, projection, idToConstant);
-
-      case ORC:
-        return newOrcIterable(file, start, length, residual, projection, idToConstant);
-
-      default:
-        throw new UnsupportedOperationException("Cannot read unknown format: " + format);
-    }
-  }
-
-  private CloseableIterable<InternalRow> newAvroIterable(
-      InputFile file, long start, long length, Schema projection, Map<Integer, ?> idToConstant) {
-    return Avro.read(file)
-        .reuseContainers()
-        .project(projection)
-        .split(start, length)
-        .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant))
-        .withNameMapping(nameMapping())
-        .build();
-  }
-
-  private CloseableIterable<InternalRow> newParquetIterable(
-      InputFile file,
-      long start,
-      long length,
-      Expression residual,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    return Parquet.read(file)
+    return DataFileServiceRegistry.read(
+            format, InternalRow.class.getName(), file, projection, idToConstant)
         .reuseContainers()
         .split(start, length)
-        .project(readSchema)
-        .createReaderFunc(
-            fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
         .filter(residual)
         .caseSensitive(caseSensitive())
        .withNameMapping(nameMapping())
        .build();
   }
 
-  private CloseableIterable<InternalRow> newOrcIterable(
-      InputFile file,
-      long start,
-      long length,
-      Expression residual,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    Schema readSchemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(
-            readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
+  public static class ParquetReaderService implements DataFileServiceRegistry.ReaderService {
+    @Override
+    public DataFileServiceRegistry.Key key() {
+      return new DataFileServiceRegistry.Key(FileFormat.PARQUET, InternalRow.class.getName());
+    }
 
-    return ORC.read(file)
-        .project(readSchemaWithoutConstantAndMetadataFields)
-        .split(start, length)
-        .createReaderFunc(
-            readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
-        .filter(residual)
-        .caseSensitive(caseSensitive())
-        .withNameMapping(nameMapping())
-        .build();
+    @Override
+    public ReaderBuilder builder(
+        InputFile inputFile,
+        Schema readSchema,
+        Map<Integer, ?> idToConstant,
+        DeleteFilter<?> deleteFilter) {
+      return Parquet.read(inputFile)
+          .project(readSchema)
+          .createReaderFunc(
+              fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant));
+    }
+  }
+
+  public static class ORCReaderService implements DataFileServiceRegistry.ReaderService {
+    @Override
+    public DataFileServiceRegistry.Key key() {
+      return new DataFileServiceRegistry.Key(FileFormat.ORC, InternalRow.class.getName());
+    }
+
+    @Override
+    public ReaderBuilder builder(
+        InputFile inputFile,
+        Schema readSchema,
+        Map<Integer, ?> idToConstant,
+        DeleteFilter<?> deleteFilter) {
+      return ORC.read(inputFile)
+          .project(ORC.schemaWithoutConstantAndMetadataFields(readSchema, idToConstant))
+          .createReaderFunc(
+              readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant));
+    }
+  }
+
+  public static class AvroReaderService implements DataFileServiceRegistry.ReaderService {
+    @Override
+    public DataFileServiceRegistry.Key key() {
+      return new DataFileServiceRegistry.Key(FileFormat.AVRO, InternalRow.class.getName());
+    }
+
+    @Override
+    public ReaderBuilder builder(
+        InputFile inputFile,
+        Schema readSchema,
+        Map<Integer, ?> idToConstant,
+        DeleteFilter<?> deleteFilter) {
+      return Avro.read(inputFile)
+          .project(readSchema)
+          .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant));
+    }
   }
 }
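The row-reader services above register with the two-argument key (format plus the InternalRow class name), while the Parquet batch services in BaseBatchReader add ParquetReaderType as a third component, so the Key type has to treat the reader variant as optional when comparing. The real Key ships in another file of this commit; below is only a minimal value-class sketch of that assumed equality contract, with invented field names.

// Sketch only: an assumed shape for DataFileServiceRegistry.Key. The actual
// class is defined elsewhere in this commit and may differ.
import java.util.Objects;

final class ReaderKeySketch {
  private final String format; // e.g. "PARQUET", "ORC", "AVRO"
  private final String returnType; // e.g. InternalRow.class.getName()
  private final String readerType; // e.g. "ICEBERG" or "COMET"; null when unused

  // Two-argument form, as used by the row and ORC batch services above.
  ReaderKeySketch(String format, String returnType) {
    this(format, returnType, null);
  }

  // Three-argument form, as used by the Parquet batch services.
  ReaderKeySketch(String format, String returnType, String readerType) {
    this.format = format;
    this.returnType = returnType;
    this.readerType = readerType;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof ReaderKeySketch)) {
      return false;
    }
    ReaderKeySketch that = (ReaderKeySketch) other;
    // Objects.equals tolerates the null readerType of variant-less services.
    return format.equals(that.format)
        && returnType.equals(that.returnType)
        && Objects.equals(readerType, that.readerType);
  }

  @Override
  public int hashCode() {
    return Objects.hash(format, returnType, readerType);
  }
}

With value-equal keys of this kind, IcebergParquetReaderService and CometParquetReaderService can coexist in the same registry, and BaseBatchReader selects between them purely from parquetConf.readerType().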
