Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
return new LongAsMicrosUpdater();
} else if (sparkType instanceof DayTimeIntervalType) {
return new LongUpdater();
} else if (sparkType instanceof TimestampNTZNanosType &&
isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.NANOS)) {
// TIMESTAMP(NANOS) postdates the proleptic Gregorian switch, so no datetime rebase is
// needed. The INT64 epoch-nanos value is decomposed into (epochMicros, nanosWithinMicro)
// and truncated to the requested precision, matching the row-based converter path.
return new TimestampNanosUpdater(((TimestampNTZNanosType) sparkType).precision());
} else if (sparkType instanceof TimestampLTZNanosType &&
isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.NANOS)) {
// Same as NTZ nanos: no rebase, no timezone conversion at storage level.
return new TimestampNanosUpdater(((TimestampLTZNanosType) sparkType).precision());
} else if (canReadAsDecimal(descriptor, sparkType)) {
return new LongToDecimalUpdater(descriptor, (DecimalType) sparkType);
} else if (sparkType instanceof TimeType &&
Expand Down Expand Up @@ -922,6 +932,78 @@ public void decodeSingleDictionaryId(
}
}

// Reads an INT64 TIMESTAMP(NANOS) column. The epoch-nanos value is decomposed into
// (epochMicros, nanosWithinMicro) and the sub-microsecond digits are truncated to the requested
// precision. This matches the row-based converter in TimestampNanosParquetOps.newConverter which
// calls DateTimeUtils.epochNanosToTimestampNanos(value, precision).
// No datetime rebase is applied (TIMESTAMP(NANOS) postdates the proleptic Gregorian switch).
// No timezone conversion is applied at the storage level.
private static class TimestampNanosUpdater implements ParquetVectorUpdater {
// Pre-computed truncation divisor for the nanosWithinMicro component.
// precision 7 -> divisor 100, precision 8 -> divisor 10, precision 9 -> divisor 1
private final int nanosTruncationDivisor;

TimestampNanosUpdater(int precision) {
switch (precision) {
case 7:
this.nanosTruncationDivisor = 100;
break;
case 8:
this.nanosTruncationDivisor = 10;
break;
case 9:
this.nanosTruncationDivisor = 1;
break;
default:
throw new IllegalArgumentException(
"Invalid nanosecond timestamp precision: " + precision);
}
}

private void putTimestampNanos(int offset, WritableColumnVector values, long epochNanos) {
long epochMicros = Math.floorDiv(epochNanos, 1000L);
int rawNanosWithinMicro = (int) Math.floorMod(epochNanos, 1000L);
short nanosWithinMicro =
(short) ((rawNanosWithinMicro / nanosTruncationDivisor) * nanosTruncationDivisor);
values.getChild(0).putLong(offset, epochMicros);
values.getChild(1).putShort(offset, nanosWithinMicro);
}

@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
for (int i = 0; i < total; i++) {
putTimestampNanos(offset + i, values, valuesReader.readLong());
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipLongs(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
putTimestampNanos(offset, values, valuesReader.readLong());
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
long epochNanos = dictionary.decodeToLong(dictionaryIds.getDictId(offset));
putTimestampNanos(offset, values, epochNanos);
}
}

// Reads an INT64 TIME column into the internal nanoseconds-since-midnight representation and
// truncates it to the requested TimeType precision. `fileStoresNanos` selects the on-disk unit:
// TIME(NANOS) stores nanos directly (identity), TIME(MICROS) stores micros (converted to nanos).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,11 @@ private boolean isLazyDecodingSupported(
// TIME columns (both MICROS and NANOS) need per-value processing in the updater: a unit
// conversion for MICROS and/or truncation to the requested precision. Lazy dictionary
// decoding would bypass the updater, so it must be disabled for them.
// TIMESTAMP(NANOS) columns also need per-value processing: the INT64 epoch-nanos is
// decomposed into a two-child vector (epochMicros, nanosWithinMicro).
boolean needsUpcast = (isDecimal && !DecimalType.is64BitDecimalType(sparkType)) ||
updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS) ||
updaterFactory.isTimestampTypeMatched(TimeUnit.NANOS) ||
updaterFactory.isTimeTypeMatched(TimeUnit.MICROS) ||
updaterFactory.isTimeTypeMatched(TimeUnit.NANOS);
boolean needsRebase = updaterFactory.isTimestampTypeMatched(TimeUnit.MICROS) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.parquet.{HasParentContainerUpdater, ParentContainerUpdater, ParquetPrimitiveConverter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, TimestampLTZNanosType, TimestampNTZNanosType}
import org.apache.spark.unsafe.types.TimestampNanosVal

Expand All @@ -53,9 +54,9 @@ import org.apache.spark.unsafe.types.TimestampNanosVal
*
* TIMESTAMP(NANOS) postdates Spark's switch to the proleptic Gregorian calendar, so the values are
* exempt from datetime rebasing (the rebase modes only cover DATE, TIMESTAMP_MILLIS and
* TIMESTAMP_MICROS). Vectorized read is not supported: the value is a 16-byte composite rather
* than a single long slot, so `isBatchReadSupported` stays false (the trait default) and reads go
* through the row-based converter.
* TIMESTAMP_MICROS). The vectorized reader decomposes the INT64 epoch-nanos into the two-child
* column vector (epochMicros: Long, nanosWithinMicro: Short) via TimestampNanosUpdater in
* ParquetVectorUpdaterFactory.
*
* @see ParquetTypeOps for the dispatch contract
* @since 4.3.0
Expand All @@ -77,6 +78,10 @@ private[parquet] trait TimestampNanosParquetOps extends ParquetTypeOps {
// The Parquet TIMESTAMP `isAdjustedToUTC` flag: LTZ is UTC-adjusted, NTZ is not.
private def isAdjustedToUTC: Boolean = !isNtz

// ==================== Vectorized Read Support ====================

override def isBatchReadSupported(sqlConf: SQLConf): Boolean = true

// ==================== Schema Conversion ====================

override def convertToParquetType(
Expand Down
Loading