Skip to content

Commit 9ec1b93

Browse files
authored
Spark: Backport support for writing shredded variants in Iceberg-Spark (#16241)
backports #14297
1 parent 153237b commit 9ec1b93

7 files changed

Lines changed: 1302 additions & 1 deletion

File tree

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,12 @@ private SparkSQLProperties() {}
111111

112112
// Prefix for custom snapshot properties
113113
public static final String SNAPSHOT_PROPERTY_PREFIX = "spark.sql.iceberg.snapshot-property.";
114+
115+
// Controls whether to shred variant columns during write operations
116+
public static final String SHRED_VARIANTS = "spark.sql.iceberg.shred-variants";
117+
118+
// Controls the buffer size for variant schema inference during writes
119+
// This determines how many rows are buffered before inferring shredded schema
120+
public static final String VARIANT_INFERENCE_BUFFER_SIZE =
121+
"spark.sql.iceberg.variant-inference-buffer-size";
114122
}

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
3434
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
3535
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
36+
import static org.apache.iceberg.TableProperties.PARQUET_SHRED_VARIANTS;
37+
import static org.apache.iceberg.TableProperties.PARQUET_VARIANT_BUFFER_SIZE;
3638
import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE;
3739

3840
import java.util.Locale;
@@ -529,6 +531,14 @@ private Map<String, String> dataWriteProperties() {
529531
if (parquetCompressionLevel != null) {
530532
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
531533
}
534+
boolean shouldShredVariants = shredVariants();
535+
writeProperties.put(PARQUET_SHRED_VARIANTS, String.valueOf(shouldShredVariants));
536+
537+
// Add variant shredding configuration properties
538+
if (shouldShredVariants) {
539+
writeProperties.put(
540+
PARQUET_VARIANT_BUFFER_SIZE, String.valueOf(variantInferenceBufferSize()));
541+
}
532542
break;
533543

534544
case AVRO:
@@ -749,4 +759,24 @@ public DeleteGranularity deleteGranularity() {
749759
.defaultValue(DeleteGranularity.FILE)
750760
.parse();
751761
}
762+
763+
/**
 * Returns whether variant columns should be shredded when writing Parquet data files.
 *
 * <p>Resolved via {@code confParser} from the {@code shred-variants} write option, the Spark
 * session conf, and the table property, in the order listed by the builder chain, falling back
 * to {@link TableProperties#PARQUET_SHRED_VARIANTS_DEFAULT}.
 */
public boolean shredVariants() {
  return confParser
      .booleanConf()
      .option(SparkWriteOptions.SHRED_VARIANTS)
      .sessionConf(SparkSQLProperties.SHRED_VARIANTS)
      .tableProperty(TableProperties.PARQUET_SHRED_VARIANTS)
      .defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT)
      .parse();
}
772+
773+
/**
 * Returns the number of rows to buffer while inferring a shredded variant schema during writes.
 *
 * <p>Resolved via {@code confParser} from the write option, the Spark session conf, and the
 * table property, in the order listed by the builder chain, falling back to
 * {@link TableProperties#PARQUET_VARIANT_BUFFER_SIZE_DEFAULT}.
 */
public int variantInferenceBufferSize() {
  return confParser
      .intConf()
      .option(SparkWriteOptions.VARIANT_INFERENCE_BUFFER_SIZE)
      .sessionConf(SparkSQLProperties.VARIANT_INFERENCE_BUFFER_SIZE)
      .tableProperty(TableProperties.PARQUET_VARIANT_BUFFER_SIZE)
      .defaultValue(TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT)
      .parse();
}
752782
}

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,10 @@ private SparkWriteOptions() {}
8686

8787
// Overrides the delete granularity
8888
public static final String DELETE_GRANULARITY = "delete-granularity";
89+
90+
// Controls whether to shred variant columns during write operations
91+
public static final String SHRED_VARIANTS = "shred-variants";
92+
93+
// Controls the buffer size for variant schema inference during writes
94+
public static final String VARIANT_INFERENCE_BUFFER_SIZE = "variant-inference-buffer-size";
8995
}

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ public static void register() {
5151
StructType.class,
5252
SparkParquetWriters::buildWriter,
5353
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
54-
SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
54+
SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
55+
new SparkVariantShreddingAnalyzer(),
56+
InternalRow::copy));
5557

5658
FormatModelRegistry.register(
5759
ParquetFormatModel.create(
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.spark.source;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import org.apache.iceberg.parquet.VariantShreddingAnalyzer;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantValue;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.VariantVal;

/**
 * Spark-specific implementation that extracts variant values from {@link InternalRow} instances.
 */
class SparkVariantShreddingAnalyzer extends VariantShreddingAnalyzer<InternalRow, StructType> {

  SparkVariantShreddingAnalyzer() {}

  @Override
  protected int resolveColumnIndex(StructType sparkSchema, String columnName) {
    // StructType.fieldIndex throws IllegalArgumentException for unknown fields;
    // translate that into a -1 sentinel for the base analyzer.
    try {
      return sparkSchema.fieldIndex(columnName);
    } catch (IllegalArgumentException e) {
      return -1;
    }
  }

  @Override
  protected List<VariantValue> extractVariantValues(
      List<InternalRow> bufferedRows, int variantFieldIndex) {
    List<VariantValue> extracted = Lists.newArrayList();

    for (InternalRow bufferedRow : bufferedRows) {
      // skip SQL NULLs and rows whose variant accessor yields nothing
      if (bufferedRow.isNullAt(variantFieldIndex)) {
        continue;
      }

      VariantVal sparkVariant = bufferedRow.getVariant(variantFieldIndex);
      if (sparkVariant == null) {
        continue;
      }

      // variant binary is little-endian; wrap both metadata and value accordingly
      ByteBuffer metadataBuffer =
          ByteBuffer.wrap(sparkVariant.getMetadata()).order(ByteOrder.LITTLE_ENDIAN);
      ByteBuffer valueBuffer =
          ByteBuffer.wrap(sparkVariant.getValue()).order(ByteOrder.LITTLE_ENDIAN);
      extracted.add(VariantValue.from(VariantMetadata.from(metadataBuffer), valueBuffer));
    }

    return extracted;
  }
}

spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
3535
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
3636
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
37+
import static org.apache.iceberg.TableProperties.PARQUET_SHRED_VARIANTS;
3738
import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE;
3839
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
3940
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;
@@ -61,6 +62,7 @@
6162
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
6263
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
6364
import org.apache.spark.sql.internal.SQLConf;
65+
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
6466
import org.junit.jupiter.api.AfterEach;
6567
import org.junit.jupiter.api.BeforeEach;
6668
import org.junit.jupiter.api.TestTemplate;
@@ -340,6 +342,8 @@ public void testSparkConfOverride() {
340342
TableProperties.DELETE_PARQUET_COMPRESSION,
341343
"snappy"),
342344
ImmutableMap.of(
345+
PARQUET_SHRED_VARIANTS,
346+
"false",
343347
DELETE_PARQUET_COMPRESSION,
344348
"zstd",
345349
PARQUET_COMPRESSION,
@@ -461,6 +465,8 @@ public void testDataPropsDefaultsAsDeleteProps() {
461465
PARQUET_COMPRESSION_LEVEL,
462466
"5"),
463467
ImmutableMap.of(
468+
PARQUET_SHRED_VARIANTS,
469+
"false",
464470
DELETE_PARQUET_COMPRESSION,
465471
"zstd",
466472
PARQUET_COMPRESSION,
@@ -532,6 +538,8 @@ public void testDeleteFileWriteConf() {
532538
DELETE_PARQUET_COMPRESSION_LEVEL,
533539
"6"),
534540
ImmutableMap.of(
541+
PARQUET_SHRED_VARIANTS,
542+
"false",
535543
DELETE_PARQUET_COMPRESSION,
536544
"zstd",
537545
PARQUET_COMPRESSION,
@@ -686,4 +694,81 @@ private void checkMode(DistributionMode expectedMode, SparkWriteConf writeConf)
686694
assertThat(writeConf.copyOnWriteDistributionMode(MERGE)).isEqualTo(expectedMode);
687695
assertThat(writeConf.positionDeltaDistributionMode(MERGE)).isEqualTo(expectedMode);
688696
}
697+
698+
@TestTemplate
public void testShredVariantsDefault() {
  // With no write option, session conf, or table property set, shredding is off by default.
  Table table = validationCatalog.loadTable(tableIdent);
  SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
  assertThat(writeConf.shredVariants()).isFalse();
}
704+
705+
@TestTemplate
public void testVariantInferenceBufferSizeDefault() {
  // With nothing configured, the buffer size falls back to the table-property default.
  Table table = validationCatalog.loadTable(tableIdent);
  SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
  assertThat(writeConf.variantInferenceBufferSize())
      .isEqualTo(TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT);
}
712+
713+
@TestTemplate
public void testVariantInferenceBufferSizeTableProperty() {
  // A table property should be picked up when no option or session conf is set.
  Table table = validationCatalog.loadTable(tableIdent);

  table.updateProperties().set(TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "500").commit();

  SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
  assertThat(writeConf.variantInferenceBufferSize()).isEqualTo(500);
}
722+
723+
@TestTemplate
public void testShredVariantsSessionOverridesTableProperty() {
  // Session conf ("true") should win over the table property ("false").
  Table table = validationCatalog.loadTable(tableIdent);
  table.updateProperties().set(TableProperties.PARQUET_SHRED_VARIANTS, "false").commit();

  withSQLConf(
      ImmutableMap.of(SparkSQLProperties.SHRED_VARIANTS, "true"),
      () -> {
        SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
        assertThat(writeConf.shredVariants()).isTrue();
      });
}
735+
736+
@TestTemplate
737+
public void testShredVariantsWriteOptionOverridesSessionConf() {
738+
withSQLConf(
739+
ImmutableMap.of(SparkSQLProperties.SHRED_VARIANTS, "false"),
740+
() -> {
741+
Table table = validationCatalog.loadTable(tableIdent);
742+
SparkWriteConf writeConf =
743+
new SparkWriteConf(
744+
spark,
745+
table,
746+
new CaseInsensitiveStringMap(
747+
ImmutableMap.of(SparkWriteOptions.SHRED_VARIANTS, "true")));
748+
assertThat(writeConf.shredVariants()).isTrue();
749+
});
750+
}
751+
752+
@TestTemplate
public void testVariantInferenceBufferSizeSessionConf() {
  // A session conf should override the default buffer size.
  withSQLConf(
      ImmutableMap.of(SparkSQLProperties.VARIANT_INFERENCE_BUFFER_SIZE, "250"),
      () -> {
        Table table = validationCatalog.loadTable(tableIdent);
        SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
        assertThat(writeConf.variantInferenceBufferSize()).isEqualTo(250);
      });
}
762+
763+
@TestTemplate
764+
public void testWritePropertiesIncludeVariantShredding() {
765+
Table table = validationCatalog.loadTable(tableIdent);
766+
table.updateProperties().set(TableProperties.PARQUET_SHRED_VARIANTS, "true").commit();
767+
table.updateProperties().set(TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "200").commit();
768+
769+
SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
770+
Map<String, String> writeProperties = writeConf.writeProperties();
771+
assertThat(writeProperties).containsEntry(PARQUET_SHRED_VARIANTS, "true");
772+
assertThat(writeProperties).containsEntry(TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "200");
773+
}
689774
}

0 commit comments

Comments
 (0)