From 9e71267ae78d0c6cbde7ab98ebbcb17d9fa09cdd Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 25 Feb 2026 08:39:26 -0800 Subject: [PATCH 1/7] df_int_timestamp_cast --- .../spark/src/function/conversion/cast.rs | 358 ++++++++++++++++++ .../spark/src/function/conversion/mod.rs | 5 +- .../conversion/cast_int_to_timestamp.slt | 86 +++++ 3 files changed, 448 insertions(+), 1 deletion(-) create mode 100644 datafusion/spark/src/function/conversion/cast.rs create mode 100644 datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt diff --git a/datafusion/spark/src/function/conversion/cast.rs b/datafusion/spark/src/function/conversion/cast.rs new file mode 100644 index 0000000000000..cf71dce26d1ca --- /dev/null +++ b/datafusion/spark/src/function/conversion/cast.rs @@ -0,0 +1,358 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondBuilder}; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Int8Type, Int16Type, Int32Type, Int64Type, TimeUnit, +}; +use datafusion_common::{Result as DataFusionResult, ScalarValue, exec_err}; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; +const MICROS_PER_SECOND: i64 = 1_000_000; + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Cast { + signature: Signature, +} +impl Default for Cast { + fn default() -> Self { + Self::new() + } +} + +impl Cast { + pub fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +fn cast_int_to_timestamp( + array: &ArrayRef, +) -> DataFusionResult +where + T::Native: Into, +{ + let arr = array.as_primitive::(); + let mut builder = TimestampMicrosecondBuilder::with_capacity(arr.len()); + + for i in 0..arr.len() { + if arr.is_null(i) { + builder.append_null(); + } else { + let micros = (arr.value(i).into()).saturating_mul(MICROS_PER_SECOND); + builder.append_value(micros); + } + } + + Ok(Arc::new(builder.finish())) +} + +impl ScalarUDFImpl for Cast { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "spark_cast" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult { + // for now we will be supporting int -> timestamp and keep adding more spark-compatible spark + match &arg_types[0] { + DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => { + Ok(DataType::Timestamp(TimeUnit::Microsecond, None)) + } + _ => exec_err!("Unsupported cast from {:?}", arg_types[0]), + } + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> DataFusionResult { + let input = &args.args[0]; + match input { + ColumnarValue::Array(array) => 
match array.data_type() { + DataType::Int8 => { + let result = cast_int_to_timestamp::(array)?; + Ok(ColumnarValue::Array(result)) + } + DataType::Int16 => { + let result = cast_int_to_timestamp::(array)?; + Ok(ColumnarValue::Array(result)) + } + DataType::Int32 => { + let result = cast_int_to_timestamp::(array)?; + Ok(ColumnarValue::Array(result)) + } + DataType::Int64 => { + let result = cast_int_to_timestamp::(array)?; + Ok(ColumnarValue::Array(result)) + } + _ => exec_err!( + "Unsupported cast from {:?} to timestamp", + array.data_type() + ), + }, + ColumnarValue::Scalar(scalar) => { + // Handle scalar conversions + match scalar { + ScalarValue::Int8(None) + | ScalarValue::Int16(None) + | ScalarValue::Int32(None) + | ScalarValue::Int64(None) => Ok(ColumnarValue::Scalar( + ScalarValue::TimestampMicrosecond(None, None), + )), + ScalarValue::Int8(Some(v)) => { + let micros = (*v as i64).saturating_mul(MICROS_PER_SECOND); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + Some(micros), + None, + ))) + } + ScalarValue::Int16(Some(v)) => { + let micros = (*v as i64).saturating_mul(MICROS_PER_SECOND); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + Some(micros), + None, + ))) + } + ScalarValue::Int32(Some(v)) => { + let micros = (*v as i64).saturating_mul(MICROS_PER_SECOND); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + Some(micros), + None, + ))) + } + ScalarValue::Int64(Some(v)) => { + let micros = (*v).saturating_mul(MICROS_PER_SECOND); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + Some(micros), + None, + ))) + } + _ => exec_err!("Unsupported cast from {:?} to timestamp", scalar), + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Int8Array, Int16Array, Int32Array, Int64Array}; + use arrow::datatypes::{Field, TimestampMicrosecondType}; + use datafusion_expr::ScalarFunctionArgs; + + fn make_args(input: ColumnarValue) -> ScalarFunctionArgs { + let return_field = Arc::new(Field::new( + "result", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + )); + ScalarFunctionArgs { + args: vec![input], + arg_fields: vec![], + number_rows: 0, + return_field, + config_options: Arc::new(Default::default()), + } + } + + fn assert_scalar_timestamp(result: ColumnarValue, expected: i64) { + match result { + ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(val), None)) => { + assert_eq!(val, expected); + } + _ => panic!("Expected scalar timestamp with value {expected}"), + } + } + + fn assert_scalar_null(result: ColumnarValue) { + assert!(matches!( + result, + ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, None)) + )); + } + + #[test] + fn test_cast_int8_array_to_timestamp() { + let array: ArrayRef = Arc::new(Int8Array::from(vec![ + Some(0), + Some(1), + Some(-1), + Some(127), + Some(-128), + None, + ])); + + let cast = Cast::new(); + let args = make_args(ColumnarValue::Array(array)); + let result = cast.invoke_with_args(args).unwrap(); + + match result { + ColumnarValue::Array(result_array) => { + let ts_array = result_array.as_primitive::(); + assert_eq!(ts_array.value(0), 0); + assert_eq!(ts_array.value(1), 1_000_000); + assert_eq!(ts_array.value(2), -1_000_000); + assert_eq!(ts_array.value(3), 127_000_000); + assert_eq!(ts_array.value(4), -128_000_000); + assert!(ts_array.is_null(5)); + } + _ => panic!("Expected array result"), + } + } + + #[test] + fn test_cast_int16_array_to_timestamp() { + let array: ArrayRef = Arc::new(Int16Array::from(vec![ + Some(0), + Some(32767), 
+ Some(-32768), + None, + ])); + + let cast = Cast::new(); + let args = make_args(ColumnarValue::Array(array)); + let result = cast.invoke_with_args(args).unwrap(); + + match result { + ColumnarValue::Array(result_array) => { + let ts_array = result_array.as_primitive::(); + assert_eq!(ts_array.value(0), 0); + assert_eq!(ts_array.value(1), 32_767_000_000); + assert_eq!(ts_array.value(2), -32_768_000_000); + assert!(ts_array.is_null(3)); + } + _ => panic!("Expected array result"), + } + } + + #[test] + fn test_cast_int32_array_to_timestamp() { + let array: ArrayRef = + Arc::new(Int32Array::from(vec![Some(0), Some(1704067200), None])); + + let cast = Cast::new(); + let args = make_args(ColumnarValue::Array(array)); + let result = cast.invoke_with_args(args).unwrap(); + + match result { + ColumnarValue::Array(result_array) => { + let ts_array = result_array.as_primitive::(); + assert_eq!(ts_array.value(0), 0); + assert_eq!(ts_array.value(1), 1_704_067_200_000_000); + assert!(ts_array.is_null(2)); + } + _ => panic!("Expected array result"), + } + } + + #[test] + fn test_cast_int64_array_overflow() { + let array: ArrayRef = + Arc::new(Int64Array::from(vec![Some(i64::MAX), Some(i64::MIN)])); + + let cast = Cast::new(); + let args = make_args(ColumnarValue::Array(array)); + let result = cast.invoke_with_args(args).unwrap(); + + match result { + ColumnarValue::Array(result_array) => { + let ts_array = result_array.as_primitive::(); + assert_eq!(ts_array.value(0), i64::MAX); + assert_eq!(ts_array.value(1), i64::MIN); + } + _ => panic!("Expected array result"), + } + } + + #[test] + fn test_cast_scalar_int8() { + let cast = Cast::new(); + let args = make_args(ColumnarValue::Scalar(ScalarValue::Int8(Some(100)))); + let result = cast.invoke_with_args(args).unwrap(); + assert_scalar_timestamp(result, 100_000_000); + } + + #[test] + fn test_cast_scalar_int32() { + let cast = Cast::new(); + let args = make_args(ColumnarValue::Scalar(ScalarValue::Int32(Some(1704067200)))); + let result = cast.invoke_with_args(args).unwrap(); + assert_scalar_timestamp(result, 1_704_067_200_000_000); + } + + #[test] + fn test_cast_scalar_null() { + let cast = Cast::new(); + let args = make_args(ColumnarValue::Scalar(ScalarValue::Int64(None))); + let result = cast.invoke_with_args(args).unwrap(); + assert_scalar_null(result); + } + + #[test] + fn test_cast_scalar_int64_overflow() { + let cast = Cast::new(); + let args = make_args(ColumnarValue::Scalar(ScalarValue::Int64(Some(i64::MAX)))); + let result = cast.invoke_with_args(args).unwrap(); + assert_scalar_timestamp(result, i64::MAX); + } + + #[test] + fn test_unsupported_scalar_type() { + let cast = Cast::new(); + let args = make_args(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "2024-01-01".to_string(), + )))); + let result = cast.invoke_with_args(args); + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("Unsupported cast from") + ); + } + + #[test] + fn test_unsupported_array_type() { + let cast = Cast::new(); + let array: ArrayRef = + Arc::new(arrow::array::Float32Array::from(vec![1.0, 2.0, 3.0])); + let args = make_args(ColumnarValue::Array(array)); + let result = cast.invoke_with_args(args); + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("Unsupported cast from") + ); + } +} diff --git a/datafusion/spark/src/function/conversion/mod.rs b/datafusion/spark/src/function/conversion/mod.rs index a87df9a2c87a0..f1877a03dcc95 100644 --- a/datafusion/spark/src/function/conversion/mod.rs +++ 
b/datafusion/spark/src/function/conversion/mod.rs @@ -15,11 +15,14 @@ // specific language governing permissions and limitations // under the License. +mod cast; + +use cast::Cast; use datafusion_expr::ScalarUDF; use std::sync::Arc; pub mod expr_fn {} pub fn functions() -> Vec> { - vec![] + vec![Arc::new(ScalarUDF::from(Cast::new()))] } diff --git a/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt new file mode 100644 index 0000000000000..7952b477cf1ee --- /dev/null +++ b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test spark_cast from int8 to timestamp +query P +SELECT spark_cast(arrow_cast(0, 'Int8')); +---- +1970-01-01T00:00:00 + +query P +SELECT spark_cast(arrow_cast(1, 'Int8')); +---- +1970-01-01T00:00:01 + +query P +SELECT spark_cast(arrow_cast(-1, 'Int8')); +---- +1969-12-31T23:59:59 + +# Test spark_cast from int16 to timestamp +query P +SELECT spark_cast(arrow_cast(0, 'Int16')); +---- +1970-01-01T00:00:00 + +query P +SELECT spark_cast(arrow_cast(3600, 'Int16')); +---- +1970-01-01T01:00:00 + +# Test spark_cast from int32 to timestamp +query P +SELECT spark_cast(arrow_cast(0, 'Int32')); +---- +1970-01-01T00:00:00 + +query P +SELECT spark_cast(arrow_cast(1704067200, 'Int32')); +---- +2024-01-01T00:00:00 + +# Test spark_cast from int64 to timestamp +query P +SELECT spark_cast(0::bigint); +---- +1970-01-01T00:00:00 + +query P +SELECT spark_cast(1704067200::bigint); +---- +2024-01-01T00:00:00 + +# Test NULL handling +query P +SELECT spark_cast(arrow_cast(NULL, 'Int8')); +---- +NULL + +query P +SELECT spark_cast(arrow_cast(NULL, 'Int16')); +---- +NULL + +query P +SELECT spark_cast(arrow_cast(NULL, 'Int32')); +---- +NULL + +query P +SELECT spark_cast(NULL::bigint); +---- +NULL From 199e3f7c3c9446f049a2160cdf0b4c6831b5dbd7 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 4 Mar 2026 22:40:15 -0800 Subject: [PATCH 2/7] setup_spark_based_cast_tests --- .../spark/src/function/conversion/cast.rs | 382 +++++++++++++----- .../spark/src/function/conversion/mod.rs | 16 +- .../conversion/cast_int_to_timestamp.slt | 68 +++- 3 files changed, 342 insertions(+), 124 deletions(-) diff --git a/datafusion/spark/src/function/conversion/cast.rs b/datafusion/spark/src/function/conversion/cast.rs index cf71dce26d1ca..eb7da92f937de 100644 --- a/datafusion/spark/src/function/conversion/cast.rs +++ b/datafusion/spark/src/function/conversion/cast.rs @@ -17,31 +17,92 @@ use arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondBuilder}; use arrow::datatypes::{ - ArrowPrimitiveType, DataType, Int8Type, Int16Type, Int32Type, Int64Type, TimeUnit, + ArrowPrimitiveType, 
DataType, Field, FieldRef, Int8Type, Int16Type, Int32Type, + Int64Type, TimeUnit, +}; +use datafusion_common::utils::take_function_args; +use datafusion_common::{ + Result as DataFusionResult, ScalarValue, exec_err, internal_err, }; -use datafusion_common::{Result as DataFusionResult, ScalarValue, exec_err}; use datafusion_expr::{ - ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, + ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, + TypeSignature, Volatility, }; use std::any::Any; use std::sync::Arc; + const MICROS_PER_SECOND: i64 = 1_000_000; +/// Spark-compatible `cast` function for type conversions. +/// +/// This implements Spark's CAST expression with a target type parameter. +/// +/// # Usage +/// ```sql +/// SELECT spark_cast(value, 'timestamp') +/// ``` +/// +/// # Currently supported conversions +/// - Int8/Int16/Int32/Int64 -> Timestamp (target_type = 'timestamp') +/// +/// The integer value is interpreted as seconds since the Unix epoch (1970-01-01 00:00:00 UTC) +/// and converted to a timestamp with microsecond precision. +/// +/// # Overflow behavior +/// Uses saturating multiplication to handle overflow - values that would overflow +/// i64 when multiplied by 1,000,000 are clamped to i64::MAX or i64::MIN. +/// +/// # References +/// - #[derive(Debug, PartialEq, Eq, Hash)] -pub struct Cast { +pub struct SparkCast { signature: Signature, } -impl Default for Cast { + +impl Default for SparkCast { fn default() -> Self { Self::new() } } -impl Cast { +impl SparkCast { pub fn new() -> Self { Self { - signature: Signature::any(1, Volatility::Immutable), + // First arg: value to cast (only ints for now with potential to add further support later) + // Second arg: target datatype as Utf8 string literal (ex : 'timestamp') + signature: Signature::one_of( + vec![TypeSignature::Any(2)], + Volatility::Immutable, + ), + } + } +} + +/// Parse target type string into a DataType +fn parse_target_type(type_str: &str) -> DataFusionResult { + match type_str.to_lowercase().as_str() { + // could add further data type support in future + "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)), + other => exec_err!( + "Unsupported spark_cast target type '{}'. 
Supported types: timestamp", + other + ), + } +} + +/// Extract target type string from scalar arguments +fn get_target_type_from_scalar_args( + scalar_args: &[Option<&ScalarValue>], +) -> DataFusionResult { + let [_, type_arg] = take_function_args("spark_cast", scalar_args)?; + + match type_arg { + Some(ScalarValue::Utf8(Some(s))) | Some(ScalarValue::LargeUtf8(Some(s))) => { + parse_target_type(s) } + _ => exec_err!( + "spark_cast requires second argument to be a string of target data type ex: timestamp" + ), } } @@ -58,6 +119,7 @@ where if arr.is_null(i) { builder.append_null(); } else { + // spark saturates to i64 min/max let micros = (arr.value(i).into()).saturating_mul(MICROS_PER_SECOND); builder.append_value(micros); } @@ -66,7 +128,7 @@ where Ok(Arc::new(builder.finish())) } -impl ScalarUDFImpl for Cast { +impl ScalarUDFImpl for SparkCast { fn as_any(&self) -> &dyn Any { self } @@ -79,84 +141,79 @@ impl ScalarUDFImpl for Cast { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult { - // for now we will be supporting int -> timestamp and keep adding more spark-compatible spark - match &arg_types[0] { - DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => { - Ok(DataType::Timestamp(TimeUnit::Microsecond, None)) - } - _ => exec_err!("Unsupported cast from {:?}", arg_types[0]), - } + fn return_type(&self, _arg_types: &[DataType]) -> DataFusionResult { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args( + &self, + args: ReturnFieldArgs, + ) -> DataFusionResult { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + let target_type = get_target_type_from_scalar_args(args.scalar_arguments)?; + Ok(Arc::new(Field::new(self.name(), target_type, nullable))) } fn invoke_with_args( &self, args: ScalarFunctionArgs, ) -> DataFusionResult { - let input = &args.args[0]; - match input { - ColumnarValue::Array(array) => match array.data_type() { - DataType::Int8 => { - let result = cast_int_to_timestamp::(array)?; - Ok(ColumnarValue::Array(result)) - } - DataType::Int16 => { - let result = cast_int_to_timestamp::(array)?; - Ok(ColumnarValue::Array(result)) + let target_type = args.return_field.data_type(); + + match target_type { + DataType::Timestamp(TimeUnit::Microsecond, None) => { + cast_to_timestamp(&args.args[0]) + } + other => exec_err!("Unsupported spark_cast target type: {:?}", other), + } + } +} + +/// Cast value to timestamp +fn cast_to_timestamp(input: &ColumnarValue) -> DataFusionResult { + match input { + ColumnarValue::Array(array) => match array.data_type() { + DataType::Null => Ok(ColumnarValue::Array(Arc::new( + arrow::array::TimestampMicrosecondArray::new_null(array.len()), + ))), + DataType::Int8 => Ok(ColumnarValue::Array( + cast_int_to_timestamp::(array)?, + )), + DataType::Int16 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int16Type, + >(array)?)), + DataType::Int32 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int32Type, + >(array)?)), + DataType::Int64 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int64Type, + >(array)?)), + other => exec_err!("Unsupported cast from {:?} to timestamp", other), + }, + ColumnarValue::Scalar(scalar) => { + let micros = match scalar { + ScalarValue::Null + | ScalarValue::Int8(None) + | ScalarValue::Int16(None) + | ScalarValue::Int32(None) + | ScalarValue::Int64(None) => None, + ScalarValue::Int8(Some(v)) => { + Some((*v as i64).saturating_mul(MICROS_PER_SECOND)) } - DataType::Int32 => { - let result = 
cast_int_to_timestamp::(array)?; - Ok(ColumnarValue::Array(result)) + ScalarValue::Int16(Some(v)) => { + Some((*v as i64).saturating_mul(MICROS_PER_SECOND)) } - DataType::Int64 => { - let result = cast_int_to_timestamp::(array)?; - Ok(ColumnarValue::Array(result)) + ScalarValue::Int32(Some(v)) => { + Some((*v as i64).saturating_mul(MICROS_PER_SECOND)) } - _ => exec_err!( - "Unsupported cast from {:?} to timestamp", - array.data_type() - ), - }, - ColumnarValue::Scalar(scalar) => { - // Handle scalar conversions - match scalar { - ScalarValue::Int8(None) - | ScalarValue::Int16(None) - | ScalarValue::Int32(None) - | ScalarValue::Int64(None) => Ok(ColumnarValue::Scalar( - ScalarValue::TimestampMicrosecond(None, None), - )), - ScalarValue::Int8(Some(v)) => { - let micros = (*v as i64).saturating_mul(MICROS_PER_SECOND); - Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - Some(micros), - None, - ))) - } - ScalarValue::Int16(Some(v)) => { - let micros = (*v as i64).saturating_mul(MICROS_PER_SECOND); - Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - Some(micros), - None, - ))) - } - ScalarValue::Int32(Some(v)) => { - let micros = (*v as i64).saturating_mul(MICROS_PER_SECOND); - Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - Some(micros), - None, - ))) - } - ScalarValue::Int64(Some(v)) => { - let micros = (*v).saturating_mul(MICROS_PER_SECOND); - Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - Some(micros), - None, - ))) - } - _ => exec_err!("Unsupported cast from {:?} to timestamp", scalar), + ScalarValue::Int64(Some(v)) => Some(v.saturating_mul(MICROS_PER_SECOND)), + other => { + return exec_err!("Unsupported cast from {:?} to timestamp", other); } - } + }; + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + micros, None, + ))) } } } @@ -165,17 +222,19 @@ impl ScalarUDFImpl for Cast { mod tests { use super::*; use arrow::array::{Int8Array, Int16Array, Int32Array, Int64Array}; - use arrow::datatypes::{Field, TimestampMicrosecondType}; - use datafusion_expr::ScalarFunctionArgs; + use arrow::datatypes::TimestampMicrosecondType; - fn make_args(input: ColumnarValue) -> ScalarFunctionArgs { + fn make_args(input: ColumnarValue, target_type: &str) -> ScalarFunctionArgs { let return_field = Arc::new(Field::new( "result", DataType::Timestamp(TimeUnit::Microsecond, None), true, )); ScalarFunctionArgs { - args: vec![input], + args: vec![ + input, + ColumnarValue::Scalar(ScalarValue::Utf8(Some(target_type.to_string()))), + ], arg_fields: vec![], number_rows: 0, return_field, @@ -210,8 +269,8 @@ mod tests { None, ])); - let cast = Cast::new(); - let args = make_args(ColumnarValue::Array(array)); + let cast = SparkCast::new(); + let args = make_args(ColumnarValue::Array(array), "timestamp"); let result = cast.invoke_with_args(args).unwrap(); match result { @@ -237,8 +296,8 @@ mod tests { None, ])); - let cast = Cast::new(); - let args = make_args(ColumnarValue::Array(array)); + let cast = SparkCast::new(); + let args = make_args(ColumnarValue::Array(array), "timestamp"); let result = cast.invoke_with_args(args).unwrap(); match result { @@ -258,8 +317,8 @@ mod tests { let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), Some(1704067200), None])); - let cast = Cast::new(); - let args = make_args(ColumnarValue::Array(array)); + let cast = SparkCast::new(); + let args = make_args(ColumnarValue::Array(array), "timestamp"); let result = cast.invoke_with_args(args).unwrap(); match result { @@ -278,13 +337,14 @@ mod tests { let array: ArrayRef 
= Arc::new(Int64Array::from(vec![Some(i64::MAX), Some(i64::MIN)])); - let cast = Cast::new(); - let args = make_args(ColumnarValue::Array(array)); + let cast = SparkCast::new(); + let args = make_args(ColumnarValue::Array(array), "timestamp"); let result = cast.invoke_with_args(args).unwrap(); match result { ColumnarValue::Array(result_array) => { let ts_array = result_array.as_primitive::(); + // saturating_mul clamps to i64::MAX/MIN assert_eq!(ts_array.value(0), i64::MAX); assert_eq!(ts_array.value(1), i64::MIN); } @@ -292,60 +352,138 @@ mod tests { } } + #[test] + fn test_cast_int64_array_to_timestamp() { + let array: ArrayRef = Arc::new(Int64Array::from(vec![ + Some(0), + Some(1704067200), + Some(-86400), + None, + ])); + + let cast = SparkCast::new(); + let args = make_args(ColumnarValue::Array(array), "timestamp"); + let result = cast.invoke_with_args(args).unwrap(); + + match result { + ColumnarValue::Array(result_array) => { + let ts_array = result_array.as_primitive::(); + assert_eq!(ts_array.value(0), 0); + assert_eq!(ts_array.value(1), 1_704_067_200_000_000); + assert_eq!(ts_array.value(2), -86_400_000_000); // -1 day + assert!(ts_array.is_null(3)); + } + _ => panic!("Expected array result"), + } + } + #[test] fn test_cast_scalar_int8() { - let cast = Cast::new(); - let args = make_args(ColumnarValue::Scalar(ScalarValue::Int8(Some(100)))); + let cast = SparkCast::new(); + let args = make_args( + ColumnarValue::Scalar(ScalarValue::Int8(Some(100))), + "timestamp", + ); + let result = cast.invoke_with_args(args).unwrap(); + assert_scalar_timestamp(result, 100_000_000); + } + + #[test] + fn test_cast_scalar_int16() { + let cast = SparkCast::new(); + let args = make_args( + ColumnarValue::Scalar(ScalarValue::Int16(Some(100))), + "timestamp", + ); let result = cast.invoke_with_args(args).unwrap(); assert_scalar_timestamp(result, 100_000_000); } #[test] fn test_cast_scalar_int32() { - let cast = Cast::new(); - let args = make_args(ColumnarValue::Scalar(ScalarValue::Int32(Some(1704067200)))); + let cast = SparkCast::new(); + let args = make_args( + ColumnarValue::Scalar(ScalarValue::Int32(Some(1704067200))), + "timestamp", + ); let result = cast.invoke_with_args(args).unwrap(); assert_scalar_timestamp(result, 1_704_067_200_000_000); } + #[test] + fn test_cast_scalar_int64() { + let cast = SparkCast::new(); + let args = make_args( + ColumnarValue::Scalar(ScalarValue::Int64(Some(1704067200))), + "timestamp", + ); + let result = cast.invoke_with_args(args).unwrap(); + assert_scalar_timestamp(result, 1_704_067_200_000_000); + } + + #[test] + fn test_cast_scalar_negative() { + let cast = SparkCast::new(); + let args = make_args( + ColumnarValue::Scalar(ScalarValue::Int32(Some(-86400))), + "timestamp", + ); + let result = cast.invoke_with_args(args).unwrap(); + // -86400 seconds = -1 day before epoch + assert_scalar_timestamp(result, -86_400_000_000); + } + #[test] fn test_cast_scalar_null() { - let cast = Cast::new(); - let args = make_args(ColumnarValue::Scalar(ScalarValue::Int64(None))); + let cast = SparkCast::new(); + let args = + make_args(ColumnarValue::Scalar(ScalarValue::Int64(None)), "timestamp"); let result = cast.invoke_with_args(args).unwrap(); assert_scalar_null(result); } #[test] fn test_cast_scalar_int64_overflow() { - let cast = Cast::new(); - let args = make_args(ColumnarValue::Scalar(ScalarValue::Int64(Some(i64::MAX)))); + let cast = SparkCast::new(); + let args = make_args( + ColumnarValue::Scalar(ScalarValue::Int64(Some(i64::MAX))), + "timestamp", + ); let result = 
cast.invoke_with_args(args).unwrap(); + // saturating_mul clamps to i64::MAX assert_scalar_timestamp(result, i64::MAX); } #[test] - fn test_unsupported_scalar_type() { - let cast = Cast::new(); - let args = make_args(ColumnarValue::Scalar(ScalarValue::Utf8(Some( - "2024-01-01".to_string(), - )))); - let result = cast.invoke_with_args(args); + fn test_unsupported_target_type() { + let cast = SparkCast::new(); + // invoke_with_args uses return_field which would be set correctly by planning + // For this test, we need to check return_field_from_args + let arg_fields: Vec = + vec![Arc::new(Field::new("a", DataType::Int64, true))]; + let target_type = ScalarValue::Utf8(Some("string".to_string())); + let scalar_arguments: Vec> = vec![None, Some(&target_type)]; + let return_field_args = ReturnFieldArgs { + arg_fields: &arg_fields, + scalar_arguments: &scalar_arguments, + }; + let result = cast.return_field_from_args(return_field_args); assert!(result.is_err()); assert!( result .unwrap_err() .to_string() - .contains("Unsupported cast from") + .contains("Unsupported spark_cast target type") ); } #[test] - fn test_unsupported_array_type() { - let cast = Cast::new(); - let array: ArrayRef = - Arc::new(arrow::array::Float32Array::from(vec![1.0, 2.0, 3.0])); - let args = make_args(ColumnarValue::Array(array)); + fn test_unsupported_source_type() { + let cast = SparkCast::new(); + let args = make_args( + ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024-01-01".to_string()))), + "timestamp", + ); let result = cast.invoke_with_args(args); assert!(result.is_err()); assert!( @@ -355,4 +493,32 @@ mod tests { .contains("Unsupported cast from") ); } + + #[test] + fn test_cast_null_to_timestamp() { + let cast = SparkCast::new(); + let args = make_args(ColumnarValue::Scalar(ScalarValue::Null), "timestamp"); + let result = cast.invoke_with_args(args).unwrap(); + assert_scalar_null(result); + } + + #[test] + fn test_cast_null_array_to_timestamp() { + let array: ArrayRef = Arc::new(arrow::array::NullArray::new(3)); + + let cast = SparkCast::new(); + let args = make_args(ColumnarValue::Array(array), "timestamp"); + let result = cast.invoke_with_args(args).unwrap(); + + match result { + ColumnarValue::Array(result_array) => { + let ts_array = result_array.as_primitive::(); + assert_eq!(ts_array.len(), 3); + assert!(ts_array.is_null(0)); + assert!(ts_array.is_null(1)); + assert!(ts_array.is_null(2)); + } + _ => panic!("Expected array result"), + } + } } diff --git a/datafusion/spark/src/function/conversion/mod.rs b/datafusion/spark/src/function/conversion/mod.rs index f1877a03dcc95..06b08500379c9 100644 --- a/datafusion/spark/src/function/conversion/mod.rs +++ b/datafusion/spark/src/function/conversion/mod.rs @@ -17,12 +17,22 @@ mod cast; -use cast::Cast; use datafusion_expr::ScalarUDF; +use datafusion_functions::make_udf_function; use std::sync::Arc; -pub mod expr_fn {} +make_udf_function!(cast::SparkCast, spark_cast); + +pub mod expr_fn { + use datafusion_functions::export_functions; + + export_functions!(( + spark_cast, + "Casts given value to the specified type following Spark-compatible semantics", + arg1 arg2 + )); +} pub fn functions() -> Vec> { - vec![Arc::new(ScalarUDF::from(Cast::new()))] + vec![spark_cast()] } diff --git a/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt index 7952b477cf1ee..3dce64511335b 100644 --- a/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt +++ 
b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt @@ -17,70 +17,112 @@ # Test spark_cast from int8 to timestamp query P -SELECT spark_cast(arrow_cast(0, 'Int8')); +SELECT spark_cast(arrow_cast(0, 'Int8'), 'timestamp'); ---- 1970-01-01T00:00:00 query P -SELECT spark_cast(arrow_cast(1, 'Int8')); +SELECT spark_cast(arrow_cast(1, 'Int8'), 'timestamp'); ---- 1970-01-01T00:00:01 query P -SELECT spark_cast(arrow_cast(-1, 'Int8')); +SELECT spark_cast(arrow_cast(-1, 'Int8'), 'timestamp'); ---- 1969-12-31T23:59:59 # Test spark_cast from int16 to timestamp query P -SELECT spark_cast(arrow_cast(0, 'Int16')); +SELECT spark_cast(arrow_cast(0, 'Int16'), 'timestamp'); ---- 1970-01-01T00:00:00 query P -SELECT spark_cast(arrow_cast(3600, 'Int16')); +SELECT spark_cast(arrow_cast(3600, 'Int16'), 'timestamp'); ---- 1970-01-01T01:00:00 # Test spark_cast from int32 to timestamp query P -SELECT spark_cast(arrow_cast(0, 'Int32')); +SELECT spark_cast(arrow_cast(0, 'Int32'), 'timestamp'); ---- 1970-01-01T00:00:00 query P -SELECT spark_cast(arrow_cast(1704067200, 'Int32')); +SELECT spark_cast(arrow_cast(1704067200, 'Int32'), 'timestamp'); ---- 2024-01-01T00:00:00 # Test spark_cast from int64 to timestamp query P -SELECT spark_cast(0::bigint); +SELECT spark_cast(0::bigint, 'timestamp'); ---- 1970-01-01T00:00:00 query P -SELECT spark_cast(1704067200::bigint); +SELECT spark_cast(1704067200::bigint, 'timestamp'); ---- 2024-01-01T00:00:00 # Test NULL handling query P -SELECT spark_cast(arrow_cast(NULL, 'Int8')); +SELECT spark_cast(arrow_cast(NULL, 'Int8'), 'timestamp'); ---- NULL query P -SELECT spark_cast(arrow_cast(NULL, 'Int16')); +SELECT spark_cast(arrow_cast(NULL, 'Int16'), 'timestamp'); ---- NULL query P -SELECT spark_cast(arrow_cast(NULL, 'Int32')); +SELECT spark_cast(arrow_cast(NULL, 'Int32'), 'timestamp'); ---- NULL query P -SELECT spark_cast(NULL::bigint); +SELECT spark_cast(NULL::bigint, 'timestamp'); ---- NULL + +# Test untyped NULL +query P +SELECT spark_cast(NULL, 'timestamp'); +---- +NULL + +# Test Int8 boundary values +query P +SELECT spark_cast(arrow_cast(127, 'Int8'), 'timestamp'); +---- +1970-01-01T00:02:07 + +query P +SELECT spark_cast(arrow_cast(-128, 'Int8'), 'timestamp'); +---- +1969-12-31T23:57:52 + +# Test Int16 boundary values +query P +SELECT spark_cast(arrow_cast(32767, 'Int16'), 'timestamp'); +---- +1970-01-01T09:06:07 + +query P +SELECT spark_cast(arrow_cast(-32768, 'Int16'), 'timestamp'); +---- +1969-12-31T14:53:52 + +# Test Int64 negative value +query P +SELECT spark_cast(-86400::bigint, 'timestamp'); +---- +1969-12-31T00:00:00 + +# Test unsupported source type - should error +statement error +SELECT spark_cast('2024-01-01', 'timestamp'); + +# Test unsupported target type - should error +statement error +SELECT spark_cast(100, 'string'); From 476e7872cfd55a3257218c4e11e2ed67a160cb90 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 4 Mar 2026 23:54:26 -0800 Subject: [PATCH 3/7] setup_spark_based_cast_tests --- .../spark/src/function/conversion/cast.rs | 174 ++++++++++++++---- 1 file changed, 141 insertions(+), 33 deletions(-) diff --git a/datafusion/spark/src/function/conversion/cast.rs b/datafusion/spark/src/function/conversion/cast.rs index eb7da92f937de..59992a44fa304 100644 --- a/datafusion/spark/src/function/conversion/cast.rs +++ b/datafusion/spark/src/function/conversion/cast.rs @@ -33,9 +33,14 @@ use std::sync::Arc; const MICROS_PER_SECOND: i64 = 1_000_000; -/// Spark-compatible `cast` function for type conversions. 
+/// Convert seconds to microseconds with saturating overflow behavior +fn secs_to_micros(secs: impl Into) -> i64 { + secs.into().saturating_mul(MICROS_PER_SECOND) +} + +/// Spark-compatible `cast` function for type conversions /// -/// This implements Spark's CAST expression with a target type parameter. +/// This implements Spark's CAST expression with a target type parameter /// /// # Usage /// ```sql @@ -46,11 +51,11 @@ const MICROS_PER_SECOND: i64 = 1_000_000; /// - Int8/Int16/Int32/Int64 -> Timestamp (target_type = 'timestamp') /// /// The integer value is interpreted as seconds since the Unix epoch (1970-01-01 00:00:00 UTC) -/// and converted to a timestamp with microsecond precision. +/// and converted to a timestamp with microsecond precision (matches spark's spec) /// /// # Overflow behavior /// Uses saturating multiplication to handle overflow - values that would overflow -/// i64 when multiplied by 1,000,000 are clamped to i64::MAX or i64::MIN. +/// i64 when multiplied by 1,000,000 are clamped to i64::MAX or i64::MIN /// /// # References /// - @@ -81,7 +86,7 @@ impl SparkCast { /// Parse target type string into a DataType fn parse_target_type(type_str: &str) -> DataFusionResult { match type_str.to_lowercase().as_str() { - // could add further data type support in future + // further data type support in future "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)), other => exec_err!( "Unsupported spark_cast target type '{}'. Supported types: timestamp", @@ -108,6 +113,7 @@ fn get_target_type_from_scalar_args( fn cast_int_to_timestamp( array: &ArrayRef, + timezone: Option>, ) -> DataFusionResult where T::Native: Into, @@ -120,12 +126,12 @@ where builder.append_null(); } else { // spark saturates to i64 min/max - let micros = (arr.value(i).into()).saturating_mul(MICROS_PER_SECOND); + let micros = secs_to_micros(arr.value(i).into()); builder.append_value(micros); } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(builder.finish().with_timezone_opt(timezone))) } impl ScalarUDFImpl for SparkCast { @@ -159,35 +165,47 @@ impl ScalarUDFImpl for SparkCast { args: ScalarFunctionArgs, ) -> DataFusionResult { let target_type = args.return_field.data_type(); + // Use session timezone, fallback to UTC if not set + let session_tz: Arc = args + .config_options + .execution + .time_zone + .clone() + .map(|s| Arc::from(s.as_str())) + .unwrap_or_else(|| Arc::from("UTC")); match target_type { DataType::Timestamp(TimeUnit::Microsecond, None) => { - cast_to_timestamp(&args.args[0]) + cast_to_timestamp(&args.args[0], Some(session_tz)) } other => exec_err!("Unsupported spark_cast target type: {:?}", other), } } } -/// Cast value to timestamp -fn cast_to_timestamp(input: &ColumnarValue) -> DataFusionResult { +/// Cast value to timestamp internal function +fn cast_to_timestamp( + input: &ColumnarValue, + timezone: Option>, +) -> DataFusionResult { match input { ColumnarValue::Array(array) => match array.data_type() { DataType::Null => Ok(ColumnarValue::Array(Arc::new( - arrow::array::TimestampMicrosecondArray::new_null(array.len()), + arrow::array::TimestampMicrosecondArray::new_null(array.len()) + .with_timezone_opt(timezone), ))), DataType::Int8 => Ok(ColumnarValue::Array( - cast_int_to_timestamp::(array)?, + cast_int_to_timestamp::(array, timezone)?, )), DataType::Int16 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< Int16Type, - >(array)?)), + >(array, timezone)?)), DataType::Int32 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< Int32Type, - >(array)?)), + >(array, 
timezone)?)), DataType::Int64 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< Int64Type, - >(array)?)), + >(array, timezone)?)), other => exec_err!("Unsupported cast from {:?} to timestamp", other), }, ColumnarValue::Scalar(scalar) => { @@ -197,22 +215,16 @@ fn cast_to_timestamp(input: &ColumnarValue) -> DataFusionResult { | ScalarValue::Int16(None) | ScalarValue::Int32(None) | ScalarValue::Int64(None) => None, - ScalarValue::Int8(Some(v)) => { - Some((*v as i64).saturating_mul(MICROS_PER_SECOND)) - } - ScalarValue::Int16(Some(v)) => { - Some((*v as i64).saturating_mul(MICROS_PER_SECOND)) - } - ScalarValue::Int32(Some(v)) => { - Some((*v as i64).saturating_mul(MICROS_PER_SECOND)) - } - ScalarValue::Int64(Some(v)) => Some(v.saturating_mul(MICROS_PER_SECOND)), + ScalarValue::Int8(Some(v)) => Some(secs_to_micros(*v)), + ScalarValue::Int16(Some(v)) => Some(secs_to_micros(*v)), + ScalarValue::Int32(Some(v)) => Some(secs_to_micros(*v)), + ScalarValue::Int64(Some(v)) => Some(secs_to_micros(*v)), other => { return exec_err!("Unsupported cast from {:?} to timestamp", other); } }; Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - micros, None, + micros, timezone, ))) } } @@ -224,12 +236,25 @@ mod tests { use arrow::array::{Int8Array, Int16Array, Int32Array, Int64Array}; use arrow::datatypes::TimestampMicrosecondType; + // helpers to make testing easier fn make_args(input: ColumnarValue, target_type: &str) -> ScalarFunctionArgs { + make_args_with_timezone(input, target_type, None) + } + + fn make_args_with_timezone( + input: ColumnarValue, + target_type: &str, + timezone: Option<&str>, + ) -> ScalarFunctionArgs { let return_field = Arc::new(Field::new( "result", DataType::Timestamp(TimeUnit::Microsecond, None), true, )); + let mut config = datafusion_common::config::ConfigOptions::default(); + if let Some(tz) = timezone { + config.execution.time_zone = Some(tz.to_string()); + } ScalarFunctionArgs { args: vec![ input, @@ -238,24 +263,46 @@ mod tests { arg_fields: vec![], number_rows: 0, return_field, - config_options: Arc::new(Default::default()), + config_options: Arc::new(config), } } fn assert_scalar_timestamp(result: ColumnarValue, expected: i64) { + assert_scalar_timestamp_with_tz(result, expected, "UTC"); + } + + fn assert_scalar_timestamp_with_tz( + result: ColumnarValue, + expected: i64, + expected_tz: &str, + ) { match result { - ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(val), None)) => { + ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + Some(val), + Some(tz), + )) => { assert_eq!(val, expected); + assert_eq!(tz.as_ref(), expected_tz); + } + _ => { + panic!( + "Expected scalar timestamp with value {expected} and {expected_tz} timezone" + ) } - _ => panic!("Expected scalar timestamp with value {expected}"), } } fn assert_scalar_null(result: ColumnarValue) { - assert!(matches!( - result, - ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, None)) - )); + assert_scalar_null_with_tz(result, "UTC"); + } + + fn assert_scalar_null_with_tz(result: ColumnarValue, expected_tz: &str) { + match result { + ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, Some(tz))) => { + assert_eq!(tz.as_ref(), expected_tz); + } + _ => panic!("Expected null scalar timestamp with {expected_tz} timezone"), + } } #[test] @@ -521,4 +568,65 @@ mod tests { _ => panic!("Expected array result"), } } + + #[test] + fn test_cast_int_to_timestamp_with_timezones() { + // Test with various timezones like Comet does + let timezones = [ + "UTC", + "America/New_York", 
+ "America/Los_Angeles", + "Europe/London", + "Asia/Tokyo", + "Australia/Sydney", + ]; + + let cast = SparkCast::new(); + let test_value: i64 = 1704067200; // 2024-01-01 00:00:00 UTC + let expected_micros = test_value * MICROS_PER_SECOND; + + for tz in timezones { + // scalar + let args = make_args_with_timezone( + ColumnarValue::Scalar(ScalarValue::Int64(Some(test_value))), + "timestamp", + Some(tz), + ); + let result = cast.invoke_with_args(args).unwrap(); + assert_scalar_timestamp_with_tz(result, expected_micros, tz); + + // array input + let array: ArrayRef = + Arc::new(Int64Array::from(vec![Some(test_value), None])); + let args = make_args_with_timezone( + ColumnarValue::Array(array), + "timestamp", + Some(tz), + ); + let result = cast.invoke_with_args(args).unwrap(); + + match result { + ColumnarValue::Array(result_array) => { + let ts_array = + result_array.as_primitive::(); + assert_eq!(ts_array.value(0), expected_micros); + assert!(ts_array.is_null(1)); + assert_eq!(ts_array.timezone(), Some(tz)); + } + _ => panic!("Expected array result for timezone {tz}"), + } + } + } + + #[test] + fn test_cast_int_to_timestamp_default_timezone() { + let cast = SparkCast::new(); + let args = make_args( + ColumnarValue::Scalar(ScalarValue::Int64(Some(0))), + "timestamp", + ); + let result = cast.invoke_with_args(args).unwrap(); + // Defaults to UTC + assert_scalar_timestamp_with_tz(result, 0, "UTC"); + } } From 91f202a8ea57f55c881466aba49f4dc0d92ee269 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 5 Mar 2026 00:01:28 -0800 Subject: [PATCH 4/7] setup_spark_based_cast_tests --- datafusion/spark/src/function/conversion/cast.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/datafusion/spark/src/function/conversion/cast.rs b/datafusion/spark/src/function/conversion/cast.rs index 59992a44fa304..8de0c60d1d2f6 100644 --- a/datafusion/spark/src/function/conversion/cast.rs +++ b/datafusion/spark/src/function/conversion/cast.rs @@ -34,8 +34,9 @@ use std::sync::Arc; const MICROS_PER_SECOND: i64 = 1_000_000; /// Convert seconds to microseconds with saturating overflow behavior -fn secs_to_micros(secs: impl Into) -> i64 { - secs.into().saturating_mul(MICROS_PER_SECOND) +#[inline] +fn secs_to_micros(secs: i64) -> i64 { + secs.saturating_mul(MICROS_PER_SECOND) } /// Spark-compatible `cast` function for type conversions @@ -215,9 +216,9 @@ fn cast_to_timestamp( | ScalarValue::Int16(None) | ScalarValue::Int32(None) | ScalarValue::Int64(None) => None, - ScalarValue::Int8(Some(v)) => Some(secs_to_micros(*v)), - ScalarValue::Int16(Some(v)) => Some(secs_to_micros(*v)), - ScalarValue::Int32(Some(v)) => Some(secs_to_micros(*v)), + ScalarValue::Int8(Some(v)) => Some(secs_to_micros((*v).into())), + ScalarValue::Int16(Some(v)) => Some(secs_to_micros((*v).into())), + ScalarValue::Int32(Some(v)) => Some(secs_to_micros((*v).into())), ScalarValue::Int64(Some(v)) => Some(secs_to_micros(*v)), other => { return exec_err!("Unsupported cast from {:?} to timestamp", other); From 228a24ee91b508ccd9ee722a60a67c519db3d772 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Mon, 9 Mar 2026 08:46:25 -0700 Subject: [PATCH 5/7] schema_changes_utils --- .../spark/src/function/conversion/cast.rs | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/datafusion/spark/src/function/conversion/cast.rs b/datafusion/spark/src/function/conversion/cast.rs index 8de0c60d1d2f6..b0b371abc6e7e 100644 --- a/datafusion/spark/src/function/conversion/cast.rs +++ 
b/datafusion/spark/src/function/conversion/cast.rs @@ -24,12 +24,12 @@ use datafusion_common::utils::take_function_args; use datafusion_common::{ Result as DataFusionResult, ScalarValue, exec_err, internal_err, }; -use datafusion_expr::{ - ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, - TypeSignature, Volatility, -}; +use datafusion_expr::{ColumnarValue, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility}; use std::any::Any; use std::sync::Arc; +use datafusion::logical_expr::{Coercion, TypeSignatureClass}; +use datafusion_common::types::{logical_int64, logical_string}; +use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; const MICROS_PER_SECOND: i64 = 1_000_000; @@ -73,12 +73,14 @@ impl Default for SparkCast { impl SparkCast { pub fn new() -> Self { + // First arg: value to cast (only ints for now with potential to add further support later) + // Second arg: target datatype as Utf8 string literal (ex : 'timestamp') + let int_arg = Coercion::new_exact(TypeSignatureClass::Native(logical_int64())); + let string_arg = Coercion::new_exact(TypeSignatureClass::Native(logical_string())); Self { - // First arg: value to cast (only ints for now with potential to add further support later) - // Second arg: target datatype as Utf8 string literal (ex : 'timestamp') signature: Signature::one_of( - vec![TypeSignature::Any(2)], - Volatility::Immutable, + vec![TypeSignature::Coercible(vec![int_arg, string_arg])], + Volatility::Stable, ), } } @@ -152,15 +154,6 @@ impl ScalarUDFImpl for SparkCast { internal_err!("return_field_from_args should be used instead") } - fn return_field_from_args( - &self, - args: ReturnFieldArgs, - ) -> DataFusionResult { - let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); - let target_type = get_target_type_from_scalar_args(args.scalar_arguments)?; - Ok(Arc::new(Field::new(self.name(), target_type, nullable))) - } - fn invoke_with_args( &self, args: ScalarFunctionArgs, @@ -176,7 +169,7 @@ impl ScalarUDFImpl for SparkCast { .unwrap_or_else(|| Arc::from("UTC")); match target_type { - DataType::Timestamp(TimeUnit::Microsecond, None) => { + DataType::Timestamp(TimeUnit::Microsecond, _) => { cast_to_timestamp(&args.args[0], Some(session_tz)) } other => exec_err!("Unsupported spark_cast target type: {:?}", other), From 532377eec7c52bc1c07d9ac480f7bcd0ca5a28c2 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Mon, 9 Mar 2026 15:22:08 -0700 Subject: [PATCH 6/7] fix_review_comments --- .../spark/src/function/conversion/cast.rs | 94 +++++++++++++------ .../conversion/cast_int_to_timestamp.slt | 94 ++++++++++++++++--- 2 files changed, 145 insertions(+), 43 deletions(-) diff --git a/datafusion/spark/src/function/conversion/cast.rs b/datafusion/spark/src/function/conversion/cast.rs index b0b371abc6e7e..b97abb8d1ab2a 100644 --- a/datafusion/spark/src/function/conversion/cast.rs +++ b/datafusion/spark/src/function/conversion/cast.rs @@ -20,20 +20,23 @@ use arrow::datatypes::{ ArrowPrimitiveType, DataType, Field, FieldRef, Int8Type, Int16Type, Int32Type, Int64Type, TimeUnit, }; -use datafusion_common::utils::take_function_args; +use datafusion::logical_expr::{Coercion, TypeSignatureClass}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::types::logical_string; use datafusion_common::{ Result as DataFusionResult, ScalarValue, exec_err, internal_err, }; -use datafusion_expr::{ColumnarValue, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, 
TypeSignature, Volatility}; +use datafusion_expr::TypeSignatureClass::Integer; +use datafusion_expr::{ + ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, + Signature, TypeSignature, Volatility, +}; use std::any::Any; use std::sync::Arc; -use datafusion::logical_expr::{Coercion, TypeSignatureClass}; -use datafusion_common::types::{logical_int64, logical_string}; -use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; const MICROS_PER_SECOND: i64 = 1_000_000; -/// Convert seconds to microseconds with saturating overflow behavior +/// Convert seconds to microseconds with saturating overflow behavior (matches spark spec) #[inline] fn secs_to_micros(secs: i64) -> i64 { secs.saturating_mul(MICROS_PER_SECOND) @@ -63,6 +66,7 @@ fn secs_to_micros(secs: i64) -> i64 { #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkCast { signature: Signature, + timezone: Option>, } impl Default for SparkCast { @@ -73,24 +77,45 @@ impl Default for SparkCast { impl SparkCast { pub fn new() -> Self { + Self::new_with_config(&ConfigOptions::default()) + } + + pub fn new_with_config(config: &ConfigOptions) -> Self { // First arg: value to cast (only ints for now with potential to add further support later) // Second arg: target datatype as Utf8 string literal (ex : 'timestamp') - let int_arg = Coercion::new_exact(TypeSignatureClass::Native(logical_int64())); - let string_arg = Coercion::new_exact(TypeSignatureClass::Native(logical_string())); + let int_arg = Coercion::new_exact(Integer); + let string_arg = + Coercion::new_exact(TypeSignatureClass::Native(logical_string())); Self { signature: Signature::one_of( - vec![TypeSignature::Coercible(vec![int_arg, string_arg])], + vec![ + TypeSignature::Coercible(vec![int_arg.clone(), string_arg.clone()]), + TypeSignature::Coercible(vec![ + int_arg, + string_arg.clone(), + string_arg, + ]), + ], Volatility::Stable, ), + timezone: config + .execution + .time_zone + .as_ref() + .map(|tz| Arc::from(tz.as_str())) + .or_else(|| Some(Arc::from("UTC"))), } } } /// Parse target type string into a DataType -fn parse_target_type(type_str: &str) -> DataFusionResult { +fn parse_target_type( + type_str: &str, + timezone: Option>, +) -> DataFusionResult { match type_str.to_lowercase().as_str() { // further data type support in future - "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)), + "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, timezone)), other => exec_err!( "Unsupported spark_cast target type '{}'. 
Supported types: timestamp", other @@ -101,13 +126,14 @@ fn parse_target_type(type_str: &str) -> DataFusionResult { /// Extract target type string from scalar arguments fn get_target_type_from_scalar_args( scalar_args: &[Option<&ScalarValue>], + timezone: Option>, ) -> DataFusionResult { - let [_, type_arg] = take_function_args("spark_cast", scalar_args)?; + let type_arg = scalar_args.get(1).and_then(|opt| *opt); match type_arg { - Some(ScalarValue::Utf8(Some(s))) | Some(ScalarValue::LargeUtf8(Some(s))) => { - parse_target_type(s) - } + Some(ScalarValue::Utf8(Some(s))) + | Some(ScalarValue::LargeUtf8(Some(s))) + | Some(ScalarValue::Utf8View(Some(s))) => parse_target_type(s, timezone), _ => exec_err!( "spark_cast requires second argument to be a string of target data type ex: timestamp" ), @@ -154,23 +180,30 @@ impl ScalarUDFImpl for SparkCast { internal_err!("return_field_from_args should be used instead") } + fn with_updated_config(&self, config: &ConfigOptions) -> Option { + Some(ScalarUDF::from(Self::new_with_config(config))) + } + + fn return_field_from_args( + &self, + args: ReturnFieldArgs, + ) -> DataFusionResult { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + let return_type = get_target_type_from_scalar_args( + args.scalar_arguments, + self.timezone.clone(), + )?; + Ok(Arc::new(Field::new(self.name(), return_type, nullable))) + } + fn invoke_with_args( &self, args: ScalarFunctionArgs, ) -> DataFusionResult { let target_type = args.return_field.data_type(); - // Use session timezone, fallback to UTC if not set - let session_tz: Arc = args - .config_options - .execution - .time_zone - .clone() - .map(|s| Arc::from(s.as_str())) - .unwrap_or_else(|| Arc::from("UTC")); - match target_type { - DataType::Timestamp(TimeUnit::Microsecond, _) => { - cast_to_timestamp(&args.args[0], Some(session_tz)) + DataType::Timestamp(TimeUnit::Microsecond, tz) => { + cast_to_timestamp(&args.args[0], tz.clone()) } other => exec_err!("Unsupported spark_cast target type: {:?}", other), } @@ -232,7 +265,7 @@ mod tests { // helpers to make testing easier fn make_args(input: ColumnarValue, target_type: &str) -> ScalarFunctionArgs { - make_args_with_timezone(input, target_type, None) + make_args_with_timezone(input, target_type, Some("UTC")) } fn make_args_with_timezone( @@ -242,10 +275,13 @@ mod tests { ) -> ScalarFunctionArgs { let return_field = Arc::new(Field::new( "result", - DataType::Timestamp(TimeUnit::Microsecond, None), + DataType::Timestamp( + TimeUnit::Microsecond, + Some(Arc::from(timezone.unwrap())), + ), true, )); - let mut config = datafusion_common::config::ConfigOptions::default(); + let mut config = ConfigOptions::default(); if let Some(tz) = timezone { config.execution.time_zone = Some(tz.to_string()); } diff --git a/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt index 3dce64511335b..305417b5770db 100644 --- a/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt +++ b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt @@ -19,50 +19,50 @@ query P SELECT spark_cast(arrow_cast(0, 'Int8'), 'timestamp'); ---- -1970-01-01T00:00:00 +1970-01-01T00:00:00Z query P SELECT spark_cast(arrow_cast(1, 'Int8'), 'timestamp'); ---- -1970-01-01T00:00:01 +1970-01-01T00:00:01Z query P SELECT spark_cast(arrow_cast(-1, 'Int8'), 'timestamp'); ---- -1969-12-31T23:59:59 +1969-12-31T23:59:59Z # Test spark_cast from int16 to timestamp 
query P SELECT spark_cast(arrow_cast(0, 'Int16'), 'timestamp'); ---- -1970-01-01T00:00:00 +1970-01-01T00:00:00Z query P SELECT spark_cast(arrow_cast(3600, 'Int16'), 'timestamp'); ---- -1970-01-01T01:00:00 +1970-01-01T01:00:00Z # Test spark_cast from int32 to timestamp query P SELECT spark_cast(arrow_cast(0, 'Int32'), 'timestamp'); ---- -1970-01-01T00:00:00 +1970-01-01T00:00:00Z query P SELECT spark_cast(arrow_cast(1704067200, 'Int32'), 'timestamp'); ---- -2024-01-01T00:00:00 +2024-01-01T00:00:00Z # Test spark_cast from int64 to timestamp query P SELECT spark_cast(0::bigint, 'timestamp'); ---- -1970-01-01T00:00:00 +1970-01-01T00:00:00Z query P SELECT spark_cast(1704067200::bigint, 'timestamp'); ---- -2024-01-01T00:00:00 +2024-01-01T00:00:00Z # Test NULL handling query P @@ -95,29 +95,29 @@ NULL query P SELECT spark_cast(arrow_cast(127, 'Int8'), 'timestamp'); ---- -1970-01-01T00:02:07 +1970-01-01T00:02:07Z query P SELECT spark_cast(arrow_cast(-128, 'Int8'), 'timestamp'); ---- -1969-12-31T23:57:52 +1969-12-31T23:57:52Z # Test Int16 boundary values query P SELECT spark_cast(arrow_cast(32767, 'Int16'), 'timestamp'); ---- -1970-01-01T09:06:07 +1970-01-01T09:06:07Z query P SELECT spark_cast(arrow_cast(-32768, 'Int16'), 'timestamp'); ---- -1969-12-31T14:53:52 +1969-12-31T14:53:52Z # Test Int64 negative value query P SELECT spark_cast(-86400::bigint, 'timestamp'); ---- -1969-12-31T00:00:00 +1969-12-31T00:00:00Z # Test unsupported source type - should error statement error @@ -126,3 +126,69 @@ SELECT spark_cast('2024-01-01', 'timestamp'); # Test unsupported target type - should error statement error SELECT spark_cast(100, 'string'); + +# Test with different session timezones to verify simplify() picks up config + +# America/Los_Angeles (PST/PDT - has DST) +statement ok +SET datafusion.execution.time_zone = 'America/Los_Angeles'; + +# Epoch in PST (UTC-8) +query P +SELECT spark_cast(0::bigint, 'timestamp'); +---- +1969-12-31T16:00:00-08:00 + +# 2024-01-01 00:00:00 UTC in PST (winter, UTC-8) +query P +SELECT spark_cast(1704067200::bigint, 'timestamp'); +---- +2023-12-31T16:00:00-08:00 + +# America/Phoenix (MST - no DST, always UTC-7) +statement ok +SET datafusion.execution.time_zone = 'America/Phoenix'; + +# Epoch in Phoenix (UTC-7) +query P +SELECT spark_cast(0::bigint, 'timestamp'); +---- +1969-12-31T17:00:00-07:00 + +# 2024-01-01 00:00:00 UTC in Phoenix (still UTC-7, no DST) +query P +SELECT spark_cast(1704067200::bigint, 'timestamp'); +---- +2023-12-31T17:00:00-07:00 + +# Test with different timezones - LA (has DST) +statement ok +SET datafusion.execution.time_zone = 'America/Los_Angeles'; + +query P +SELECT spark_cast(1710054000::bigint, 'timestamp'); +---- +2024-03-09T23:00:00-08:00 + +query P +SELECT spark_cast(1710057600::bigint, 'timestamp'); +---- +2024-03-10T00:00:00-08:00 + +# Phoenix has no DST - always UTC-7 +statement ok +SET datafusion.execution.time_zone = 'America/Phoenix'; + +query P +SELECT spark_cast(1710054000::bigint, 'timestamp'); +---- +2024-03-10T00:00:00-07:00 + +query P +SELECT spark_cast(1710057600::bigint, 'timestamp'); +---- +2024-03-10T01:00:00-07:00 + +# Reset to default UTC +statement ok +SET datafusion.execution.time_zone = 'UTC'; From 665d5937328c597abaec0736785fe25bd7a126ff Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Mon, 9 Mar 2026 15:30:18 -0700 Subject: [PATCH 7/7] fix_review_comments --- .../spark/src/function/conversion/cast.rs | 61 +++++++++---------- .../spark/src/function/conversion/mod.rs | 10 +-- .../conversion/cast_int_to_timestamp.slt | 58 
 3 files changed, 93 insertions(+), 36 deletions(-)

diff --git a/datafusion/spark/src/function/conversion/cast.rs b/datafusion/spark/src/function/conversion/cast.rs
index b97abb8d1ab2a..cd6b7c7d3331d 100644
--- a/datafusion/spark/src/function/conversion/cast.rs
+++ b/datafusion/spark/src/function/conversion/cast.rs
@@ -20,13 +20,12 @@ use arrow::datatypes::{
     ArrowPrimitiveType, DataType, Field, FieldRef, Int8Type, Int16Type, Int32Type,
     Int64Type, TimeUnit,
 };
-use datafusion::logical_expr::{Coercion, TypeSignatureClass};
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::types::logical_string;
-use datafusion_common::{
-    Result as DataFusionResult, ScalarValue, exec_err, internal_err,
+use datafusion_common::types::{
+    logical_int8, logical_int16, logical_int32, logical_int64, logical_string,
 };
-use datafusion_expr::TypeSignatureClass::Integer;
+use datafusion_common::{Result, ScalarValue, exec_err, internal_err};
+use datafusion_expr::{Coercion, TypeSignatureClass};
 use datafusion_expr::{
     ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl,
     Signature, TypeSignature, Volatility,
@@ -81,21 +80,28 @@ impl SparkCast {
     }
 
     pub fn new_with_config(config: &ConfigOptions) -> Self {
-        // First arg: value to cast (only ints for now with potential to add further support later)
+        // First arg: value to cast (only signed ints - Spark doesn't have unsigned integers)
         // Second arg: target datatype as Utf8 string literal (ex : 'timestamp')
-        let int_arg = Coercion::new_exact(Integer);
         let string_arg =
             Coercion::new_exact(TypeSignatureClass::Native(logical_string()));
+
+        // Spark only supports signed integers, so we explicitly list them
+        let signed_int_signatures = [
+            logical_int8(),
+            logical_int16(),
+            logical_int32(),
+            logical_int64(),
+        ]
+        .map(|int_type| {
+            TypeSignature::Coercible(vec![
+                Coercion::new_exact(TypeSignatureClass::Native(int_type)),
+                string_arg.clone(),
+            ])
+        });
+
         Self {
-            signature: Signature::one_of(
-                vec![
-                    TypeSignature::Coercible(vec![int_arg.clone(), string_arg.clone()]),
-                    TypeSignature::Coercible(vec![
-                        int_arg,
-                        string_arg.clone(),
-                        string_arg,
-                    ]),
-                ],
+            signature: Signature::new(
+                TypeSignature::OneOf(Vec::from(signed_int_signatures)),
                 Volatility::Stable,
             ),
             timezone: config
@@ -109,10 +115,7 @@ impl SparkCast {
 }
 
 /// Parse target type string into a DataType
-fn parse_target_type(
-    type_str: &str,
-    timezone: Option<Arc<str>>,
-) -> DataFusionResult<DataType> {
+fn parse_target_type(type_str: &str, timezone: Option<Arc<str>>) -> Result<DataType> {
     match type_str.to_lowercase().as_str() {
         // further data type support in future
        "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, timezone)),
@@ -127,7 +130,7 @@
 fn get_target_type_from_scalar_args(
     scalar_args: &[Option<&ScalarValue>],
     timezone: Option<Arc<str>>,
-) -> DataFusionResult<DataType> {
+) -> Result<DataType> {
     let type_arg = scalar_args.get(1).and_then(|opt| *opt);
 
     match type_arg {
@@ -143,7 +146,7 @@
 fn cast_int_to_timestamp<T: ArrowPrimitiveType>(
     array: &ArrayRef,
     timezone: Option<Arc<str>>,
-) -> DataFusionResult<ArrayRef>
+) -> Result<ArrayRef>
 where
     T::Native: Into<i64>,
 {
@@ -176,7 +179,7 @@ impl ScalarUDFImpl for SparkCast {
         &self.signature
     }
 
-    fn return_type(&self, _arg_types: &[DataType]) -> DataFusionResult<DataType> {
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
         internal_err!("return_field_from_args should be used instead")
     }
 
@@ -184,10 +187,7 @@
         Some(ScalarUDF::from(Self::new_with_config(config)))
     }
 
-    fn return_field_from_args(
-        &self,
-        args: ReturnFieldArgs,
-    ) -> DataFusionResult<FieldRef> {
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
         let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
         let return_type = get_target_type_from_scalar_args(
             args.scalar_arguments,
@@ -196,10 +196,7 @@
         Ok(Arc::new(Field::new(self.name(), return_type, nullable)))
     }
 
-    fn invoke_with_args(
-        &self,
-        args: ScalarFunctionArgs,
-    ) -> DataFusionResult<ColumnarValue> {
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
         let target_type = args.return_field.data_type();
         match target_type {
             DataType::Timestamp(TimeUnit::Microsecond, tz) => {
@@ -214,7 +211,7 @@
 fn cast_to_timestamp(
     input: &ColumnarValue,
     timezone: Option<Arc<str>>,
-) -> DataFusionResult<ColumnarValue> {
+) -> Result<ColumnarValue> {
     match input {
         ColumnarValue::Array(array) => match array.data_type() {
             DataType::Null => Ok(ColumnarValue::Array(Arc::new(
diff --git a/datafusion/spark/src/function/conversion/mod.rs b/datafusion/spark/src/function/conversion/mod.rs
index 06b08500379c9..e8a89fa8c0616 100644
--- a/datafusion/spark/src/function/conversion/mod.rs
+++ b/datafusion/spark/src/function/conversion/mod.rs
@@ -18,10 +18,10 @@
 mod cast;
 
 use datafusion_expr::ScalarUDF;
-use datafusion_functions::make_udf_function;
+use datafusion_functions::make_udf_function_with_config;
 use std::sync::Arc;
 
-make_udf_function!(cast::SparkCast, spark_cast);
+make_udf_function_with_config!(cast::SparkCast, spark_cast);
 
 pub mod expr_fn {
     use datafusion_functions::export_functions;
@@ -29,10 +29,12 @@ pub mod expr_fn {
     export_functions!((
         spark_cast,
         "Casts given value to the specified type following Spark-compatible semantics",
-        arg1 arg2
+        @config arg1 arg2
     ));
 }
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![spark_cast()]
+    use datafusion_common::config::ConfigOptions;
+    let config = ConfigOptions::default();
+    vec![spark_cast(&config)]
 }
diff --git a/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt
index 305417b5770db..8b775337297a8 100644
--- a/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt
+++ b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt
@@ -192,3 +192,61 @@ SELECT spark_cast(1710057600::bigint, 'timestamp');
 # Reset to default UTC
 statement ok
 SET datafusion.execution.time_zone = 'UTC';
+
+#############################
+# Array Tests
+#############################
+
+# Create test table with 4 int columns: null, min, max, regular value
+statement ok
+CREATE TABLE int_test AS SELECT
+    arrow_cast(column1, 'Int8') as i8_col,
+    arrow_cast(column2, 'Int16') as i16_col,
+    arrow_cast(column3, 'Int32') as i32_col,
+    column4::bigint as i64_col
+FROM (VALUES
+    (NULL, NULL, NULL, NULL),
+    (-128, -32768, -2147483648, -86400),
+    (127, 32767, 2147483647, 86400),
+    (100, 3600, 1710054000, 1710054000)
+);
+
+# Test in UTC
+query PPPP
+SELECT spark_cast(i8_col, 'timestamp'), spark_cast(i16_col, 'timestamp'), spark_cast(i32_col, 'timestamp'), spark_cast(i64_col, 'timestamp') FROM int_test;
+----
+NULL NULL NULL NULL
+1969-12-31T23:57:52Z 1969-12-31T14:53:52Z 1901-12-13T20:45:52Z 1969-12-31T00:00:00Z
+1970-01-01T00:02:07Z 1970-01-01T09:06:07Z 2038-01-19T03:14:07Z 1970-01-02T00:00:00Z
+1970-01-01T00:01:40Z 1970-01-01T01:00:00Z 2024-03-10T07:00:00Z 2024-03-10T07:00:00Z
+
+# Test in America/Los_Angeles (PST - has DST)
+statement ok
+SET datafusion.execution.time_zone = 'America/Los_Angeles';
+
+query PPPP
+SELECT spark_cast(i8_col, 'timestamp'), spark_cast(i16_col, 'timestamp'), spark_cast(i32_col, 'timestamp'), spark_cast(i64_col, 'timestamp') FROM int_test;
+----
+NULL NULL NULL NULL
+1969-12-31T15:57:52-08:00 1969-12-31T06:53:52-08:00 1901-12-13T12:45:52-08:00 1969-12-30T16:00:00-08:00
+1969-12-31T16:02:07-08:00 1970-01-01T01:06:07-08:00 2038-01-18T19:14:07-08:00 1970-01-01T16:00:00-08:00
+1969-12-31T16:01:40-08:00 1969-12-31T17:00:00-08:00 2024-03-09T23:00:00-08:00 2024-03-09T23:00:00-08:00
+
+# Test in America/Phoenix (MST - no DST, always UTC-7)
+statement ok
+SET datafusion.execution.time_zone = 'America/Phoenix';
+
+query PPPP
+SELECT spark_cast(i8_col, 'timestamp'), spark_cast(i16_col, 'timestamp'), spark_cast(i32_col, 'timestamp'), spark_cast(i64_col, 'timestamp') FROM int_test;
+----
+NULL NULL NULL NULL
+1969-12-31T16:57:52-07:00 1969-12-31T07:53:52-07:00 1901-12-13T13:45:52-07:00 1969-12-30T17:00:00-07:00
+1969-12-31T17:02:07-07:00 1970-01-01T02:06:07-07:00 2038-01-18T20:14:07-07:00 1970-01-01T17:00:00-07:00
+1969-12-31T17:01:40-07:00 1969-12-31T18:00:00-07:00 2024-03-10T00:00:00-07:00 2024-03-10T00:00:00-07:00
+
+# Reset and cleanup
+statement ok
+SET datafusion.execution.time_zone = 'UTC';
+
+statement ok
+DROP TABLE int_test;
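
Note on the timezone plumbing exercised by the tests above, as a minimal sketch rather than code taken verbatim from this patch: the session time zone reaches spark_cast through with_updated_config, which rebuilds the UDF from the current ConfigOptions, and return_field_from_args then embeds that zone in the reported Timestamp(Microsecond, tz) return field. Assuming only the API introduced in this series, the flow looks roughly like:

    // Hedged sketch, not part of the patch: rebuild the UDF when the session
    // config changes, as the planner does via with_updated_config.
    let mut config = ConfigOptions::default();
    config.execution.time_zone = Some("America/Phoenix".to_string());
    let cast = SparkCast::new_with_config(&config);
    // return_field_from_args now reports Timestamp(Microsecond, Some("America/Phoenix")),
    // which is why spark_cast(0::bigint, 'timestamp') in the tests above prints
    // 1969-12-31T17:00:00-07:00 instead of 1970-01-01T00:00:00Z.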