[SPARK-55444][SQL] Introduce and Route TimeType to Parquet vectorized read through the Types Framework #56661

Open
stevomitric wants to merge 11 commits into apache:master from stevomitric:stevomitric/parquet-tf-vectorized-read


@stevomitric stevomitric commented Jun 22, 2026

Contributor

What changes were proposed in this pull request?

Follow-up to the Types Framework Phase 3a Parquet work (SPARK-55444). It moves TimeType's vectorized Parquet decoder into the framework, so the framework now owns all of TimeType's Parquet read/write paths (schema, write, row-based read, and now vectorized read).

  • Adds a framework dispatch hook at the top of ParquetVectorUpdaterFactory.getUpdater (before the switch (typeName)): ParquetTypeOps.getVectorUpdaterOrNull(sparkType, descriptor). When a framework-managed type supplies an updater (getVectorUpdater returns Some), the hook returns it and short-circuits the factory's built-in cases; for everything else the hook returns null and the factory falls through unchanged.
  • Adds the trait extension point ParquetTypeOps.getVectorUpdater(descriptor): Option[ParquetVectorUpdater] (default None), symmetric with the existing newConverter row hook and dispatched through the same apply(dt) registration.
  • TimeTypeParquetOps owns the updater: a Scala TimeVectorUpdater replacing the removed factory TimeUpdater (INT64 micros/nanos-of-day → nanos-of-day, truncated to the requested precision).
  • Unifies the read guard: getVectorUpdater validates the descriptor via a newly-extracted isCompatibleParquetType — the same predicate the row path's requireCompatibleParquetType now delegates to. Incompatible encodings (INT32 TIME(MILLIS), raw INT64, INT64 TIMESTAMP) return None and fall through to the factory's clean SchemaColumnConvertNotSupportedException instead of silently mis-reading. Both readers now accept/reject the identical encoding set.
  • Truncation reuses the shared, table-backed DateTimeUtils.truncateTimeToPrecision (the same call the row path uses) — no duplicate factor table.
  • Renames the stale ParquetVectorUpdaterBenchmark case label to TimeVectorUpdater (TimeType).
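
The dispatch order and the descriptor guard described in the list above can be sketched in isolation. This is a minimal, self-contained Java model, not Spark's code: `Updater`, `ColumnInfo`, `isCompatible`, `frameworkUpdaterOrNull`, and this `getUpdater` are hypothetical stand-ins for `ParquetVectorUpdater`, Parquet's `ColumnDescriptor`, `isCompatibleParquetType`, `ParquetTypeOps.getVectorUpdaterOrNull`, and `ParquetVectorUpdaterFactory.getUpdater`. Types and encodings are modelled as plain strings, and `UnsupportedOperationException` stands in for `SchemaColumnConvertNotSupportedException`.

```java
/** Hypothetical model of the framework-first dispatch; none of these names are Spark APIs. */
final class DispatchSketch {
  /** Stand-in for ParquetVectorUpdater. */
  interface Updater {
    String name();
  }

  /** Stand-in for the parts of a Parquet ColumnDescriptor that the guard inspects. */
  static final class ColumnInfo {
    final String physicalType;
    final String logicalType;

    ColumnInfo(String physicalType, String logicalType) {
      this.physicalType = physicalType;
      this.logicalType = logicalType;
    }
  }

  /** Shared guard: the sketch accepts only INT64 TIME(MICROS) and INT64 TIME(NANOS). */
  static boolean isCompatible(ColumnInfo c) {
    return c.physicalType.equals("INT64")
        && (c.logicalType.equals("TIME(MICROS)") || c.logicalType.equals("TIME(NANOS)"));
  }

  /** Framework hook: an updater for a framework-managed type, or null to fall through. */
  static Updater frameworkUpdaterOrNull(String sparkType, ColumnInfo c) {
    if (sparkType.equals("TimeType") && isCompatible(c)) {
      return () -> "TimeVectorUpdater";
    }
    return null;
  }

  /** Factory entry point: framework first, then built-in cases, then a clean failure. */
  static Updater getUpdater(String sparkType, ColumnInfo c) {
    Updater fromFramework = frameworkUpdaterOrNull(sparkType, c);
    if (fromFramework != null) {
      return fromFramework;
    }
    // A single built-in case standing in for the factory's switch over the physical type.
    if (sparkType.equals("LongType") && c.physicalType.equals("INT64")) {
      return () -> "LongUpdater";
    }
    throw new UnsupportedOperationException(
        "cannot read " + c.physicalType + " " + c.logicalType + " as " + sparkType);
  }
}
```

In this model an INT32 TIME(MILLIS) column requested as TimeType gets no updater from the hook, matches no built-in case, and ends in the exception rather than in a silent mis-read.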

Why are the changes needed?

The vectorized Parquet decoder was the last TimeType updater living outside the framework: ParquetVectorUpdaterFactory.getUpdater had instanceof TimeType arms in its INT64 case, so ParquetTypeOps.isBatchReadSupported = true for TimeType was only safe because of out-of-framework factory code. Moving it in makes that flag honest and lets future framework types supply their own batch updater with no factory changes. (VectorizedColumnReader.isLazyDecodingSupported's TIME check intentionally stays — it gates the lazy-dictionary-decoding optimization, not the decode path.)

Does this PR introduce any user-facing change?

No. TimeVectorUpdater performs the same conversion + precision truncation as the removed factory TimeUpdater, and isCompatibleParquetType preserves the existing accept/reject behavior on both readers.

How was this patch tested?

  • TimeTypeParquetOpsSuite: dispatch tests plus a vectorized reject test (INT32 TIME(MILLIS) / raw INT64 / INT64 TIMESTAMP read as TimeType → getVectorUpdater returns None).
  • ParquetIOSuite TIME tests under withAllParquetReaders (vectorized + row, dict on/off, MICROS/NANOS, isAdjustedToUTC, read-side precision truncation) exercise the relocated decode end-to-end.
  • ParquetVectorizedSuite passes unchanged.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.8)

…hrough the Types Framework

Vectorized-read integration was one of the follow-ups deferred by the Phase 3a Parquet PR
(SPARK-55444). This routes TimeType's vectorized Parquet updater through the Types Framework,
keyed on the Spark DataType like every other Phase 3a site, instead of the hardcoded
`instanceof TimeType` arm in ParquetVectorUpdaterFactory.

- Add ParquetTypeOps.getVectorUpdater(descriptor): Option[ParquetVectorUpdater] and a
  Java-friendly companion facade getVectorUpdaterOrNull(dt, descriptor).
- TimeTypeParquetOps owns the updater: a Scala TimeMicrosToNanosUpdater replicating the former
  ParquetVectorUpdaterFactory.LongAsNanosUpdater (INT64 micros-of-day -> nanos-of-day).
- ParquetVectorUpdaterFactory.getUpdater dispatches the framework first
  (ParquetTypeOps.getVectorUpdaterOrNull) before its built-in switch; the dead
  `instanceof TimeType` arm and the LongAsNanosUpdater class are removed.

This also makes ParquetTypeOps.isBatchReadSupported=true honest for TimeType: the framework
now supplies the vectorized updater instead of relying on the hardcoded factory arm.

No behavior change. Verified by TimeTypeParquetOpsSuite (+2 tests), ParquetIOSuite TimeType
reads (vectorized e2e), and ParquetVectorizedSuite.

Generated-by: Claude Code (Claude Opus 4.8)

Co-authored-by: Isaac

@stevomitric

Contributor Author

cc @davidm-db - this is one of the followups for vectorized read. What do you think about the change? The issue that caught my eye is that getUpdater has shared vectorized readers for specific types - so we would have to move the logic to parquet ops.

@MaxGekk

MaxGekk commented Jun 22, 2026

Member

@stevomitric Could you rebase on the recent master, please.

stevomitric changed the title from "[SPARK-55444][SQL] Introduce and Route TimeType to Parquet vectorized read through the Types Framework" to "[WIP][SPARK-55444][SQL] Introduce and Route TimeType to Parquet vectorized read through the Types Framework" on Jun 22, 2026

@stevomitric

Contributor Author

> @stevomitric Could you rebase on the recent master, please.

Will do. Still debating if we should merge this change. Marked as WIP.

…-tf-vectorized-read

# Conflicts:
#	sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@davidm-db

Contributor

> cc @davidm-db - this is one of the followups for vectorized read. What do you think about the change? The issue that caught my eye is that getUpdater has shared vectorized readers for specific types - so we would have to move the logic to parquet ops.

personally, I'm fine with the change as-is, for a few reasons:

  • if we were writing this from zero, we would do what your PR does
  • we don't have any new types in the plan at the moment that would hit this issue
  • at some point, we need to pay for not fully rewriting all types through the framework - this seems like a small cost
  • we can always figure out some kind of a solution when needed

let's see if anyone else has any thoughts.. @MaxGekk do you have an opinion?

@davidm-db davidm-db left a comment

Contributor

A few notes on the WIP - the main one is the inline question on getVectorUpdater.

Minor, and not in this diff: ParquetVectorUpdaterBenchmark.scala:267 still labels the case "LongAsNanosUpdater (TimeType)", but the updater is now TimeMicrosToNanosUpdater. No functional impact - it passes TimeType() and routes through the framework, so it does exercise the new path - just worth renaming the label.

…ed updater; add reject test

Address review feedback (@davidm-db, @shrirangmhalgi) on routing the TimeType vectorized
Parquet read through the framework:

- getVectorUpdater now validates the Parquet descriptor via a shared isCompatibleParquetType
  (the same accept set as the row path's requireCompatibleParquetType): it returns Some only
  for canonical INT64 TIME(MICROS)/TIME(NANOS) and None for anything else (INT32 TIME(MILLIS),
  raw INT64, INT64 TIMESTAMP, ...), so the factory falls through to the existing clean
  SchemaColumnConvertNotSupportedException instead of decoding a non-INT64 / non-TIME column
  as TimeType. This unifies the read guard across the row-based and vectorized readers.
- Add a vectorized reject test (INT32 TIME(MILLIS) / raw INT64 / INT64 TIMESTAMP ->
  getVectorUpdater returns None); the accept test now uses real ColumnDescriptors.
- Rename the stale ParquetVectorUpdaterBenchmark label "LongAsNanosUpdater (TimeType)" ->
  "TimeVectorUpdater (TimeType)".

Generated-by: Claude Code (Claude Opus 4.8)

Co-authored-by: Isaac

…ion in TimeVectorUpdater

Address review feedback: drop TimeVectorUpdater's duplicate truncation-factor table and the
per-column factor hoist, and call the shared (table-backed) DateTimeUtils.truncateTimeToPrecision
- the same function the row-based newConverter path uses - so the two readers stay in lock-step
and there is a single source of truth for the truncation. Behavior-preserving.

Generated-by: Claude Code (Claude Opus 4.8)

Co-authored-by: Isaac

stevomitric changed the title from "[WIP][SPARK-55444][SQL] Introduce and Route TimeType to Parquet vectorized read through the Types Framework" to "[SPARK-55444][SQL] Introduce and Route TimeType to Parquet vectorized read through the Types Framework" on Jun 24, 2026

… and tidy docs

Address review round-2 follow-ups (all non-blocking / nits):
- Add an end-to-end ParquetIOSuite reject test: an incompatible encoding (INT32 TIME(MILLIS),
  raw INT64, INT64 TIMESTAMP) requested as TimeType now throws a clean
  FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH on the actual vectorized reader, pinning
  the descriptor guard end-to-end (not just at the unit level).
- Add an end-to-end test for a nullable TIME(NANOS) column, exercising the single-value
  TimeVectorUpdater.readValue / decodeSingleDictionaryId path that REQUIRED columns never hit.
- Update the three ParquetVectorUpdaterBenchmark-*-results.txt row labels to match the renamed
  TimeVectorUpdater benchmark case.
- ParquetTypeOps: move isBatchReadSupported under the "Vectorized Read" section so the body
  matches the class doc; broaden TimeTypeParquetOpsSuite's class scaladoc to cover the
  vectorized-read guard it now tests.

Generated-by: Claude Code (Claude Opus 4.8)

Co-authored-by: Isaac

@stevomitric

Copy link
Copy Markdown
Contributor Author

cc @MaxGekk PTAL.

@stevomitric stevomitric requested a review from davidm-db June 25, 2026 13:17

 */
private[ops] class TimeVectorUpdater(precision: Int, fileStoresNanos: Boolean)
    extends ParquetVectorUpdater {
  private def toTruncatedNanos(value: Long): Long = {

Member

The new TimeVectorUpdater.toTruncatedNanos calls the shared DateTimeUtils.truncateTimeToPrecision per value; but that function is not table-backed, it recomputes math.pow on every call. Suggested resolution: make truncateTimeToPrecision itself table-backed (a 10^k lookup for k in 0..9) instead of math.pow. Then the row path, the vectorized path, and Cast all share one source of truth and the per-value cost drops to a table lookup; and the "table-backed" comment becomes accurate.
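
For readers unfamiliar with the suggestion, a table-backed truncation might look like the following. This is an illustrative Java sketch, not the body of `DateTimeUtils.truncateTimeToPrecision`: the class name, the `POW10` table, and the `truncate` signature are hypothetical. It also assumes that precision counts fractional-second digits in the range 0..9 and that the input is a non-negative nanos-of-day value.

```java
/** Hypothetical sketch of table-backed truncation; not Spark's implementation. */
final class TruncationSketch {
  // 10^k for k in 0..9, filled once at class-initialization time, so the
  // per-value path is an array load, a divide, and a multiply (no pow call).
  private static final long[] POW10 = new long[10];

  static {
    long p = 1L;
    for (int k = 0; k < POW10.length; k++) {
      POW10[k] = p;
      p *= 10L;
    }
  }

  /**
   * Keeps `precision` fractional-second digits of a non-negative nanos-of-day
   * value and zeroes the remaining digits. Assumes 0 <= precision <= 9.
   */
  static long truncate(long nanosOfDay, int precision) {
    if (precision < 0 || precision > 9) {
      throw new IllegalArgumentException("precision out of range: " + precision);
    }
    long factor = POW10[9 - precision];
    return (nanosOfDay / factor) * factor;
  }
}
```

With precision 6 the factor is 1,000, so only the last three nanosecond digits are cleared; with precision 9 the factor is 1 and the value is returned unchanged.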

import org.apache.spark.sql.execution.datasources.DataSourceUtils;
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps$;

Member

Nit: alphabetical order


/**
 * The vectorized (batch) [[ParquetVectorUpdater]] for this type, or None to fall back to the
 * built-in `ParquetVectorUpdaterFactory`. A type that returns Some here should also return

Member

Docstring nit: getVectorUpdater's "A type that returns Some here should also return true from isBatchReadSupported" is really a two-way invariant. Supplying an updater without flipping the gate means the vectorized path is never taken; flipping the gate without an updater (and without legacy factory support) routes into a factory that won't recognize the type. Worth stating as the bidirectional contract.
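
The two directions of that contract can be written down as a small consistency check. The sketch below is purely illustrative Java: `ContractSketch` and `violation` are hypothetical names, and the three booleans are stand-ins for "isBatchReadSupported returns true", "getVectorUpdater returns Some", and "the legacy factory still has a case for the type".

```java
import java.util.Optional;

/** Hypothetical check of the gate/updater contract; not part of Spark. */
final class ContractSketch {
  /** Returns a description of the violated direction, or empty if the three flags agree. */
  static Optional<String> violation(
      boolean batchReadSupported, boolean suppliesUpdater, boolean legacyFactorySupport) {
    if (suppliesUpdater && !batchReadSupported) {
      // Direction 1: the updater exists but the gate keeps the vectorized reader off.
      return Optional.of("updater supplied but gate is off: vectorized path is never taken");
    }
    if (batchReadSupported && !suppliesUpdater && !legacyFactorySupport) {
      // Direction 2: the gate is on but nothing can decode the column.
      return Optional.of("gate is on but neither the framework nor the factory has an updater");
    }
    return Optional.empty();
  }
}
```

A type whose gate is on and which relies only on the legacy factory case passes the check, which matches how TimeType was handled before this PR.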

@MaxGekk MaxGekk left a comment

Member

0 blocking, 1 non-blocking, 0 nits. Clean, behavior-preserving refactor; one test-coverage gap that hardens the exact safety property this PR adds.

Suggestions (1)

  • TimeTypeParquetOpsSuite.scala:162: incompatible-encoding reject is only half-tested — assert the factory actually throws, not just that getVectorUpdater returns None — see inline

Verification

Traced TimeVectorUpdater against the removed factory TimeUpdater: exact equivalence — same isNanosTime unit selector, same microsToNanos conversion, same truncateTimeToPrecision(nanos, precision) truncation, identical dictionary path, all four updater methods. The row and vectorized paths are now provably value-identical (both compute truncateTimeToPrecision(if (fileStoresNanos) v else microsToNanos(v), precision)). TIME is excluded from lazy dictionary decoding (VectorizedColumnReader.isLazyDecodingSupported sets needsUpcast via isTimeTypeMatched), so truncation is never bypassed on the dict path. Incompatible encodings (INT32 TIME(MILLIS), raw INT64, INT64 TIMESTAMP) match no factory arm and reach throw constructConvertNotSupportedException — same reject set as the row path's requireCompatibleParquetType.
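
The shared expression quoted above can be illustrated with a short Java sketch. It is not the Spark code: `TimeDecodeSketch` and `decode` are hypothetical names, the overflow-checked `Math.multiplyExact` is an assumption about how a micros-to-nanos conversion might be written, and precision is assumed to count fractional-second digits in 0..9 over non-negative values.

```java
/** Hypothetical model of the per-value TIME decode; not Spark's TimeVectorUpdater. */
final class TimeDecodeSketch {
  private static final long NANOS_PER_MICRO = 1_000L;

  // 10^k for k in 0..9; index 9 - precision selects the truncation factor.
  private static final long[] POW10 = {
    1L, 10L, 100L, 1_000L, 10_000L, 100_000L,
    1_000_000L, 10_000_000L, 100_000_000L, 1_000_000_000L
  };

  /**
   * Converts a stored INT64 value to nanos-of-day and truncates it to `precision`
   * fractional-second digits. `fileStoresNanos` selects the unit of the stored value.
   * An out-of-range precision fails with ArrayIndexOutOfBoundsException.
   */
  static long decode(long stored, boolean fileStoresNanos, int precision) {
    long nanos = fileStoresNanos ? stored : Math.multiplyExact(stored, NANOS_PER_MICRO);
    long factor = POW10[9 - precision];
    return (nanos / factor) * factor;
  }
}
```

Because the unit selection happens before truncation, the same instant stored as micros or as nanos decodes to the same value whenever the requested precision is at most 6.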

On the per-value truncateTimeToPrecision performance question: it's already table-backed (SparkDateTimeUtils.scala:181-188, no math.pow), and a 50M-value microbenchmark of the old hoisted-factor (x/f)*f vs the new per-value call measured identical (1.00x, ~0.53 ns/val) — the JIT hoists the array load and elides the assert, so no regression. (Minor: the benchmark result files were relabeled, not regenerated — byte-identical timings.)

// mis-decoding (e.g. readLongs over an INT32 column).
Seq(int32Millis, rawInt64, int64Timestamp).foreach { field =>
  val descriptor = new ColumnDescriptor(Array("c"), field, 0, 0)
  assert(TimeTypeParquetOps(timeMicros).getVectorUpdater(descriptor).isEmpty)

Member

This asserts the framework half of the reject (getVectorUpdater returns None), but not the property the comment just above promises — that ParquetVectorUpdaterFactory.getUpdater(...), the entry point the vectorized reader actually calls, throws SchemaColumnConvertNotSupportedException for an incompatible TIME column. It does today (INT32/INT64-incompatible match no arm and reach throw constructConvertNotSupportedException), but nothing guards it: a future factory fall-through change (a broad TimeType arm or catch-all) would reintroduce the silent mis-read this PR fixes, and the isEmpty check wouldn't catch it.

Consider a factory-level reject test — ParquetVectorUpdaterSuite already has the newFactory(descriptor) + fac.getUpdater(...) pattern (~L428):

intercept[SchemaColumnConvertNotSupportedException] {
  newFactory(int32TimeMillisDescriptor).getUpdater(int32TimeMillisDescriptor, TimeType(6))
}

for INT32 TIME(MILLIS) / raw INT64 / INT64 TIMESTAMP. Non-blocking.

@MaxGekk MaxGekk left a comment

Member

0 addressed, 1 remaining, 0 new. (Re-review on the same commit ba81c5a as my prior review — no new commits or discussion since, so the prior conclusions stand and nothing new surfaced.)

0 blocking, 1 non-blocking, 0 nits. Clean, behavior-preserving refactor that unifies the row and vectorized read guards.

Correctness (1)

  • TimeTypeParquetOpsSuite.scala:162 (remaining, non-blocking): the incompatible-encoding reject test still asserts only getVectorUpdater(...).isEmpty, not the factory throw its comment promises — see my prior inline. The new end-to-end ParquetIOSuite reject test now covers the throw at the reader level, so this is a unit-test-tightening nit, not a coverage hole.

Verification

Re-confirmed against the tree this round: TimeVectorUpdater is value-identical to the removed factory TimeUpdater — all four updater methods compute truncateTimeToPrecision(if (fileStoresNanos) v else microsToNanos(v), precision), and getVectorUpdater/requireCompatibleParquetType now share isCompatibleParquetType, so the row and vectorized readers accept/reject the identical INT64 TIME(MICROS/NANOS) set. @davidm-db's two getVectorUpdater concerns (non-INT64 read-as-TimeType regression; row-vs-vectorized divergence) are resolved by that guard: a non-INT64 column returns None, falls through the factory switch, and reaches the clean SchemaColumnConvertNotSupportedException (confirmed end-to-end by the new reject test asserting FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH). TIME stays excluded from lazy dictionary decoding (VectorizedColumnReader.java:169-175), so truncation is never bypassed on the dict path. On @uros-b's performance point: DateTimeUtils.truncateTimeToPrecision is already table-backed (SparkDateTimeUtils.scala:181 is an array lookup; math.pow is only in the one-time precompute at :188-189), so the per-value call is a table lookup, not a pow — no regression. Fork CI is green.

…pdater contract doc, factory-level reject test

- ParquetVectorUpdaterFactory: order the new ParquetTypeOps$ import alphabetically.
- ParquetTypeOps.getVectorUpdater: document the bidirectional contract with isBatchReadSupported
  (returning Some without flipping the gate leaves the vectorized path unreachable; flipping the
  gate without an updater here, and without legacy factory support, routes into a factory that
  does not recognize the type).
- ParquetVectorUpdaterSuite: add a factory-level reject test asserting
  ParquetVectorUpdaterFactory.getUpdater throws SchemaColumnConvertNotSupportedException for an
  incompatible TIME encoding (INT32 TIME(MILLIS) / raw INT64 / INT64 TIMESTAMP) requested as
  TimeType, guarding the factory fall-through directly so a future broad arm cannot silently
  reintroduce the mis-read this PR fixes.

Generated-by: Claude Code (Claude Opus 4.8)

Co-authored-by: Isaac

@MaxGekk

MaxGekk commented Jun 29, 2026

Member

@stevomitric, please resolve conflicts.

…-tf-vectorized-read

# Conflicts:
#	sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
#	sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
#	sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt

@uros-b uros-b left a comment

Member

Thank you @stevomitric and @MaxGekk @davidm-db @shrirangmhalgi! Please resolve conflicts and proceed with this PR.

…torUpdaterSuite

The two descriptor builders in the SPARK-55444 factory-reject test exceeded 100 chars; split each
.named("col") onto its own line so scalastyle passes.

Generated-by: Claude Code (Claude Opus 4.8)

Co-authored-by: Isaac

5 participants