Skip to content

Commit 062e822

Browse files
authored
Spark 3.4: Pass FileIO on Spark's read path (#16307)
Backport of #15683 (and length fix #16284) to spark/v3.4. Note: BaseReader required an adaptation \u2014 v3.4 still used the legacy table.encryption().decrypt(...) path. Switched it to fileIO.bulkDecrypt(...) to match v3.5/4.0/4.1, since the broadcast FileIO is now an EncryptingFileIO (combined in the constructor). All other files match the v3.5 patch byte-for-byte (with paths translated).
1 parent a676552 commit 062e822

23 files changed

Lines changed: 413 additions & 40 deletions

spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java renamed to spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,26 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.iceberg.spark.extensions;
19+
package org.apache.iceberg.spark.source;
2020

21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import java.util.function.Supplier;
24+
import org.apache.hadoop.conf.Configuration;
2125
import org.apache.iceberg.CatalogProperties;
2226
import org.apache.iceberg.ParameterizedTestExtension;
2327
import org.apache.iceberg.Parameters;
28+
import org.apache.iceberg.Table;
29+
import org.apache.iceberg.io.FileIO;
2430
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
31+
import org.apache.iceberg.rest.RESTCatalog;
2532
import org.apache.iceberg.rest.RESTCatalogProperties;
2633
import org.apache.iceberg.spark.SparkCatalogConfig;
2734
import org.apache.iceberg.spark.sql.TestSelect;
35+
import org.apache.spark.sql.connector.read.Batch;
36+
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
37+
import org.assertj.core.api.InstanceOfAssertFactories;
38+
import org.junit.jupiter.api.TestTemplate;
2839
import org.junit.jupiter.api.extension.ExtendWith;
2940

3041
@ExtendWith(ParameterizedTestExtension.class)
@@ -38,8 +49,6 @@ protected static Object[][] parameters() {
3849
ImmutableMap.builder()
3950
.putAll(SparkCatalogConfig.REST.properties())
4051
.put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
41-
// this flag is typically only set by the server, but we set it from the client for
42-
// testing
4352
.put(
4453
RESTCatalogProperties.SCAN_PLANNING_MODE,
4554
RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
@@ -48,4 +57,36 @@ protected static Object[][] parameters() {
4857
}
4958
};
5059
}
60+
61+
@TestTemplate
62+
public void fileIOIsPropagated() {
63+
RESTCatalog catalog = new RESTCatalog();
64+
catalog.setConf(new Configuration());
65+
catalog.initialize(
66+
"test",
67+
ImmutableMap.<String, String>builder()
68+
.putAll(restCatalog.properties())
69+
.put(
70+
RESTCatalogProperties.SCAN_PLANNING_MODE,
71+
RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
72+
.build());
73+
Table table = catalog.loadTable(tableIdent);
74+
75+
SparkScanBuilder builder = new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
76+
verifyFileIOHasPlanId(builder.build().toBatch(), table);
77+
verifyFileIOHasPlanId(builder.buildCopyOnWriteScan().toBatch(), table);
78+
}
79+
80+
private void verifyFileIOHasPlanId(Batch batch, Table table) {
81+
FileIO fileIOForScan =
82+
(FileIO)
83+
assertThat(batch)
84+
.extracting("fileIO")
85+
.isInstanceOf(Supplier.class)
86+
.asInstanceOf(InstanceOfAssertFactories.type(Supplier.class))
87+
.actual()
88+
.get();
89+
assertThat(fileIOForScan.properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
90+
assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
91+
}
5192
}

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.iceberg.formats.FormatModelRegistry;
3232
import org.apache.iceberg.formats.ReadBuilder;
3333
import org.apache.iceberg.io.CloseableIterable;
34+
import org.apache.iceberg.io.FileIO;
3435
import org.apache.iceberg.io.InputFile;
3536
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
3637
import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -49,6 +50,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
4950

5051
BaseBatchReader(
5152
Table table,
53+
FileIO fileIO,
5254
ScanTaskGroup<T> taskGroup,
5355
Schema tableSchema,
5456
Schema expectedSchema,
@@ -57,7 +59,13 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
5759
OrcBatchReadConf orcConf,
5860
boolean cacheDeleteFilesOnExecutors) {
5961
super(
60-
table, taskGroup, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
62+
table,
63+
fileIO,
64+
taskGroup,
65+
tableSchema,
66+
expectedSchema,
67+
caseSensitive,
68+
cacheDeleteFilesOnExecutors);
6169
this.parquetConf = parquetConf;
6270
this.orcConf = orcConf;
6371
}

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,12 @@
4242
import org.apache.iceberg.data.DeleteFilter;
4343
import org.apache.iceberg.data.DeleteLoader;
4444
import org.apache.iceberg.deletes.DeleteCounter;
45-
import org.apache.iceberg.encryption.EncryptedFiles;
46-
import org.apache.iceberg.encryption.EncryptedInputFile;
45+
import org.apache.iceberg.encryption.EncryptingFileIO;
4746
import org.apache.iceberg.io.CloseableIterator;
47+
import org.apache.iceberg.io.FileIO;
4848
import org.apache.iceberg.io.InputFile;
4949
import org.apache.iceberg.mapping.NameMapping;
5050
import org.apache.iceberg.mapping.NameMappingParser;
51-
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
52-
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
5351
import org.apache.iceberg.spark.SparkExecutorCache;
5452
import org.apache.iceberg.spark.SparkSchemaUtil;
5553
import org.apache.iceberg.spark.SparkUtil;
@@ -69,6 +67,7 @@ abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
6967
private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
7068

7169
private final Table table;
70+
private final EncryptingFileIO fileIO;
7271
private final Schema tableSchema;
7372
private final Schema expectedSchema;
7473
private final boolean caseSensitive;
@@ -85,12 +84,14 @@ abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
8584

8685
BaseReader(
8786
Table table,
87+
FileIO fileIO,
8888
ScanTaskGroup<TaskT> taskGroup,
8989
Schema tableSchema,
9090
Schema expectedSchema,
9191
boolean caseSensitive,
9292
boolean cacheDeleteFilesOnExecutors) {
9393
this.table = table;
94+
this.fileIO = EncryptingFileIO.combine(fileIO, table().encryption());
9495
this.taskGroup = taskGroup;
9596
this.tasks = taskGroup.tasks().iterator();
9697
this.currentIterator = CloseableIterator.empty();
@@ -182,25 +183,14 @@ protected InputFile getInputFile(String location) {
182183

183184
private Map<String, InputFile> inputFiles() {
184185
if (lazyInputFiles == null) {
185-
Stream<EncryptedInputFile> encryptedFiles =
186-
taskGroup.tasks().stream().flatMap(this::referencedFiles).map(this::toEncryptedInputFile);
187-
188-
// decrypt with the batch call to avoid multiple RPCs to a key server, if possible
189-
Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encryptedFiles::iterator);
190-
191-
Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(taskGroup.tasks().size());
192-
decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
193-
this.lazyInputFiles = ImmutableMap.copyOf(files);
186+
this.lazyInputFiles =
187+
fileIO.bulkDecrypt(
188+
() -> taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator());
194189
}
195190

196191
return lazyInputFiles;
197192
}
198193

199-
private EncryptedInputFile toEncryptedInputFile(ContentFile<?> file) {
200-
InputFile inputFile = table.io().newInputFile(file.location());
201-
return EncryptedFiles.encryptedInput(inputFile, file.keyMetadata());
202-
}
203-
204194
protected Map<Integer, ?> constantsMap(ContentScanTask<?> task, Schema readSchema) {
205195
if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
206196
StructType partitionType = Partitioning.partitionType(table);

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,27 @@
2828
import org.apache.iceberg.formats.FormatModelRegistry;
2929
import org.apache.iceberg.formats.ReadBuilder;
3030
import org.apache.iceberg.io.CloseableIterable;
31+
import org.apache.iceberg.io.FileIO;
3132
import org.apache.iceberg.io.InputFile;
3233
import org.apache.spark.sql.catalyst.InternalRow;
3334

3435
abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
3536
BaseRowReader(
3637
Table table,
38+
FileIO fileIO,
3739
ScanTaskGroup<T> taskGroup,
3840
Schema tableSchema,
3941
Schema expectedSchema,
4042
boolean caseSensitive,
4143
boolean cacheDeleteFilesOnExecutors) {
4244
super(
43-
table, taskGroup, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
45+
table,
46+
fileIO,
47+
taskGroup,
48+
tableSchema,
49+
expectedSchema,
50+
caseSensitive,
51+
cacheDeleteFilesOnExecutors);
4452
}
4553

4654
protected CloseableIterable<InternalRow> newIterable(

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iceberg.Schema;
2727
import org.apache.iceberg.Table;
2828
import org.apache.iceberg.io.CloseableIterator;
29+
import org.apache.iceberg.io.FileIO;
2930
import org.apache.iceberg.io.InputFile;
3031
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3132
import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -53,6 +54,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
5354
OrcBatchReadConf orcBatchReadConf) {
5455
this(
5556
partition.table(),
57+
partition.io(),
5658
partition.taskGroup(),
5759
SnapshotUtil.schemaFor(partition.table(), partition.branch()),
5860
partition.expectedSchema(),
@@ -64,6 +66,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
6466

6567
BatchDataReader(
6668
Table table,
69+
FileIO fileIO,
6770
ScanTaskGroup<FileScanTask> taskGroup,
6871
Schema tableSchema,
6972
Schema expectedSchema,
@@ -73,6 +76,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
7376
boolean cacheDeleteFilesOnExecutors) {
7477
super(
7578
table,
79+
fileIO,
7680
taskGroup,
7781
tableSchema,
7882
expectedSchema,

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.iceberg.Table;
3636
import org.apache.iceberg.io.CloseableIterable;
3737
import org.apache.iceberg.io.CloseableIterator;
38+
import org.apache.iceberg.io.FileIO;
3839
import org.apache.iceberg.io.InputFile;
3940
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
4041
import org.apache.iceberg.util.SnapshotUtil;
@@ -51,6 +52,7 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask>
5152
ChangelogRowReader(SparkInputPartition partition) {
5253
this(
5354
partition.table(),
55+
partition.io(),
5456
partition.taskGroup(),
5557
SnapshotUtil.schemaFor(partition.table(), partition.branch()),
5658
partition.expectedSchema(),
@@ -60,13 +62,15 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask>
6062

6163
ChangelogRowReader(
6264
Table table,
65+
FileIO fileIO,
6366
ScanTaskGroup<ChangelogScanTask> taskGroup,
6467
Schema tableSchema,
6568
Schema expectedSchema,
6669
boolean caseSensitive,
6770
boolean cacheDeleteFilesOnExecutors) {
6871
super(
6972
table,
73+
fileIO,
7074
taskGroup,
7175
tableSchema,
7276
ChangelogUtil.dropChangelogMetadata(expectedSchema),

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,27 @@
2525
import org.apache.iceberg.Schema;
2626
import org.apache.iceberg.Table;
2727
import org.apache.iceberg.io.CloseableIterator;
28+
import org.apache.iceberg.io.FileIO;
2829
import org.apache.spark.rdd.InputFileBlockHolder;
2930
import org.apache.spark.sql.catalyst.InternalRow;
3031

3132
public class EqualityDeleteRowReader extends RowDataReader {
3233
public EqualityDeleteRowReader(
3334
CombinedScanTask task,
3435
Table table,
36+
FileIO fileIO,
3537
Schema tableSchema,
3638
Schema expectedSchema,
3739
boolean caseSensitive,
3840
boolean cacheDeleteFilesOnExecutors) {
39-
super(table, task, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
41+
super(
42+
table,
43+
fileIO,
44+
task,
45+
tableSchema,
46+
expectedSchema,
47+
caseSensitive,
48+
cacheDeleteFilesOnExecutors);
4049
}
4150

4251
@Override

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iceberg.expressions.Expression;
2929
import org.apache.iceberg.expressions.ExpressionUtil;
3030
import org.apache.iceberg.io.CloseableIterator;
31+
import org.apache.iceberg.io.FileIO;
3132
import org.apache.iceberg.io.InputFile;
3233
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3334
import org.apache.iceberg.util.ContentFileUtil;
@@ -46,6 +47,7 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
4647
PositionDeletesRowReader(SparkInputPartition partition) {
4748
this(
4849
partition.table(),
50+
partition.io(),
4951
partition.taskGroup(),
5052
SnapshotUtil.schemaFor(partition.table(), partition.branch()),
5153
partition.expectedSchema(),
@@ -55,14 +57,20 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
5557

5658
PositionDeletesRowReader(
5759
Table table,
60+
FileIO fileIO,
5861
ScanTaskGroup<PositionDeletesScanTask> taskGroup,
5962
Schema tableSchema,
6063
Schema expectedSchema,
6164
boolean caseSensitive,
6265
boolean cacheDeleteFilesOnExecutors) {
63-
6466
super(
65-
table, taskGroup, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
67+
table,
68+
fileIO,
69+
taskGroup,
70+
tableSchema,
71+
expectedSchema,
72+
caseSensitive,
73+
cacheDeleteFilesOnExecutors);
6674

6775
int numSplits = taskGroup.tasks().size();
6876
LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iceberg.Table;
2929
import org.apache.iceberg.io.CloseableIterable;
3030
import org.apache.iceberg.io.CloseableIterator;
31+
import org.apache.iceberg.io.FileIO;
3132
import org.apache.iceberg.io.InputFile;
3233
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3334
import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
@@ -48,6 +49,7 @@ class RowDataReader extends BaseRowReader<FileScanTask> implements PartitionRead
4849
RowDataReader(SparkInputPartition partition) {
4950
this(
5051
partition.table(),
52+
partition.io(),
5153
partition.taskGroup(),
5254
SnapshotUtil.schemaFor(partition.table(), partition.branch()),
5355
partition.expectedSchema(),
@@ -57,14 +59,21 @@ class RowDataReader extends BaseRowReader<FileScanTask> implements PartitionRead
5759

5860
RowDataReader(
5961
Table table,
62+
FileIO fileIO,
6063
ScanTaskGroup<FileScanTask> taskGroup,
6164
Schema tableSchema,
6265
Schema expectedSchema,
6366
boolean caseSensitive,
6467
boolean cacheDeleteFilesOnExecutors) {
6568

6669
super(
67-
table, taskGroup, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
70+
table,
71+
fileIO,
72+
taskGroup,
73+
tableSchema,
74+
expectedSchema,
75+
caseSensitive,
76+
cacheDeleteFilesOnExecutors);
6877

6978
numSplits = taskGroup.tasks().size();
7079
LOG.debug("Reading {} file split(s) for table {}", numSplits, table.name());

0 commit comments

Comments
 (0)