Skip to content

Commit ae1b89e

Browse files
authored
Merge pull request JanKaul#318 from peter-edb/tz-aware-datetransform
fix(317): Support TZ aware Timestamps in DateTransform
2 parents d7691eb + 2021539 commit ae1b89e

File tree

1 file changed

+244
-2
lines changed

1 file changed

+244
-2
lines changed

datafusion_iceberg/src/pruning_statistics.rs

Lines changed: 244 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::error::Error as DatafusionIcebergError;
1818
use datafusion::{
1919
arrow::{
2020
array::ArrayRef,
21-
datatypes::{DataType, Schema as ArrowSchema},
21+
datatypes::{DataType, Schema as ArrowSchema, TimeUnit},
2222
},
2323
common::{
2424
tree_node::{Transformed, TreeNode},
@@ -362,7 +362,15 @@ impl DateTransform {
362362
TypeSignature::Exact(vec![DataType::Utf8, DataType::Date32]),
363363
TypeSignature::Exact(vec![
364364
DataType::Utf8,
365-
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Microsecond, None),
365+
DataType::Timestamp(TimeUnit::Microsecond, None),
366+
]),
367+
// Iceberg `timestamptz` is always UTC microseconds, mapped to
368+
// Timestamp(Microsecond, Some("UTC")) in iceberg-rust-spec/src/arrow/schema.rs.
369+
// Arrow allows arbitrary tz strings [1] but we only accept "UTC".
370+
// [1] https://github.com/apache/arrow/blob/apache-arrow-23.0.1/format/Schema.fbs#L385
371+
TypeSignature::Exact(vec![
372+
DataType::Utf8,
373+
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
366374
]),
367375
]),
368376
volatility: Volatility::Immutable,
@@ -467,3 +475,237 @@ fn value_to_scalarvalue(value: Value) -> Result<ScalarValue, Error> {
467475
))),
468476
}
469477
}
478+
479+
#[cfg(test)]
480+
mod date_transform_tests {
481+
use super::*;
482+
use datafusion::arrow::datatypes::Field;
483+
use datafusion::common::config::ConfigOptions;
484+
use std::sync::Arc;
485+
486+
/// Helper: invoke `DateTransform` directly with a transform name and scalar value.
487+
fn invoke_date_transform(
488+
transform_name: &str,
489+
scalar: ScalarValue,
490+
) -> datafusion::error::Result<ColumnarValue> {
491+
let dt = DateTransform::new();
492+
let value_type = scalar.data_type();
493+
dt.invoke_with_args(ScalarFunctionArgs {
494+
args: vec![
495+
ColumnarValue::Scalar(ScalarValue::new_utf8(transform_name)),
496+
ColumnarValue::Scalar(scalar),
497+
],
498+
arg_fields: vec![
499+
Arc::new(Field::new("transform", DataType::Utf8, false)),
500+
Arc::new(Field::new("value", value_type, true)),
501+
],
502+
number_rows: 1,
503+
return_field: Arc::new(Field::new("result", DataType::Int32, true)),
504+
config_options: Arc::new(ConfigOptions::default()),
505+
})
506+
}
507+
508+
/// Extract Int32 from a ColumnarValue, panicking with a clear message on mismatch.
509+
fn unwrap_int32(result: ColumnarValue) -> i32 {
510+
match result {
511+
ColumnarValue::Scalar(ScalarValue::Int32(Some(v))) => v,
512+
other => panic!("expected ScalarValue::Int32, got {other:?}"),
513+
}
514+
}
515+
516+
// 2024-03-15T10:30:00Z in microseconds since epoch
517+
const TS_MICROS: i64 = 1_710_498_600_000_000;
518+
519+
// -- invoke DateTransform with Date32 (19797 days since epoch = 2024-03-15) --
520+
521+
#[test]
522+
fn year_on_date32() {
523+
let result = invoke_date_transform("year", ScalarValue::Date32(Some(19797))).unwrap();
524+
// 2024 - 1970 = 54
525+
assert_eq!(unwrap_int32(result), 54);
526+
}
527+
528+
#[test]
529+
fn month_on_date32() {
530+
let result = invoke_date_transform("month", ScalarValue::Date32(Some(19797))).unwrap();
531+
// (2024 - 1970) * 12 + 3 = 651 (month is 1-based)
532+
assert_eq!(unwrap_int32(result), 651);
533+
}
534+
535+
#[test]
536+
fn day_on_date32() {
537+
let result = invoke_date_transform("day", ScalarValue::Date32(Some(19797))).unwrap();
538+
assert_eq!(unwrap_int32(result), 19797);
539+
}
540+
541+
#[test]
542+
fn hour_on_date32_is_rejected() {
543+
// Date32 has no time component — hour transform is not supported
544+
let result = invoke_date_transform("hour", ScalarValue::Date32(Some(19797)));
545+
assert!(
546+
result.is_err(),
547+
"hour transform should not be supported for Date32"
548+
);
549+
}
550+
551+
// -- invoke DateTransform directly with Timestamp (no TZ) --
552+
553+
#[test]
554+
fn year_on_timestamp() {
555+
let result = invoke_date_transform(
556+
"year",
557+
ScalarValue::TimestampMicrosecond(Some(TS_MICROS), None),
558+
)
559+
.unwrap();
560+
// 2024 - 1970 = 54
561+
assert_eq!(unwrap_int32(result), 54);
562+
}
563+
564+
#[test]
565+
fn month_on_timestamp() {
566+
let result = invoke_date_transform(
567+
"month",
568+
ScalarValue::TimestampMicrosecond(Some(TS_MICROS), None),
569+
)
570+
.unwrap();
571+
// (2024 - 1970) * 12 + 3 = 651 (month is 1-based)
572+
assert_eq!(unwrap_int32(result), 651);
573+
}
574+
575+
#[test]
576+
fn day_on_timestamp() {
577+
let result = invoke_date_transform(
578+
"day",
579+
ScalarValue::TimestampMicrosecond(Some(TS_MICROS), None),
580+
)
581+
.unwrap();
582+
// 2024-03-15 is day 19797 since epoch
583+
assert_eq!(unwrap_int32(result), 19797);
584+
}
585+
586+
#[test]
587+
fn hour_on_timestamp() {
588+
let result = invoke_date_transform(
589+
"hour",
590+
ScalarValue::TimestampMicrosecond(Some(TS_MICROS), None),
591+
)
592+
.unwrap();
593+
// 19797 * 24 + 10 = 475138
594+
assert_eq!(unwrap_int32(result), 475138);
595+
}
596+
597+
// -- invoke DateTransform with Timestamp(UTC) --
598+
599+
#[test]
600+
fn year_on_timestamp_with_utc() {
601+
let result = invoke_date_transform(
602+
"year",
603+
ScalarValue::TimestampMicrosecond(Some(TS_MICROS), Some("UTC".into())),
604+
)
605+
.unwrap();
606+
assert_eq!(unwrap_int32(result), 54);
607+
}
608+
609+
#[test]
610+
fn month_on_timestamp_with_utc() {
611+
let result = invoke_date_transform(
612+
"month",
613+
ScalarValue::TimestampMicrosecond(Some(TS_MICROS), Some("UTC".into())),
614+
)
615+
.unwrap();
616+
assert_eq!(unwrap_int32(result), 651);
617+
}
618+
619+
#[test]
620+
fn day_on_timestamp_with_utc() {
621+
let result = invoke_date_transform(
622+
"day",
623+
ScalarValue::TimestampMicrosecond(Some(TS_MICROS), Some("UTC".into())),
624+
)
625+
.unwrap();
626+
assert_eq!(unwrap_int32(result), 19797);
627+
}
628+
629+
#[test]
630+
fn hour_on_timestamp_with_utc() {
631+
let result = invoke_date_transform(
632+
"hour",
633+
ScalarValue::TimestampMicrosecond(Some(TS_MICROS), Some("UTC".into())),
634+
)
635+
.unwrap();
636+
assert_eq!(unwrap_int32(result), 475138);
637+
}
638+
639+
// -- edge cases --
640+
641+
#[test]
642+
fn epoch_zero_transforms() {
643+
// 1970-01-01T00:00:00Z
644+
let cases = vec![
645+
("year", 0), // 1970 - 1970 = 0
646+
("month", 1), // 0 * 12 + 1 = 1 (month is 1-based)
647+
("day", 0), // day 0 since epoch
648+
("hour", 0), // hour 0 since epoch
649+
];
650+
for (name, expected) in cases {
651+
let result =
652+
invoke_date_transform(name, ScalarValue::TimestampMicrosecond(Some(0), None))
653+
.unwrap();
654+
assert_eq!(
655+
unwrap_int32(result),
656+
expected,
657+
"epoch zero: {name} transform"
658+
);
659+
}
660+
}
661+
662+
#[test]
663+
fn invalid_transform_name_is_rejected() {
664+
let result = invoke_date_transform(
665+
"century",
666+
ScalarValue::TimestampMicrosecond(Some(TS_MICROS), None),
667+
);
668+
assert!(result.is_err(), "unknown transform name should be rejected");
669+
}
670+
671+
#[test]
672+
fn signature_rejects_non_utc_timezone() {
673+
// Iceberg only maps timestamptz to Timestamp(Microsecond, Some("UTC"))
674+
// per iceberg-rust-spec/src/arrow/schema.rs. The DateTransform signature
675+
// enforces this — non-UTC tz strings are not accepted.
676+
let udf = ScalarUDF::new_from_impl(DateTransform::new());
677+
let sig = &udf.signature().type_signature;
678+
let non_utc = vec![
679+
Some("+00:00".into()),
680+
Some("Etc/UTC".into()),
681+
Some("America/New_York".into()),
682+
];
683+
for tz in non_utc {
684+
let args = vec![
685+
DataType::Utf8,
686+
DataType::Timestamp(TimeUnit::Microsecond, tz.clone()),
687+
];
688+
let accepts = match sig {
689+
TypeSignature::OneOf(variants) => variants.iter().any(|v| match v {
690+
TypeSignature::Exact(expected) => expected == &args,
691+
_ => false,
692+
}),
693+
_ => false,
694+
};
695+
assert!(
696+
!accepts,
697+
"signature should reject Timestamp(Microsecond, {tz:?})"
698+
);
699+
}
700+
}
701+
702+
// -- transform_literal wiring --
703+
704+
#[test]
705+
fn transform_literal_identity_passes_through() {
706+
let input = Expr::Literal(ScalarValue::TimestampMicrosecond(Some(42), None), None);
707+
let result = transform_literal(input.clone(), &Transform::Identity)
708+
.expect("identity should pass through");
709+
assert_eq!(result, input);
710+
}
711+
}

0 commit comments

Comments
 (0)