Spark: Add vectorized Parquet reads for variant columns#16292
Spark: Add vectorized Parquet reads for variant columns#16292nssalian wants to merge 14 commits into
Conversation
|
@pvary @huaxingao @singhpk234 PTAL |
huan233usc
left a comment
There was a problem hiding this comment.
Some addition small comments
|
|
||
| private boolean supportsParquetBatchReads(Types.NestedField field) { | ||
| if (field.type().isVariantType()) { | ||
| return !PropertyUtil.propertyAsBoolean( |
There was a problem hiding this comment.
This gates batch reads on the write.parquet.shred-variants property. The property reflects the current write config, not what's in existing files — so a table that's currently false but still has shredded files (property toggled later, or files written elsewhere) would take the batch path and silently drop typed_value data. Is "property=false -> no shredded files" a safe assumption? If so, worth a short comment noting it.
There was a problem hiding this comment.
Good catch. Might have to look at a file level for typed_value fields. Let me find a nice way to add this.
There was a problem hiding this comment.
This is non-trivial. Working on it so it doesn't hit more edge cases and it's in line with the interfaces. Will surface once I have it cleanly working locally.
There was a problem hiding this comment.
I moved the detection from the table property write.parquet.shred-variants to a per-file lowerBounds.containsKey(variantFieldId) check on the manifest entry, so toggling the property after writing shredded files no longer drops typed_value data on the batch path. SparkScanBuilder opts into variant-column stats for both the batch and incremental scan paths so the check is available without opening any Parquet footers. Added a test too.
|
fixing the tests |
Follow up to #16087 - fixing the Vectorized support for variant to remove the temporary patches.
Rationale for this Change
Variant columns currently force the entire table into row-at-a-time reads because the vectorized reader doesn't handle them. This PR fixes that by reading variant's metadata and value children as Arrow VarBinary batches, with per-file detection so shredded files automatically fall back to row reads.
What changes are included in this PR?
Vectorized variant read path:
VectorizedReaderBuilder- addsvariantVisitor()that creates aVectorizedVariantVisitorscoped to each variant column's Parquet pathVectorizedVariantVisitor- walks variant's internal structure, creates Arrow readers for metadata + value leavesVectorizedArrowReader.VectorizedVariantReader- composes two child readers, delegatesread/setRowGroupInfo/setBatchSize/closeVectorHolder.VariantVectorHolder- carries both child holders through the batch pipelineVariantColumnVector(new) - SparkColumnVectorimplementinggetChild(0)= value,getChild(1)= metadata per Spark'sgetVariant()contractColumnVectorBuilder- dispatchesVariantVectorHolderbeforeisDummy()checkColumnVectorWithFilter- addsVariantTypebranch togetChild()so variant + DV/position deletes work with vectorizationShredded-file detection at scan plan:
SparkBatch.supportsParquetBatchReads(ScanTask)- per-filelowerBounds.containsKey(variantFieldId)check; presence indicates shredded payload, batch reads are disabled for that scanSparkBatch.supportsParquetBatchReads(NestedField)- falls back to row reads when the variant column's metrics mode isNoneorCounts(bounds aren't trustable for shredded detection)SparkScanBuilder- opts into variant-column stats for bothbuildIcebergBatchScanandbuildIcebergIncrementalAppendScansolowerBoundsis loaded at scan plan without opening Parquet footersBoth Spark 4.0 and 4.1.
Limitations
lowerBoundscheck detects them and falls back to row-at-a-time readswrite.metadata.metrics.defaultis set tononeorcountsfor a variant column, bounds aren't recorded so detection falls back conservatively to row readsAre these changes tested?
TestSparkVariantRead(v4.0 + v4.1)vectorized=falseandvectorized=true. Previously, thetruevalue tests were skippedtestVariantReadAfterDelete- variant column with DV deletes under vectorizationtestReadShreddedAfterPropertyToggled- writes shredded data withwrite.parquet.shred-variants=true, toggles the property tofalse, then reads. Verifies the per-filelowerBoundscheck forces row reads on the existing shredded files (parameterized overvectorized=false/true)testReadShreddedWithMetricsDisabled- shredded write withwrite.metadata.metrics.default=noneand=counts. Verifies the metrics-mode gate forces row reads when bounds aren't recorded (parameterized over both modes)TestVariantShredding(v4.0 + v4.1) - table created withPARQUET_SHRED_VARIANTS=true;SparkBatchcorrectly detects and falls backTestSnapshotTableProcedure(v4.0 + v4.1) - external Parquet imports with variant columns lacking the VARIANT annotation now read correctly with vectorization on by default. The previous manualread.parquet.vectorization.enabled=falseworkaround is removedAre there any user-facing changes?
write.metadata.metrics.default=noneorcounts) also fall back to row reads to avoid silent data lossPerformance
Measured
includeColumnStats(variantColumns)scan-plan overhead at 10/100/1000 files (5 iterations + 3 warmups, two independent runs, local SSD, hadoop catalog). Per-file delta is roughly 1-2 microseconds and within run-to-run noise at 1000 files. The opt-in only fires for projections containing variant columns; non-variant scans are unchanged.