Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/source/user-guide/latest/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ the [Comet Supported Expressions Guide](expressions.md) for more information on
- **StructsToJson (to_json)**: Does not support `+Infinity` and `-Infinity` for numeric types (float, double).
[#3016](https://github.com/apache/datafusion-comet/issues/3016)

## Date and Time Functions

Comet's native implementation of date and time functions may produce different results than Spark for dates
far in the future (approximately beyond year 2100). This is because Comet uses the chrono-tz library for
timezone calculations, which has limited support for Daylight Saving Time (DST) rules beyond the IANA
time zone database's explicit transitions.

For dates within a reasonable range (approximately 1970-2100), Comet's date and time functions are compatible
with Spark. For dates beyond this range, functions that involve timezone-aware calculations (such as
`date_trunc` with timezone-aware timestamps) may produce results with incorrect DST offsets.

If you need to process dates far in the future with accurate timezone handling, consider:

- Using timezone-naive types (`timestamp_ntz`) when timezone conversion is not required
- Falling back to Spark for these specific operations

## Regular Expressions

Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
Expand Down
33 changes: 23 additions & 10 deletions native/spark-expr/src/datetime_funcs/timestamp_trunc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,33 @@ impl PhysicalExpr for TimestampTruncExpr {
let tz = self.timezone.clone();
match (timestamp, format) {
(ColumnarValue::Array(ts), ColumnarValue::Scalar(Utf8(Some(format)))) => {
let ts = array_with_timezone(
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
)?;
// For TimestampNTZ (Timestamp(Microsecond, None)), skip timezone conversion.
// NTZ values are timezone-independent and truncation should operate directly
// on the naive microsecond values without any timezone resolution.
let is_ntz = matches!(ts.data_type(), DataType::Timestamp(Microsecond, None));
let ts = if is_ntz {
ts
} else {
array_with_timezone(
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
)?
};
let result = timestamp_trunc_dyn(&ts, format)?;
Ok(ColumnarValue::Array(result))
}
(ColumnarValue::Array(ts), ColumnarValue::Array(formats)) => {
let ts = array_with_timezone(
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
)?;
let is_ntz = matches!(ts.data_type(), DataType::Timestamp(Microsecond, None));
let ts = if is_ntz {
ts
} else {
array_with_timezone(
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
)?
};
let result = timestamp_trunc_array_fmt_dyn(&ts, &formats)?;
Ok(ColumnarValue::Array(result))
}
Expand Down
21 changes: 21 additions & 0 deletions native/spark-expr/src/datetime_funcs/unix_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,27 @@ impl ScalarUDFImpl for SparkUnixTimestamp {

match args {
[ColumnarValue::Array(array)] => match array.data_type() {
DataType::Timestamp(Microsecond, None) => {
// TimestampNTZ: No timezone conversion needed - simply divide microseconds
// by MICROS_PER_SECOND. TimestampNTZ stores local time without timezone.
let timestamp_array =
array.as_primitive::<arrow::datatypes::TimestampMicrosecondType>();

let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 {
timestamp_array
.values()
.iter()
.map(|&micros| micros / MICROS_PER_SECOND)
.collect()
} else {
timestamp_array
.iter()
.map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND)))
.collect()
};

Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Timestamp(_, _) => {
let is_utc = self.timezone == "UTC";
let array = if is_utc
Expand Down
166 changes: 163 additions & 3 deletions native/spark-expr/src/kernels/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

//! temporal kernels

use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc};
use chrono::{
DateTime, Datelike, Duration, LocalResult, NaiveDate, NaiveDateTime, Offset, TimeZone,
Timelike, Utc,
};

use std::sync::Arc;

Expand Down Expand Up @@ -153,10 +156,30 @@ where
Ok(())
}

// Apply the Tz to the Naive Date Time,,convert to UTC, and return as microseconds in Unix epoch
// Apply the Tz to the Naive Date Time, convert to UTC, and return as microseconds in Unix epoch.
// This function re-interprets the local datetime in the timezone to ensure the correct DST offset
// is used for the target date (not the original date's offset). This is important when truncation
// changes the date to a different DST period (e.g., from December/PST to October/PDT).
//
// Note: For far-future dates (approximately beyond year 2100), chrono-tz may not accurately
// calculate DST transitions, which can result in incorrect offsets. See the compatibility
// guide for more information.
#[inline]
fn as_micros_from_unix_epoch_utc(dt: Option<DateTime<Tz>>) -> i64 {
dt.unwrap().with_timezone(&Utc).timestamp_micros()
let dt = dt.unwrap();
let naive = dt.naive_local();
let tz = dt.timezone();

// Re-interpret the local time in the timezone to get the correct DST offset
// for the truncated date. Use noon to avoid DST gaps that occur around midnight.
let noon = naive.date().and_hms_opt(12, 0, 0).unwrap_or(naive);

let offset = match tz.offset_from_local_datetime(&noon) {
LocalResult::Single(off) | LocalResult::Ambiguous(off, _) => off.fix(),
LocalResult::None => return dt.with_timezone(&Utc).timestamp_micros(),
};

(naive - offset).and_utc().timestamp_micros()
}

#[inline]
Expand Down Expand Up @@ -529,6 +552,85 @@ pub(crate) fn timestamp_trunc_dyn(
}
}

/// Convert microseconds since epoch to NaiveDateTime
#[inline]
fn micros_to_naive(micros: i64) -> Option<NaiveDateTime> {
DateTime::from_timestamp_micros(micros).map(|dt| dt.naive_utc())
}

/// Convert NaiveDateTime back to microseconds since epoch
#[inline]
fn naive_to_micros(dt: NaiveDateTime) -> i64 {
dt.and_utc().timestamp_micros()
}

/// Truncate a TimestampNTZ array without any timezone conversion.
/// NTZ values are timezone-independent; we treat the raw microseconds as a naive datetime.
fn timestamp_trunc_ntz<T>(
array: &PrimitiveArray<T>,
format: String,
) -> Result<TimestampMicrosecondArray, SparkError>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
let trunc_fn: fn(NaiveDateTime) -> Option<NaiveDateTime> = match format.to_uppercase().as_str()
{
"YEAR" | "YYYY" | "YY" => trunc_date_to_year,
"QUARTER" => trunc_date_to_quarter,
"MONTH" | "MON" | "MM" => trunc_date_to_month,
"WEEK" => trunc_date_to_week,
"DAY" | "DD" => trunc_date_to_day,
"HOUR" => trunc_date_to_hour,
"MINUTE" => trunc_date_to_minute,
"SECOND" => trunc_date_to_second,
"MILLISECOND" => trunc_date_to_ms,
"MICROSECOND" => trunc_date_to_microsec,
_ => {
return Err(SparkError::Internal(format!(
"Unsupported format: {format:?} for function 'timestamp_trunc'"
)))
}
};

let result: TimestampMicrosecondArray = array
.iter()
.map(|opt_val| {
opt_val.and_then(|v| {
let micros: i64 = v.into();
micros_to_naive(micros)
.and_then(trunc_fn)
.map(naive_to_micros)
})
})
.collect();

Ok(result)
}

/// Truncate a single NTZ value and append to builder
fn timestamp_trunc_ntz_single<F>(
value: Option<i64>,
builder: &mut PrimitiveBuilder<TimestampMicrosecondType>,
op: F,
) -> Result<(), SparkError>
where
F: Fn(NaiveDateTime) -> Option<NaiveDateTime>,
{
match value {
Some(micros) => match micros_to_naive(micros).and_then(op) {
Some(truncated) => builder.append_value(naive_to_micros(truncated)),
None => {
return Err(SparkError::Internal(
"Unable to truncate NTZ timestamp".to_string(),
))
}
},
None => builder.append_null(),
}
Ok(())
}

pub(crate) fn timestamp_trunc<T>(
array: &PrimitiveArray<T>,
format: String,
Expand All @@ -540,6 +642,10 @@ where
let builder = TimestampMicrosecondBuilder::with_capacity(array.len());
let iter = ArrayIter::new(array);
match array.data_type() {
DataType::Timestamp(TimeUnit::Microsecond, None) => {
// TimestampNTZ: operate directly on naive microsecond values without timezone
timestamp_trunc_ntz(array, format)
}
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
match format.to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => {
Expand Down Expand Up @@ -687,6 +793,60 @@ macro_rules! timestamp_trunc_array_fmt_helper {
"lengths of values array and format array must be the same"
);
match $datatype {
DataType::Timestamp(TimeUnit::Microsecond, None) => {
// TimestampNTZ: operate directly on naive microsecond values
for (index, val) in iter.enumerate() {
let micros_val = val.map(|v| i64::from(v));
let op_result = match $formats.value(index).to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => {
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_date_to_year)
}
"QUARTER" => timestamp_trunc_ntz_single(
micros_val,
&mut builder,
trunc_date_to_quarter,
),
"MONTH" | "MON" | "MM" => timestamp_trunc_ntz_single(
micros_val,
&mut builder,
trunc_date_to_month,
),
"WEEK" => {
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_date_to_week)
}
"DAY" | "DD" => {
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_date_to_day)
}
"HOUR" => {
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_date_to_hour)
}
"MINUTE" => timestamp_trunc_ntz_single(
micros_val,
&mut builder,
trunc_date_to_minute,
),
"SECOND" => timestamp_trunc_ntz_single(
micros_val,
&mut builder,
trunc_date_to_second,
),
"MILLISECOND" => {
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_date_to_ms)
}
"MICROSECOND" => timestamp_trunc_ntz_single(
micros_val,
&mut builder,
trunc_date_to_microsec,
),
_ => Err(SparkError::Internal(format!(
"Unsupported format: {:?} for function 'timestamp_trunc'",
$formats.value(index)
))),
};
op_result?
}
Ok(builder.finish())
}
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
let tz: Tz = tz.parse()?;
for (index, val) in iter.enumerate() {
Expand Down
4 changes: 1 addition & 3 deletions spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,9 @@ object CometSecond extends CometExpressionSerde[Second] {
object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] {

private def isSupportedInputType(expr: UnixTimestamp): Boolean = {
// Note: TimestampNTZType is not supported because Comet incorrectly applies
// timezone conversion to TimestampNTZ values. TimestampNTZ stores local time
// without timezone, so no conversion should be applied.
expr.children.head.dataType match {
case TimestampType | DateType => true
case dt if dt.typeName == "timestamp_ntz" => true
case _ => false
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Exercise Comet's native date_trunc on a timezone that observes DST.
-- CometTruncTimestamp marks non-UTC as Incompatible, so enable allowIncompatible
-- to force the native path and verify correctness against Spark.

-- Config: spark.comet.expression.TruncTimestamp.allowIncompatible=true
-- Config: spark.sql.session.timeZone=America/Los_Angeles

statement
CREATE TABLE test_trunc_dst(ts timestamp) USING parquet

statement
INSERT INTO test_trunc_dst VALUES
(timestamp('2024-11-03 06:30:00')),
(timestamp('2024-11-03 14:30:00')),
(timestamp('2024-03-10 05:30:00')),
(timestamp('2024-03-10 12:30:00')),
(timestamp('2024-11-15 12:00:00')),
(timestamp('2024-12-15 23:30:00')),
(timestamp('2024-07-15 10:00:00')),
(NULL)

-- DAY truncation on a time after fall-back (PST) produces midnight of the same
-- day, which is BEFORE the fall-back transition and should be in PDT.
query
SELECT ts, date_trunc('DAY', ts) FROM test_trunc_dst ORDER BY ts

-- HOUR truncation crossing DST boundaries.
query
SELECT ts, date_trunc('HOUR', ts) FROM test_trunc_dst ORDER BY ts

-- WEEK truncation can span DST transitions (the week of Nov 3 2024 starts Oct 28 PDT).
query
SELECT ts, date_trunc('WEEK', ts) FROM test_trunc_dst ORDER BY ts

-- MONTH truncation: Nov 15 PST truncated to MONTH gives Nov 1, which is PDT.
query
SELECT ts, date_trunc('MONTH', ts) FROM test_trunc_dst ORDER BY ts

-- QUARTER truncation: the motivating case from the PR (Dec PST -> Oct PDT).
query
SELECT ts, date_trunc('QUARTER', ts) FROM test_trunc_dst ORDER BY ts

-- YEAR truncation: Dec 15 PST truncated to YEAR gives Jan 1, which is PST too, so no DST crossing.
query
SELECT ts, date_trunc('YEAR', ts) FROM test_trunc_dst ORDER BY ts
8 changes: 7 additions & 1 deletion spark/src/test/scala/org/apache/comet/CometCastSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1594,7 +1594,13 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// CAST from TimestampNTZType

test("cast TimestampNTZType to StringType") {
castTest(generateTimestampNTZ(), DataTypes.StringType)
// TimestampNTZ is timezone-independent, so casting to string should produce
// the same result regardless of the session timezone.
for (tz <- representativeTimezones) {
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) {
castTest(generateTimestampNTZ(), DataTypes.StringType)
}
}
}

test("cast TimestampNTZType to DateType") {
Expand Down
Loading