Skip to content

Commit 6976e02

Browse files
karuppayya and authored
Spark: Backport #16088 to Spark 3.4, 3.5, 4.0 (#16344)
Co-authored-by: karuppayya <karuppayya1990@gmaiul.com>
1 parent 62fe817 commit 6976e02

12 files changed

Lines changed: 384 additions & 6 deletions

File tree

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iceberg.TableProperties;
2828
import org.apache.iceberg.exceptions.ValidationException;
2929
import org.apache.iceberg.hadoop.Util;
30+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3031
import org.apache.iceberg.util.PropertyUtil;
3132
import org.apache.spark.SparkConf;
3233
import org.apache.spark.sql.SparkSession;
@@ -318,6 +319,7 @@ public boolean aggregatePushDownEnabled() {
318319
public boolean adaptiveSplitSizeEnabled() {
319320
return confParser
320321
.booleanConf()
322+
.sessionConf(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_ENABLED)
321323
.tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
322324
.defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
323325
.parse();
@@ -329,6 +331,17 @@ public int parallelism() {
329331
return Math.max(defaultParallelism, numShufflePartitions);
330332
}
331333

334+
public int splitParallelism() {
335+
int parallelism =
336+
confParser
337+
.intConf()
338+
.sessionConf(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM)
339+
.defaultValue(parallelism())
340+
.parse();
341+
Preconditions.checkArgument(parallelism > 0, "Split parallelism must be > 0: %s", parallelism);
342+
return parallelism;
343+
}
344+
332345
public boolean distributedPlanningEnabled() {
333346
return table instanceof SupportsDistributedScanPlanning distributed
334347
&& distributed.allowDistributedPlanning()

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,13 @@ private SparkSQLProperties() {}
111111
public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
112112
"spark.sql.iceberg.async-micro-batch-planning-enabled";
113113
public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT = false;
114+
115+
// Controls whether adaptive split sizing is enabled
116+
public static final String READ_ADAPTIVE_SPLIT_SIZE_ENABLED =
117+
"spark.sql.iceberg.read.adaptive-split-size.enabled";
118+
119+
// Overrides the parallelism used for adaptive split sizing. When unset, the parallelism
120+
// defaults to max(spark.default.parallelism, spark.sql.shuffle.partitions).
121+
public static final String READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM =
122+
"spark.sql.iceberg.read.adaptive-split-size.parallelism";
114123
}

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,8 +356,17 @@ public CustomMetric[] supportedCustomMetrics() {
356356
protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
357357
if (readConf.splitSizeOption() == null && readConf.adaptiveSplitSizeEnabled()) {
358358
long scanSize = tasks.stream().mapToLong(ScanTask::sizeBytes).sum();
359-
int parallelism = readConf.parallelism();
360-
return TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
359+
int parallelism = readConf.splitParallelism();
360+
long adjustedSplitSize = TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
361+
if (adjustedSplitSize != splitSize) {
362+
LOG.debug(
363+
"Adjusted split size from {} to {} for table {} with parallelism {}",
364+
splitSize,
365+
adjustedSplitSize,
366+
table().name(),
367+
parallelism);
368+
}
369+
return adjustedSplitSize;
361370
} else {
362371
return splitSize;
363372
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.spark;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
23+
24+
import org.apache.iceberg.ParameterizedTestExtension;
25+
import org.apache.iceberg.Table;
26+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
27+
import org.apache.spark.sql.internal.SQLConf;
28+
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
29+
import org.junit.jupiter.api.AfterEach;
30+
import org.junit.jupiter.api.BeforeEach;
31+
import org.junit.jupiter.api.TestTemplate;
32+
import org.junit.jupiter.api.extension.ExtendWith;
33+
34+
@ExtendWith(ParameterizedTestExtension.class)
35+
public class TestSparkReadConf extends TestBaseWithCatalog {
36+
37+
@BeforeEach
38+
public void before() {
39+
super.before();
40+
sql("CREATE TABLE %s (id BIGINT, data STRING) USING iceberg", tableName);
41+
}
42+
43+
@AfterEach
44+
public void after() {
45+
sql("DROP TABLE IF EXISTS %s", tableName);
46+
}
47+
48+
@TestTemplate
49+
public void testSplitParallelismDefault() {
50+
Table table = validationCatalog.loadTable(tableIdent);
51+
SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
52+
assertThat(conf.splitParallelism()).isEqualTo(conf.parallelism());
53+
}
54+
55+
@TestTemplate
56+
public void testSplitParallelismSessionConf() {
57+
Table table = validationCatalog.loadTable(tableIdent);
58+
withSQLConf(
59+
ImmutableMap.of(
60+
SQLConf.SHUFFLE_PARTITIONS().key(),
61+
"999",
62+
SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM,
63+
"42"),
64+
() -> {
65+
SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
66+
assertThat(conf.splitParallelism()).isEqualTo(42);
67+
});
68+
}
69+
70+
@TestTemplate
71+
public void testSplitParallelismRejectsZero() {
72+
Table table = validationCatalog.loadTable(tableIdent);
73+
withSQLConf(
74+
ImmutableMap.of(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM, "0"),
75+
() -> {
76+
SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
77+
assertThatIllegalArgumentException()
78+
.isThrownBy(conf::splitParallelism)
79+
.withMessageContaining("Split parallelism must be > 0");
80+
});
81+
}
82+
83+
@TestTemplate
84+
public void testSplitParallelismRejectsNegative() {
85+
Table table = validationCatalog.loadTable(tableIdent);
86+
withSQLConf(
87+
ImmutableMap.of(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM, "-5"),
88+
() -> {
89+
SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
90+
assertThatIllegalArgumentException()
91+
.isThrownBy(conf::splitParallelism)
92+
.withMessageContaining("Split parallelism must be > 0");
93+
});
94+
}
95+
}

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iceberg.TableProperties;
2828
import org.apache.iceberg.exceptions.ValidationException;
2929
import org.apache.iceberg.hadoop.Util;
30+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3031
import org.apache.iceberg.util.PropertyUtil;
3132
import org.apache.spark.SparkConf;
3233
import org.apache.spark.sql.SparkSession;
@@ -314,6 +315,7 @@ public boolean aggregatePushDownEnabled() {
314315
public boolean adaptiveSplitSizeEnabled() {
315316
return confParser
316317
.booleanConf()
318+
.sessionConf(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_ENABLED)
317319
.tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
318320
.defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
319321
.parse();
@@ -325,6 +327,17 @@ public int parallelism() {
325327
return Math.max(defaultParallelism, numShufflePartitions);
326328
}
327329

330+
public int splitParallelism() {
331+
int parallelism =
332+
confParser
333+
.intConf()
334+
.sessionConf(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM)
335+
.defaultValue(parallelism())
336+
.parse();
337+
Preconditions.checkArgument(parallelism > 0, "Split parallelism must be > 0: %s", parallelism);
338+
return parallelism;
339+
}
340+
328341
public boolean distributedPlanningEnabled() {
329342
return table instanceof SupportsDistributedScanPlanning distributed
330343
&& distributed.allowDistributedPlanning()

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,13 @@ private SparkSQLProperties() {}
108108
public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
109109
"spark.sql.iceberg.async-micro-batch-planning-enabled";
110110
public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT = false;
111+
112+
// Controls whether adaptive split sizing is enabled
113+
public static final String READ_ADAPTIVE_SPLIT_SIZE_ENABLED =
114+
"spark.sql.iceberg.read.adaptive-split-size.enabled";
115+
116+
// Overrides the parallelism used for adaptive split sizing. When unset, the parallelism
117+
// defaults to max(spark.default.parallelism, spark.sql.shuffle.partitions).
118+
public static final String READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM =
119+
"spark.sql.iceberg.read.adaptive-split-size.parallelism";
111120
}

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,8 +356,17 @@ public CustomMetric[] supportedCustomMetrics() {
356356
protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
357357
if (readConf.splitSizeOption() == null && readConf.adaptiveSplitSizeEnabled()) {
358358
long scanSize = tasks.stream().mapToLong(ScanTask::sizeBytes).sum();
359-
int parallelism = readConf.parallelism();
360-
return TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
359+
int parallelism = readConf.splitParallelism();
360+
long adjustedSplitSize = TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
361+
if (adjustedSplitSize != splitSize) {
362+
LOG.debug(
363+
"Adjusted split size from {} to {} for table {} with parallelism {}",
364+
splitSize,
365+
adjustedSplitSize,
366+
table().name(),
367+
parallelism);
368+
}
369+
return adjustedSplitSize;
361370
} else {
362371
return splitSize;
363372
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.spark;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
23+
24+
import org.apache.iceberg.ParameterizedTestExtension;
25+
import org.apache.iceberg.Table;
26+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
27+
import org.apache.spark.sql.internal.SQLConf;
28+
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
29+
import org.junit.jupiter.api.AfterEach;
30+
import org.junit.jupiter.api.BeforeEach;
31+
import org.junit.jupiter.api.TestTemplate;
32+
import org.junit.jupiter.api.extension.ExtendWith;
33+
34+
@ExtendWith(ParameterizedTestExtension.class)
35+
public class TestSparkReadConf extends TestBaseWithCatalog {
36+
37+
@BeforeEach
38+
public void before() {
39+
super.before();
40+
sql("CREATE TABLE %s (id BIGINT, data STRING) USING iceberg", tableName);
41+
}
42+
43+
@AfterEach
44+
public void after() {
45+
sql("DROP TABLE IF EXISTS %s", tableName);
46+
}
47+
48+
@TestTemplate
49+
public void testSplitParallelismDefault() {
50+
Table table = validationCatalog.loadTable(tableIdent);
51+
SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
52+
assertThat(conf.splitParallelism()).isEqualTo(conf.parallelism());
53+
}
54+
55+
@TestTemplate
56+
public void testSplitParallelismSessionConf() {
57+
Table table = validationCatalog.loadTable(tableIdent);
58+
withSQLConf(
59+
ImmutableMap.of(
60+
SQLConf.SHUFFLE_PARTITIONS().key(),
61+
"999",
62+
SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM,
63+
"42"),
64+
() -> {
65+
SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
66+
assertThat(conf.splitParallelism()).isEqualTo(42);
67+
});
68+
}
69+
70+
@TestTemplate
71+
public void testSplitParallelismRejectsZero() {
72+
Table table = validationCatalog.loadTable(tableIdent);
73+
withSQLConf(
74+
ImmutableMap.of(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM, "0"),
75+
() -> {
76+
SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
77+
assertThatIllegalArgumentException()
78+
.isThrownBy(conf::splitParallelism)
79+
.withMessageContaining("Split parallelism must be > 0");
80+
});
81+
}
82+
83+
@TestTemplate
84+
public void testSplitParallelismRejectsNegative() {
85+
Table table = validationCatalog.loadTable(tableIdent);
86+
withSQLConf(
87+
ImmutableMap.of(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM, "-5"),
88+
() -> {
89+
SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
90+
assertThatIllegalArgumentException()
91+
.isThrownBy(conf::splitParallelism)
92+
.withMessageContaining("Split parallelism must be > 0");
93+
});
94+
}
95+
}

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iceberg.TableProperties;
2828
import org.apache.iceberg.exceptions.ValidationException;
2929
import org.apache.iceberg.hadoop.Util;
30+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3031
import org.apache.iceberg.util.PropertyUtil;
3132
import org.apache.spark.SparkConf;
3233
import org.apache.spark.sql.SparkSession;
@@ -314,6 +315,7 @@ public boolean aggregatePushDownEnabled() {
314315
public boolean adaptiveSplitSizeEnabled() {
315316
return confParser
316317
.booleanConf()
318+
.sessionConf(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_ENABLED)
317319
.tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
318320
.defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
319321
.parse();
@@ -325,6 +327,17 @@ public int parallelism() {
325327
return Math.max(defaultParallelism, numShufflePartitions);
326328
}
327329

330+
public int splitParallelism() {
331+
int parallelism =
332+
confParser
333+
.intConf()
334+
.sessionConf(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM)
335+
.defaultValue(parallelism())
336+
.parse();
337+
Preconditions.checkArgument(parallelism > 0, "Split parallelism must be > 0: %s", parallelism);
338+
return parallelism;
339+
}
340+
328341
public boolean distributedPlanningEnabled() {
329342
return table instanceof SupportsDistributedScanPlanning distributed
330343
&& distributed.allowDistributedPlanning()

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,15 @@ private SparkSQLProperties() {}
112112
// Prefix for custom snapshot properties
113113
public static final String SNAPSHOT_PROPERTY_PREFIX = "spark.sql.iceberg.snapshot-property.";
114114

115+
// Controls whether adaptive split sizing is enabled
116+
public static final String READ_ADAPTIVE_SPLIT_SIZE_ENABLED =
117+
"spark.sql.iceberg.read.adaptive-split-size.enabled";
118+
119+
// Overrides the parallelism used for adaptive split sizing. When unset, the parallelism
120+
// defaults to max(spark.default.parallelism, spark.sql.shuffle.partitions).
121+
public static final String READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM =
122+
"spark.sql.iceberg.read.adaptive-split-size.parallelism";
123+
115124
// Controls whether to shred variant columns during write operations
116125
public static final String SHRED_VARIANTS = "spark.sql.iceberg.shred-variants";
117126

0 commit comments

Comments
 (0)