[SPARK-55444][SQL] Introduce and Route TimeType to Parquet vectorized read through the Types Framework#56661
Conversation
…hrough the Types Framework Vectorized-read integration was one of the follow-ups deferred by the Phase 3a Parquet PR (SPARK-55444). This routes TimeType's vectorized Parquet updater through the Types Framework, keyed on the Spark DataType like every other Phase 3a site, instead of the hardcoded `instanceof TimeType` arm in ParquetVectorUpdaterFactory. - Add ParquetTypeOps.getVectorUpdater(descriptor): Option[ParquetVectorUpdater] and a Java-friendly companion facade getVectorUpdaterOrNull(dt, descriptor). - TimeTypeParquetOps owns the updater: a Scala TimeMicrosToNanosUpdater replicating the former ParquetVectorUpdaterFactory.LongAsNanosUpdater (INT64 micros-of-day -> nanos-of-day). - ParquetVectorUpdaterFactory.getUpdater dispatches the framework first (ParquetTypeOps.getVectorUpdaterOrNull) before its built-in switch; the dead `instanceof TimeType` arm and the LongAsNanosUpdater class are removed. This also makes ParquetTypeOps.isBatchReadSupported=true honest for TimeType: the framework now supplies the vectorized updater instead of relying on the hardcoded factory arm. No behavior change. Verified by TimeTypeParquetOpsSuite (+2 tests), ParquetIOSuite TimeType reads (vectorized e2e), and ParquetVectorizedSuite. Generated-by: Claude Code (Claude Opus 4.8) Co-authored-by: Isaac
|
cc @davidm-db - this is one of the followups for vectorized read. What do you think about the change? The issue that caught my eye was |
|
@stevomitric Could you rebase on the recent master, please. |
Will do. Still debating if we should merge this change. Marked as WIP. |
…-tf-vectorized-read # Conflicts: # sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
personally, I'm fine with the change as-is, for a few reasons:
let's see if anyone else has any thoughts.. @MaxGekk do you have an opinion? |
davidm-db
left a comment
There was a problem hiding this comment.
A few notes on the WIP - the main one is the inline question on getVectorUpdater.
Minor, and not in this diff: ParquetVectorUpdaterBenchmark.scala:267 still labels the case "LongAsNanosUpdater (TimeType)", but the updater is now TimeMicrosToNanosUpdater. No functional impact - it passes TimeType() and routes through the framework, so it does exercise the new path - just worth renaming the label.
…ed updater; add reject test Address review feedback (@davidm-db, @shrirangmhalgi) on routing the TimeType vectorized Parquet read through the framework: - getVectorUpdater now validates the Parquet descriptor via a shared isCompatibleParquetType (the same accept set as the row path's requireCompatibleParquetType): it returns Some only for canonical INT64 TIME(MICROS)/TIME(NANOS) and None for anything else (INT32 TIME(MILLIS), raw INT64, INT64 TIMESTAMP, ...), so the factory falls through to the existing clean SchemaColumnConvertNotSupportedException instead of decoding a non-INT64 / non-TIME column as TimeType. This unifies the read guard across the row-based and vectorized readers. - Add a vectorized reject test (INT32 TIME(MILLIS) / raw INT64 / INT64 TIMESTAMP -> getVectorUpdater returns None); the accept test now uses real ColumnDescriptors. - Rename the stale ParquetVectorUpdaterBenchmark label "LongAsNanosUpdater (TimeType)" -> "TimeVectorUpdater (TimeType)". Generated-by: Claude Code (Claude Opus 4.8) Co-authored-by: Isaac
…-tf-vectorized-read
…ion in TimeVectorUpdater Address review feedback: drop TimeVectorUpdater's duplicate truncation-factor table and the per-column factor hoist, and call the shared (table-backed) DateTimeUtils.truncateTimeToPrecision - the same function the row-based newConverter path uses - so the two readers stay in lock-step and there is a single source of truth for the truncation. Behavior-preserving. Generated-by: Claude Code (Claude Opus 4.8) Co-authored-by: Isaac
… and tidy docs Address review round-2 follow-ups (all non-blocking / nits): - Add an end-to-end ParquetIOSuite reject test: an incompatible encoding (INT32 TIME(MILLIS), raw INT64, INT64 TIMESTAMP) requested as TimeType now throws a clean FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH on the actual vectorized reader, pinning the descriptor guard end-to-end (not just at the unit level). - Add an end-to-end test for a nullable TIME(NANOS) column, exercising the single-value TimeVectorUpdater.readValue / decodeSingleDictionaryId path that REQUIRED columns never hit. - Update the three ParquetVectorUpdaterBenchmark-*-results.txt row labels to match the renamed TimeVectorUpdater benchmark case. - ParquetTypeOps: move isBatchReadSupported under the "Vectorized Read" section so the body matches the class doc; broaden TimeTypeParquetOpsSuite's class scaladoc to cover the vectorized-read guard it now tests. Generated-by: Claude Code (Claude Opus 4.8) Co-authored-by: Isaac
|
cc @MaxGekk PTAL. |
| */ | ||
| private[ops] class TimeVectorUpdater(precision: Int, fileStoresNanos: Boolean) | ||
| extends ParquetVectorUpdater { | ||
| private def toTruncatedNanos(value: Long): Long = { |
There was a problem hiding this comment.
The new TimeVectorUpdater.toTruncatedNanos calls the shared DateTimeUtils.truncateTimeToPrecision per value; but that function is not table-backed, it recomputes math.pow on every call. Suggested resolution: make truncateTimeToPrecision itself table-backed (a 10^k lookup for k in 0..9) instead of math.pow. Then the row path, the vectorized path, and Cast all share one source of truth and the per-value cost drops to a table lookup; and the "table-backed" comment becomes accurate.
| import org.apache.spark.sql.execution.datasources.DataSourceUtils; | ||
| import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException; | ||
| import org.apache.spark.sql.execution.vectorized.WritableColumnVector; | ||
| import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps$; |
|
|
||
| /** | ||
| * The vectorized (batch) [[ParquetVectorUpdater]] for this type, or None to fall back to the | ||
| * built-in `ParquetVectorUpdaterFactory`. A type that returns Some here should also return |
There was a problem hiding this comment.
Docstring nit: getVectorUpdater's "A type that returns Some here should also return true from isBatchReadSupported" is really a two-way invariant; supplying an updater without flipping the gate means the vectorized path is never taken; flipping the gate without an updater (and without legacy factory support) routes into a factory that won't recognize the type. Worth stating as the bidirectional contract.
MaxGekk
left a comment
There was a problem hiding this comment.
0 blocking, 1 non-blocking, 0 nits. Clean, behavior-preserving refactor; one test-coverage gap that hardens the exact safety property this PR adds.
Suggestions (1)
- TimeTypeParquetOpsSuite.scala:162: incompatible-encoding reject is only half-tested — assert the factory actually throws, not just that
getVectorUpdaterreturnsNone— see inline
Verification
Traced TimeVectorUpdater against the removed factory TimeUpdater: exact equivalence — same isNanosTime unit selector, same microsToNanos conversion, same truncateTimeToPrecision(nanos, precision) truncation, identical dictionary path, all four updater methods. The row and vectorized paths are now provably value-identical (both compute truncateTimeToPrecision(if (fileStoresNanos) v else microsToNanos(v), precision)). TIME is excluded from lazy dictionary decoding (VectorizedColumnReader.isLazyDecodingSupported sets needsUpcast via isTimeTypeMatched), so truncation is never bypassed on the dict path. Incompatible encodings (INT32 TIME(MILLIS), raw INT64, INT64 TIMESTAMP) match no factory arm and reach throw constructConvertNotSupportedException — same reject set as the row path's requireCompatibleParquetType.
On the per-value truncateTimeToPrecision performance question: it's already table-backed (SparkDateTimeUtils.scala:181-188, no math.pow), and a 50M-value microbenchmark of the old hoisted-factor (x/f)*f vs the new per-value call measured identical (1.00x, ~0.53 ns/val) — the JIT hoists the array load and elides the assert, so no regression. (Minor: the benchmark result files were relabeled, not regenerated — byte-identical timings.)
| // mis-decoding (e.g. readLongs over an INT32 column). | ||
| Seq(int32Millis, rawInt64, int64Timestamp).foreach { field => | ||
| val descriptor = new ColumnDescriptor(Array("c"), field, 0, 0) | ||
| assert(TimeTypeParquetOps(timeMicros).getVectorUpdater(descriptor).isEmpty) |
There was a problem hiding this comment.
This asserts the framework half of the reject (getVectorUpdater returns None), but not the property the comment just above promises — that ParquetVectorUpdaterFactory.getUpdater(...), the entry point the vectorized reader actually calls, throws SchemaColumnConvertNotSupportedException for an incompatible TIME column. It does today (INT32/INT64-incompatible match no arm and reach throw constructConvertNotSupportedException), but nothing guards it: a future factory fall-through change (a broad TimeType arm or catch-all) would reintroduce the silent mis-read this PR fixes, and the isEmpty check wouldn't catch it.
Consider a factory-level reject test — ParquetVectorUpdaterSuite already has the newFactory(descriptor) + fac.getUpdater(...) pattern (~L428):
intercept[SchemaColumnConvertNotSupportedException] {
newFactory(int32TimeMillisDescriptor).getUpdater(int32TimeMillisDescriptor, TimeType(6))
}for INT32 TIME(MILLIS) / raw INT64 / INT64 TIMESTAMP. Non-blocking.
MaxGekk
left a comment
There was a problem hiding this comment.
0 addressed, 1 remaining, 0 new. (Re-review on the same commit ba81c5a as my prior review — no new commits or discussion since, so the prior conclusions stand and nothing new surfaced.)
0 blocking, 1 non-blocking, 0 nits. Clean, behavior-preserving refactor that unifies the row and vectorized read guards.
Correctness (1)
- TimeTypeParquetOpsSuite.scala:162 (remaining, non-blocking): the incompatible-encoding reject test still asserts only
getVectorUpdater(...).isEmpty, not the factory throw its comment promises — see my prior inline. The new end-to-endParquetIOSuitereject test now covers the throw at the reader level, so this is a unit-test-tightening nit, not a coverage hole.
Verification
Re-confirmed against the tree this round: TimeVectorUpdater is value-identical to the removed factory TimeUpdater — all four updater methods compute truncateTimeToPrecision(if (fileStoresNanos) v else microsToNanos(v), precision), and getVectorUpdater/requireCompatibleParquetType now share isCompatibleParquetType, so the row and vectorized readers accept/reject the identical INT64 TIME(MICROS/NANOS) set. @davidm-db's two getVectorUpdater concerns (non-INT64 read-as-TimeType regression; row-vs-vectorized divergence) are resolved by that guard: a non-INT64 column returns None, falls through the factory switch, and reaches the clean SchemaColumnConvertNotSupportedException (confirmed end-to-end by the new reject test asserting FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH). TIME stays excluded from lazy dictionary decoding (VectorizedColumnReader.java:169-175), so truncation is never bypassed on the dict path. On @uros-b's performance point: DateTimeUtils.truncateTimeToPrecision is already table-backed (SparkDateTimeUtils.scala:181 is an array lookup; math.pow is only in the one-time precompute at :188-189), so the per-value call is a table lookup, not a pow — no regression. Fork CI is green.
…pdater contract doc, factory-level reject test - ParquetVectorUpdaterFactory: order the new ParquetTypeOps$ import alphabetically. - ParquetTypeOps.getVectorUpdater: document the bidirectional contract with isBatchReadSupported (returning Some without flipping the gate leaves the vectorized path unreachable; flipping the gate without an updater here, and without legacy factory support, routes into a factory that does not recognize the type). - ParquetVectorUpdaterSuite: add a factory-level reject test asserting ParquetVectorUpdaterFactory.getUpdater throws SchemaColumnConvertNotSupportedException for an incompatible TIME encoding (INT32 TIME(MILLIS) / raw INT64 / INT64 TIMESTAMP) requested as TimeType, guarding the factory fall-through directly so a future broad arm cannot silently reintroduce the mis-read this PR fixes. Generated-by: Claude Code (Claude Opus 4.8) Co-authored-by: Isaac
|
@stevomitric please, resolve conflicts. |
…-tf-vectorized-read # Conflicts: # sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt # sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt # sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
uros-b
left a comment
There was a problem hiding this comment.
Thank you @stevomitric and @MaxGekk @davidm-db @shrirangmhalgi! Please resolve conflicts and proceed with this PR.
…-tf-vectorized-read
…torUpdaterSuite
The two descriptor builders in the SPARK-55444 factory-reject test exceeded 100 chars; split each
.named("col") onto its own line so scalastyle passes.
Generated-by: Claude Code (Claude Opus 4.8)
Co-authored-by: Isaac
What changes were proposed in this pull request?
Follow-up to the Types Framework Phase 3a Parquet work (SPARK-55444). It moves
TimeType's vectorized Parquet decoder into the framework, so the framework now owns all ofTimeType's Parquet read/write paths (schema, write, row-based read, and now vectorized read).ParquetVectorUpdaterFactory.getUpdater(before theswitch (typeName)):ParquetTypeOps.getVectorUpdaterOrNull(sparkType, descriptor). A framework-managed type returning aSomeupdater short-circuits the factory's built-in cases; everything else returnsnulland falls through unchanged.ParquetTypeOps.getVectorUpdater(descriptor): Option[ParquetVectorUpdater](defaultNone), symmetric with the existingnewConverterrow hook and dispatched through the sameapply(dt)registration.TimeTypeParquetOpsowns the updater: a ScalaTimeVectorUpdaterreplacing the removed factoryTimeUpdater(INT64 micros/nanos-of-day → nanos-of-day, truncated to the requested precision).getVectorUpdatervalidates the descriptor via a newly-extractedisCompatibleParquetType— the same predicate the row path'srequireCompatibleParquetTypenow delegates to. Incompatible encodings (INT32 TIME(MILLIS), raw INT64, INT64 TIMESTAMP) returnNoneand fall through to the factory's cleanSchemaColumnConvertNotSupportedExceptioninstead of silently mis-reading. Both readers now accept/reject the identical encoding set.DateTimeUtils.truncateTimeToPrecision(the same call the row path uses) — no duplicate factor table.ParquetVectorUpdaterBenchmarkcase label toTimeVectorUpdater (TimeType).Why are the changes needed?
The vectorized Parquet decoder was the last
TimeTypeupdater living outside the framework:ParquetVectorUpdaterFactory.getUpdaterhadinstanceof TimeTypearms in its INT64 case, soParquetTypeOps.isBatchReadSupported = trueforTimeTypewas only safe because of out-of-framework factory code. Moving it in makes that flag honest and lets future framework types supply their own batch updater with no factory changes. (VectorizedColumnReader.isLazyDecodingSupported's TIME check intentionally stays — it gates the lazy-dictionary-decoding optimization, not the decode path.)Does this PR introduce any user-facing change?
No.
TimeVectorUpdaterperforms the same conversion + precision truncation as the removed factoryTimeUpdater, andisCompatibleParquetTypepreserves the existing accept/reject behavior on both readers.How was this patch tested?
TimeTypeParquetOpsSuite: dispatch tests plus a vectorized reject test (INT32 TIME(MILLIS) / raw INT64 / INT64 TIMESTAMP read asTimeType→getVectorUpdaterreturnsNone).ParquetIOSuiteTIME tests underwithAllParquetReaders(vectorized + row, dict on/off, MICROS/NANOS,isAdjustedToUTC, read-side precision truncation) exercise the relocated decode end-to-end.ParquetVectorizedSuitepasses unchanged.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.8)