Skip to content

Commit d635a1f

Browse files
committed
[SPARK-57570][SQL] Support TimeType in vectorized-reader column population
### What changes were proposed in this pull request? Make the vectorized reader handle `TimeType` when it populates partition, missing, and constant columns. The only production change is one branch in `ColumnVectorUtils.appendValue` (the `toBatch` path): a TIME value is now appended as its nanos-of-day `long` via `DateTimeUtils.localTimeToNanos`. Before, there was no `TimeType` case and it failed with "Datatype not supported". The `populate` path already worked, since `TimeType` is physically a `long`. This builds on SPARK-54203, which added the underlying `RowToColumnConverter` and column-vector allocation support. The unsupported-type branch still throws `_LEGACY_ERROR_TEMP_3192` for other types; renaming that legacy error is tracked separately in SPARK-57745. ### Why are the changes needed? This makes `TimeType` a first-class citizen in the vectorized (columnar) read path instead of a type that fails depending on where a column comes from. Concretely, it enables: - **TIME partition columns in vectorized reads.** A table partitioned by a TIME column previously errored when the partition value was materialized into the columnar batch. With this change such reads succeed with the vectorized reader engaged, so partition pruning and the columnar scan both work. - **Schema-evolution / "missing" TIME columns.** When a Parquet/ORC file predates a TIME column added to the table schema, the reader fills that column via the same population path; those reads now succeed instead of failing. - **Constant-folded TIME columns** injected into a scan populate correctly. - **`toBatch` round-trips with TIME**, e.g. row-to-columnar conversions that carry `java.time.LocalTime` values. Without this, queries touching TIME columns in these scenarios either fail with an unsupported-datatype error or fall back to the slower row-based reader. After the change, TIME behaves consistently with `DATE`, `TIMESTAMP`, and interval types in this layer, and downstream code built on `ColumnarBatch` can carry TIME columns through the population path without special-casing. It also clears a blocker in the SPARK-54203 umbrella and keeps the vectorized layer's hand-maintained type dispatch in step with the other datetime types. Note: physically reading TIME data columns stored inside Parquet/ORC files (as opposed to populated partition / missing / constant columns) is a separate concern and is out of scope here. ### Does this PR introduce _any_ user-facing change? No. TIME is an in-progress, not-yet-released data type; this only widens internal vectorized support so that previously-failing TIME column population now succeeds. ### How was this patch tested? New unit tests: - `ColumnVectorUtilsSuite` (the `populate` / constant-column path): TIME across precisions 0/6/7/9, boundary values (`00:00:00`, `23:59:59.999999999`), null (missing column), and TIME nested in struct / array / map. - `ColumnarBatchSuite` (the `toBatch` path): `TimeType` added to the random-schema test and `compareStruct` (top-level and array element), a per-precision `testVector` (0/6/7/9 + boundaries), a nested struct/array `toBatch` test, and a negative unsupported-type case. Ran `build/sbt 'sql/testOnly *ColumnVectorUtilsSuite *ColumnarBatchSuite'` (93 tests pass). Scalastyle and Java checkstyle pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor Closes #56858 from MaxGekk/time-vec-column-pop. Authored-by: Maxim Gekk <max.gekk@gmail.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
1 parent 2539e18 commit d635a1f

3 files changed

Lines changed: 184 additions & 4 deletions

File tree

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.sql.Date;
2323
import java.sql.Timestamp;
2424
import java.time.LocalDateTime;
25+
import java.time.LocalTime;
2526
import java.util.HashMap;
2627
import java.util.Iterator;
2728
import java.util.List;
@@ -236,6 +237,8 @@ private static void appendValue(WritableColumnVector dst, DataType t, Object o)
236237
dst.appendLong(DateTimeUtils.fromJavaTimestamp((Timestamp) o));
237238
} else if (t instanceof TimestampNTZType) {
238239
dst.appendLong(DateTimeUtils.localDateTimeToMicros((LocalDateTime) o));
240+
} else if (t instanceof TimeType) {
241+
dst.appendLong(DateTimeUtils.localTimeToNanos((LocalTime) o));
239242
} else {
240243
throw new SparkUnsupportedOperationException(
241244
"UNSUPPORTED_DATATYPE", Map.of("typeName", QueryExecutionErrors.toSQLType(t)));

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
package org.apache.spark.sql.execution.vectorized
1919

20+
import java.time.LocalTime
21+
2022
import org.apache.spark.SparkFunSuite
2123
import org.apache.spark.sql.catalyst.InternalRow
22-
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
24+
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
2325
import org.apache.spark.sql.types._
2426
import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal}
2527
import org.apache.spark.unsafe.types.UTF8String
@@ -257,4 +259,98 @@ class ColumnVectorUtilsSuite extends SparkFunSuite {
257259
ColumnVectorUtils.populate(vector, InternalRow(null), 0)
258260
assert(vector.hasNull)
259261
}
262+
263+
private def timeNanos(s: String): Long = DateTimeUtils.localTimeToNanos(LocalTime.parse(s))
264+
265+
// TimeType is physically a long (nanoseconds since midnight). Precision affects display only,
266+
// not storage, so every TimeType(p) is filled through the same PhysicalLongType code path.
267+
Seq(
268+
0 -> "12:30:45",
269+
6 -> "12:30:45.123456",
270+
7 -> "12:30:45.1234567",
271+
9 -> "12:30:45.123456789").foreach { case (p, s) =>
272+
testConstantColumnVector(s"fill time p=$p", 10, TimeType(p)) { vector =>
273+
val nanos = timeNanos(s)
274+
ColumnVectorUtils.populate(vector, InternalRow(nanos), 0)
275+
(0 until 10).foreach { i =>
276+
assert(vector.getLong(i) == nanos)
277+
}
278+
}
279+
}
280+
281+
testConstantColumnVector("fill time boundaries", 10, TimeType(9)) { vector =>
282+
Seq(0L, 86399999999999L).foreach { nanos =>
283+
ColumnVectorUtils.populate(vector, InternalRow(nanos), 0)
284+
(0 until 10).foreach { i =>
285+
assert(vector.getLong(i) == nanos)
286+
}
287+
}
288+
}
289+
290+
testConstantColumnVector("fill time null", 10, TimeType(6)) { vector =>
291+
ColumnVectorUtils.populate(vector, InternalRow(null), 0)
292+
assert(vector.hasNull)
293+
assert(vector.numNulls() == 10)
294+
(0 until 10).foreach { i =>
295+
assert(vector.isNullAt(i))
296+
}
297+
}
298+
299+
testConstantColumnVector("fill struct with time field", 10,
300+
new StructType().add("t", TimeType(6)).add("flag", BooleanType)) { vector =>
301+
val nanos = timeNanos("01:02:03.456789")
302+
ColumnVectorUtils.populate(vector, InternalRow(InternalRow(nanos, true)), 0)
303+
(0 until 10).foreach { i =>
304+
assert(vector.getChild(0).getLong(i) == nanos)
305+
assert(vector.getChild(1).getBoolean(i))
306+
}
307+
}
308+
309+
testConstantColumnVector("fill struct with null time field", 10,
310+
new StructType().add("t", TimeType(6), nullable = true).add("flag", BooleanType)) { vector =>
311+
ColumnVectorUtils.populate(vector, InternalRow(InternalRow(null, true)), 0)
312+
(0 until 10).foreach { i =>
313+
assert(vector.getChild(0).isNullAt(i))
314+
assert(vector.getChild(1).getBoolean(i))
315+
}
316+
}
317+
318+
testConstantColumnVector("fill array of time", 10, ArrayType(TimeType(9))) { vector =>
319+
val n0 = timeNanos("00:00:01")
320+
val n1 = timeNanos("12:00:00.123456789")
321+
val n2 = 86399999999999L
322+
val arr = new GenericArrayData(Array[Any](n0, n1, n2))
323+
ColumnVectorUtils.populate(vector, InternalRow(arr), 0)
324+
(0 until 10).foreach { i =>
325+
val a = vector.getArray(i)
326+
assert(a.numElements() == 3)
327+
assert(a.getLong(0) == n0)
328+
assert(a.getLong(1) == n1)
329+
assert(a.getLong(2) == n2)
330+
}
331+
}
332+
333+
testConstantColumnVector("fill null array of time", 10, ArrayType(TimeType(6))) { vector =>
334+
ColumnVectorUtils.populate(vector, InternalRow(null), 0)
335+
assert(vector.hasNull)
336+
}
337+
338+
testConstantColumnVector("fill map of int -> time", 10,
339+
MapType(IntegerType, TimeType(6))) { vector =>
340+
val keys = new GenericArrayData(Array[Any](1, 2, 3))
341+
val v0 = timeNanos("00:00:00")
342+
val v1 = timeNanos("06:30:15.123456")
343+
val v2 = 86399999999999L
344+
val values = new GenericArrayData(Array[Any](v0, v1, v2))
345+
val map = new ArrayBasedMapData(keys, values)
346+
ColumnVectorUtils.populate(vector, InternalRow(map), 0)
347+
(0 until 10).foreach { i =>
348+
val m = vector.getMap(i)
349+
assert(m.numElements() == 3)
350+
assert(m.keyArray().toIntArray === Array(1, 2, 3))
351+
assert(m.valueArray().getLong(0) == v0)
352+
assert(m.valueArray().getLong(1) == v1)
353+
assert(m.valueArray().getLong(2) == v2)
354+
}
355+
}
260356
}

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
2121
import java.nio.ByteOrder
2222
import java.nio.charset.StandardCharsets
2323
import java.sql.{Date, Timestamp}
24-
import java.time.LocalDateTime
24+
import java.time.{LocalDateTime, LocalTime}
2525
import java.util
2626

2727
import scala.collection.mutable
@@ -32,7 +32,7 @@ import scala.util.Random
3232
import org.apache.arrow.vector.IntVector
3333
import org.apache.parquet.bytes.ByteBufferInputStream
3434

35-
import org.apache.spark.SparkFunSuite
35+
import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
3636
import org.apache.spark.memory.MemoryMode
3737
import org.apache.spark.sql.{RandomDataGenerator, Row}
3838
import org.apache.spark.sql.catalyst.InternalRow
@@ -1432,6 +1432,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
14321432
assert(r1.getLong(ordinal) ==
14331433
DateTimeUtils.localDateTimeToMicros(r2.getAs[LocalDateTime](ordinal)),
14341434
"Seed = " + seed)
1435+
case _: TimeType =>
1436+
assert(r1.getLong(ordinal) ==
1437+
DateTimeUtils.localTimeToNanos(r2.getAs[LocalTime](ordinal)),
1438+
"Seed = " + seed)
14351439
case t: DecimalType =>
14361440
val d1 = r1.getDecimal(ordinal, t.precision, t.scale).toBigDecimal
14371441
val d2 = r2.getDecimal(ordinal)
@@ -1506,6 +1510,17 @@ class ColumnarBatchSuite extends SparkFunSuite {
15061510
}
15071511
i += 1
15081512
}
1513+
case _: TimeType =>
1514+
var i = 0
1515+
while (i < a1.length) {
1516+
assert((a1(i) == null) == (a2(i) == null), "Seed = " + seed)
1517+
if (a1(i) != null) {
1518+
val i1 = a1(i).asInstanceOf[Long]
1519+
val i2 = DateTimeUtils.localTimeToNanos(a2(i).asInstanceOf[LocalTime])
1520+
assert(i1 === i2, "Seed = " + seed)
1521+
}
1522+
i += 1
1523+
}
15091524
case t: DecimalType =>
15101525
var i = 0
15111526
while (i < a1.length) {
@@ -1562,7 +1577,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
15621577
DecimalType.ShortDecimal, DecimalType.IntDecimal, DecimalType.ByteDecimal,
15631578
DecimalType.FloatDecimal, DecimalType.LongDecimal, new DecimalType(5, 2),
15641579
new DecimalType(12, 2), new DecimalType(30, 10), CalendarIntervalType,
1565-
DateType, StringType, BinaryType, TimestampType, TimestampNTZType)
1580+
DateType, StringType, BinaryType, TimestampType, TimestampNTZType,
1581+
TimeType(0), TimeType(3), TimeType(), TimeType(TimeType.MAX_PRECISION))
15661582
val seed = System.nanoTime()
15671583
val NUM_ROWS = 200
15681584
val NUM_ITERS = 1000
@@ -2126,6 +2142,71 @@ class ColumnarBatchSuite extends SparkFunSuite {
21262142
}
21272143
}
21282144

2145+
// TimeType is physically a long (nanoseconds since midnight); precision affects display only.
2146+
// The generic `get(int, DataType)` accessor is intentionally not extended for TimeType in this
2147+
// change (tracked separately), so values are read back via the typed `getLong` accessor.
2148+
Seq(0, 6, 7, 9).foreach { p =>
2149+
val dt = TimeType(p)
2150+
testVector(s"TIME(precision=$p)", 10, dt) {
2151+
column =>
2152+
val values = Array(0L, 86399999999999L) ++ (2 until 10).map(_.toLong * 1000000000L)
2153+
(0 until 10).foreach { i =>
2154+
column.putLong(i, values(i))
2155+
}
2156+
val batchRow = new ColumnarBatchRow(Array(column))
2157+
(0 until 10).foreach { i =>
2158+
batchRow.rowId = i
2159+
assert(batchRow.getLong(0) == values(i))
2160+
val batchRowCopy = batchRow.copy()
2161+
assert(batchRowCopy.getLong(0) == values(i))
2162+
}
2163+
}
2164+
}
2165+
2166+
test("SPARK-57570: toBatch with TIME nested in struct and array") {
2167+
val schema = new StructType()
2168+
.add("s", new StructType().add("t", TimeType(6)).add("flag", BooleanType))
2169+
.add("a", ArrayType(TimeType(9)))
2170+
val t1 = LocalTime.parse("01:02:03.123456")
2171+
val t2 = LocalTime.parse("23:59:59.999999999")
2172+
val t3 = LocalTime.parse("00:00:00")
2173+
val n1 = DateTimeUtils.localTimeToNanos(t1)
2174+
val n2 = DateTimeUtils.localTimeToNanos(t2)
2175+
val n3 = DateTimeUtils.localTimeToNanos(t3)
2176+
val rows = Seq(
2177+
Row(Row(t1, true), Seq(t1, t2)),
2178+
Row(Row(t3, false), Seq(t3)))
2179+
Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { memMode =>
2180+
val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
2181+
try {
2182+
assert(batch.numRows() == 2)
2183+
val structCol = batch.column(0)
2184+
assert(structCol.getChild(0).getLong(0) == n1)
2185+
assert(structCol.getChild(1).getBoolean(0))
2186+
assert(structCol.getChild(0).getLong(1) == n3)
2187+
assert(!structCol.getChild(1).getBoolean(1))
2188+
val arrCol = batch.column(1)
2189+
val a0 = arrCol.getArray(0)
2190+
assert(a0.numElements() == 2)
2191+
assert(a0.getLong(0) == n1)
2192+
assert(a0.getLong(1) == n2)
2193+
val a1 = arrCol.getArray(1)
2194+
assert(a1.numElements() == 1)
2195+
assert(a1.getLong(0) == n3)
2196+
} finally {
2197+
batch.close()
2198+
}
2199+
}
2200+
}
2201+
2202+
test("SPARK-57570: toBatch throws on unsupported data type") {
2203+
val schema = new StructType().add("m", MapType(IntegerType, IntegerType))
2204+
intercept[SparkUnsupportedOperationException] {
2205+
ColumnVectorUtils.toBatch(
2206+
schema, MemoryMode.ON_HEAP, Seq(Row(Map(1 -> 2))).iterator.asJava)
2207+
}
2208+
}
2209+
21292210
testVector("[SPARK-55552] Variant", 3, VariantType) {
21302211
column =>
21312212
val valueChild = column.getChild(0)

0 commit comments

Comments
 (0)