
Spark: Add vectorized Parquet reads for variant columns #16292

Open
nssalian wants to merge 14 commits into apache:main from nssalian:variant-vectorized-reader

Conversation

@nssalian nssalian commented May 11, 2026

Collaborator

Follow-up to #16087: fixes vectorized support for variant columns so the temporary patches can be removed.

Rationale for this Change

Variant columns currently force the entire table into row-at-a-time reads because the vectorized reader doesn't handle them. This PR fixes that by reading a variant's metadata and value children as Arrow VarBinary batches, with per-file detection so that shredded files automatically fall back to row reads.

What changes are included in this PR?

Vectorized variant read path:

  • VectorizedReaderBuilder - adds variantVisitor() that creates a VectorizedVariantVisitor scoped to each variant column's Parquet path
  • VectorizedVariantVisitor - walks variant's internal structure, creates Arrow readers for metadata + value leaves
  • VectorizedArrowReader.VectorizedVariantReader - composes two child readers, delegates read/setRowGroupInfo/setBatchSize/close
  • VectorHolder.VariantVectorHolder - carries both child holders through the batch pipeline
  • VariantColumnVector (new) - Spark ColumnVector implementing getChild(0) = value, getChild(1) = metadata per Spark's getVariant() contract
  • ColumnVectorBuilder - dispatches VariantVectorHolder before isDummy() check
  • ColumnVectorWithFilter - adds VariantType branch to getChild() so variant + DV/position deletes work with vectorization
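
A minimal sketch of the composition described in the list above, assuming simplified stand-in types. `BatchReader`, `VariantBatch`, `CompositeVariantReader`, and `ConstantReader` are hypothetical names for illustration and are not the classes in this PR; the real reader also delegates `setRowGroupInfo`, which is omitted here. The sketch only shows two ideas: one composite reader forwarding lifecycle calls to a metadata reader and a value reader, and the child ordering (0 = value, 1 = metadata) that the description attributes to Spark's `getVariant()` contract.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins: none of these types are the real Iceberg or Spark classes.
final class VariantReaderSketch {

  /** Minimal reader lifecycle shared by leaf and composite readers. */
  interface BatchReader<T> {
    void setBatchSize(int batchSize);

    T read(int numRows);

    void close();
  }

  /** Holds one batch of a variant column as two parallel lists of binary cells. */
  static final class VariantBatch {
    private final List<byte[]> value;
    private final List<byte[]> metadata;

    VariantBatch(List<byte[]> value, List<byte[]> metadata) {
      this.value = value;
      this.metadata = metadata;
    }

    /** Child order follows the contract quoted in the PR description: 0 = value, 1 = metadata. */
    List<byte[]> getChild(int ordinal) {
      if (ordinal == 0) {
        return value;
      }
      if (ordinal == 1) {
        return metadata;
      }
      throw new IndexOutOfBoundsException("a variant has two children, got ordinal " + ordinal);
    }
  }

  /** Composes a metadata reader and a value reader, forwarding each call to both. */
  static final class CompositeVariantReader implements BatchReader<VariantBatch> {
    private final BatchReader<List<byte[]>> metadataReader;
    private final BatchReader<List<byte[]>> valueReader;

    CompositeVariantReader(
        BatchReader<List<byte[]>> metadataReader, BatchReader<List<byte[]>> valueReader) {
      this.metadataReader = metadataReader;
      this.valueReader = valueReader;
    }

    @Override
    public void setBatchSize(int batchSize) {
      metadataReader.setBatchSize(batchSize);
      valueReader.setBatchSize(batchSize);
    }

    @Override
    public VariantBatch read(int numRows) {
      return new VariantBatch(valueReader.read(numRows), metadataReader.read(numRows));
    }

    @Override
    public void close() {
      metadataReader.close();
      valueReader.close();
    }
  }

  /** Toy leaf reader that returns the same payload for every row, capped at the batch size. */
  static final class ConstantReader implements BatchReader<List<byte[]>> {
    private final byte[] payload;
    private int batchSize = Integer.MAX_VALUE;
    boolean closed = false;

    ConstantReader(byte[] payload) {
      this.payload = payload;
    }

    @Override
    public void setBatchSize(int batchSize) {
      this.batchSize = batchSize;
    }

    @Override
    public List<byte[]> read(int numRows) {
      int n = Math.min(numRows, batchSize);
      List<byte[]> rows = new ArrayList<>(n);
      for (int i = 0; i < n; i++) {
        rows.add(payload);
      }
      return rows;
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  public static void main(String[] args) {
    ConstantReader metadata = new ConstantReader(new byte[] {1});
    ConstantReader value = new ConstantReader(new byte[] {2, 3});
    CompositeVariantReader reader = new CompositeVariantReader(metadata, value);
    reader.setBatchSize(4);
    VariantBatch batch = reader.read(10);
    System.out.println(
        batch.getChild(0).size() + " value cells, " + batch.getChild(1).size() + " metadata cells");
    reader.close();
  }
}
```

Keeping the two children behind a single reader means the surrounding batch pipeline sees one column, while the ordinal mapping stays in one place.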

Shredded-file detection at scan plan:

  • SparkBatch.supportsParquetBatchReads(ScanTask) - per-file lowerBounds.containsKey(variantFieldId) check; presence indicates shredded payload, batch reads are disabled for that scan
  • SparkBatch.supportsParquetBatchReads(NestedField) - falls back to row reads when the variant column's metrics mode is None or Counts (no bounds are recorded in those modes, so their absence can't be used to rule out shredding)
  • SparkScanBuilder - opts into variant-column stats for both buildIcebergBatchScan and buildIcebergIncrementalAppendScan so lowerBounds is loaded at scan plan without opening Parquet footers
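
The two checks above can be sketched as follows. This is an illustration under stated assumptions, not the code in this PR: `MetricsMode`, `boundsRecorded`, `looksShredded`, and `supportsBatchReads` are hypothetical names, and each file's lower bounds are modeled as a plain `Map<Integer, ByteBuffer>` keyed by field id. The sketch also assumes that a single shredded file disables batch reads for the whole group of files passed in.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the enum and method names below are illustrative, not Iceberg APIs.
final class ShreddedDetectionSketch {

  /** Stand-in for the metrics modes named in the PR text. */
  enum MetricsMode {
    NONE,
    COUNTS,
    TRUNCATE,
    FULL
  }

  /** Column-level gate: in NONE or COUNTS mode no bounds are written, so a missing bound proves nothing. */
  static boolean boundsRecorded(MetricsMode mode) {
    return mode != MetricsMode.NONE && mode != MetricsMode.COUNTS;
  }

  /** File-level check: a lower bound keyed by the variant field id is treated as a sign of shredding. */
  static boolean looksShredded(Map<Integer, ByteBuffer> lowerBounds, int variantFieldId) {
    return lowerBounds != null && lowerBounds.containsKey(variantFieldId);
  }

  /** Batch reads are allowed only if bounds are recorded and no file looks shredded. */
  static boolean supportsBatchReads(
      MetricsMode mode, int variantFieldId, List<Map<Integer, ByteBuffer>> lowerBoundsPerFile) {
    if (!boundsRecorded(mode)) {
      return false; // conservative fallback to row reads
    }
    for (Map<Integer, ByteBuffer> lowerBounds : lowerBoundsPerFile) {
      if (looksShredded(lowerBounds, variantFieldId)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    int variantFieldId = 2; // arbitrary field id for the example
    Map<Integer, ByteBuffer> unshredded = Collections.emptyMap();
    Map<Integer, ByteBuffer> shredded = new HashMap<>();
    shredded.put(variantFieldId, ByteBuffer.wrap(new byte[] {0}));

    System.out.println(
        supportsBatchReads(
            MetricsMode.TRUNCATE, variantFieldId, Collections.singletonList(unshredded)));
    System.out.println(
        supportsBatchReads(
            MetricsMode.TRUNCATE, variantFieldId, Arrays.asList(unshredded, shredded)));
    System.out.println(
        supportsBatchReads(
            MetricsMode.NONE, variantFieldId, Collections.singletonList(unshredded)));
  }
}
```

The metrics-mode gate runs first because the per-file check can only give a trustworthy "not shredded" answer when bounds would have been written in the first place.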

These changes apply to both Spark 4.0 and 4.1.

Limitations

  • Shredded variant columns are not vectorized. The per-file lowerBounds check detects them and falls back to row-at-a-time reads
  • Variant inside structs/lists/maps still falls back to row-at-a-time (pre-existing limitation for all complex types)
  • When write.metadata.metrics.default is set to none or counts for a variant column, bounds aren't recorded so detection falls back conservatively to row reads

Are these changes tested?

  • TestSparkVariantRead (v4.0 + v4.1)
    • All existing tests now run with both vectorized=false and vectorized=true. Previously, the vectorized=true cases were skipped
    • testVariantReadAfterDelete - variant column with DV deletes under vectorization
    • testReadShreddedAfterPropertyToggled - writes shredded data with write.parquet.shred-variants=true, toggles the property to false, then reads. Verifies the per-file lowerBounds check forces row reads on the existing shredded files (parameterized over vectorized=false/true)
    • testReadShreddedWithMetricsDisabled - shredded write with write.metadata.metrics.default=none and =counts. Verifies the metrics-mode gate forces row reads when bounds aren't recorded (parameterized over both modes)
  • TestVariantShredding (v4.0 + v4.1) - table created with PARQUET_SHRED_VARIANTS=true; SparkBatch correctly detects and falls back
  • TestSnapshotTableProcedure (v4.0 + v4.1) - external Parquet imports with variant columns lacking the VARIANT annotation now read correctly with vectorization on by default. The previous manual read.parquet.vectorization.enabled=false workaround is removed

Are there any user-facing changes?

  • Vectorization is now enabled for variant columns on tables that don't shred. Performance benefits flow through automatically
  • For tables with shredded variant data, batch reads transparently fall back to row reads on a per-file basis. No user configuration required
  • Tables that disable variant column metrics (write.metadata.metrics.default=none or counts) also fall back to row reads, so shredded typed_value data is never silently dropped on the batch path

Performance

Measured includeColumnStats(variantColumns) scan-plan overhead at 10/100/1000 files (5 iterations + 3 warmups, two independent runs, local SSD, hadoop catalog). Per-file delta is roughly 1-2 microseconds and within run-to-run noise at 1000 files. The opt-in only fires for projections containing variant columns; non-variant scans are unchanged.
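
For readers who want to reproduce a measurement of this shape, here is a minimal timing sketch. It is not the harness used for the numbers above; `meanNanos` and `perFileDeltaNanos` are hypothetical helpers, and the task being timed (planning a scan with and without the stats opt-in) is left to the caller as a `Runnable`.

```java
// Hypothetical timing helpers; this is not the harness used for the numbers in the PR.
final class ScanPlanTimingSketch {

  /** Runs the task untimed for the warmups, then returns mean wall time in nanoseconds over the timed iterations. */
  static double meanNanos(Runnable task, int warmups, int iterations) {
    for (int i = 0; i < warmups; i++) {
      task.run();
    }
    long total = 0L;
    for (int i = 0; i < iterations; i++) {
      long start = System.nanoTime();
      task.run();
      total += System.nanoTime() - start;
    }
    return (double) total / iterations;
  }

  /** Overhead attributed to each file: difference of the two means divided by the file count. */
  static double perFileDeltaNanos(double withStatsNanos, double baselineNanos, int fileCount) {
    return (withStatsNanos - baselineNanos) / fileCount;
  }

  public static void main(String[] args) {
    // Placeholder workloads; a real run would plan the same scan with and without column stats.
    Runnable baseline = () -> Math.sqrt(12345.0);
    Runnable withStats = () -> Math.sqrt(67890.0);

    double baselineNanos = meanNanos(baseline, 3, 5);
    double withStatsNanos = meanNanos(withStats, 3, 5);
    System.out.println(
        "per-file delta (ns) at 1000 files: "
            + perFileDeltaNanos(withStatsNanos, baselineNanos, 1000));
  }
}
```

With deltas in the low-microsecond range, repeating the whole run independently, as described above, is what makes it possible to tell a real difference from noise.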

@nssalian nssalian changed the title from "Spark, Arrow: Add vectorized Parquet reads for variant columns" to "Spark,Arrow: Add vectorized Parquet reads for variant columns" May 11, 2026
@nssalian nssalian changed the title from "Spark,Arrow: Add vectorized Parquet reads for variant columns" to "Spark: Add vectorized Parquet reads for variant columns" May 11, 2026
@nssalian nssalian marked this pull request as ready for review May 13, 2026 15:44
@nssalian

Collaborator Author

@pvary @huaxingao @singhpk234 PTAL

@nssalian nssalian requested a review from huaxingao June 1, 2026 17:06

@huan233usc huan233usc left a comment

Contributor

Some additional small comments

Comment thread spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java Outdated
@nssalian nssalian requested review from Fokko and huan233usc June 4, 2026 22:33

@huan233usc huan233usc left a comment

Contributor

LGTM Thanks


private boolean supportsParquetBatchReads(Types.NestedField field) {
  if (field.type().isVariantType()) {
    return !PropertyUtil.propertyAsBoolean(

Contributor

This gates batch reads on the write.parquet.shred-variants property. The property reflects the current write config, not what's in existing files, so a table where the property is currently false but which still has shredded files (property toggled later, or files written elsewhere) would take the batch path and silently drop typed_value data. Is "property=false -> no shredded files" a safe assumption? If so, it's worth a short comment noting it.

Collaborator Author

Good catch. Might have to check for typed_value fields at the file level. Let me find a nice way to add this.

Collaborator Author

This is non-trivial. Working on it so it doesn't hit more edge cases and it's in line with the interfaces. Will surface once I have it cleanly working locally.

Collaborator Author

I moved the detection from the table property write.parquet.shred-variants to a per-file lowerBounds.containsKey(variantFieldId) check on the manifest entry, so toggling the property after writing shredded files no longer drops typed_value data on the batch path. SparkScanBuilder opts into variant-column stats for both the batch and incremental scan paths so the check is available without opening any Parquet footers. Added a test too.

@nssalian

Collaborator Author

fixing the tests

@nssalian nssalian requested review from huaxingao and singhpk234 June 23, 2026 03:56