diff --git a/datafusion/spark/src/function/conversion/cast.rs b/datafusion/spark/src/function/conversion/cast.rs new file mode 100644 index 0000000000000..cd6b7c7d3331d --- /dev/null +++ b/datafusion/spark/src/function/conversion/cast.rs @@ -0,0 +1,659 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondBuilder};
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Field, FieldRef, Int8Type, Int16Type, Int32Type,
+    Int64Type, TimeUnit,
+};
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::types::{
+    logical_int8, logical_int16, logical_int32, logical_int64, logical_string,
+};
+use datafusion_common::{Result, ScalarValue, exec_err, internal_err};
+use datafusion_expr::{Coercion, TypeSignatureClass};
+use datafusion_expr::{
+    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl,
+    Signature, TypeSignature, Volatility,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+const MICROS_PER_SECOND: i64 = 1_000_000;
+
+/// Convert seconds to microseconds with saturating overflow behavior (matches spark spec)
+#[inline]
+fn secs_to_micros(secs: i64) -> i64 {
+    secs.saturating_mul(MICROS_PER_SECOND)
+}
+
+/// Spark-compatible `cast` function for type conversions
+///
+/// This implements Spark's CAST expression with a target type parameter
+///
+/// # Usage
+/// ```sql
+/// SELECT spark_cast(value, 'timestamp')
+/// ```
+///
+/// # Currently supported conversions
+/// - Int8/Int16/Int32/Int64 -> Timestamp (target_type = 'timestamp')
+///
+/// The integer value is interpreted as seconds since the Unix epoch (1970-01-01 00:00:00 UTC)
+/// and converted to a timestamp with microsecond precision (matches spark's spec)
+///
+/// # Overflow behavior
+/// Uses saturating multiplication to handle overflow - values that would overflow
+/// i64 when multiplied by 1,000,000 are clamped to i64::MAX or i64::MIN
+///
+/// # References
+/// - <https://spark.apache.org/docs/latest/api/sql/#cast>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkCast {
+    signature: Signature,
+    timezone: Option<Arc<str>>,
+}
+
+impl Default for SparkCast {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkCast {
+    pub fn new() -> Self {
+        Self::new_with_config(&ConfigOptions::default())
+    }
+
+    pub fn new_with_config(config: &ConfigOptions) -> Self {
+        // First arg: value to cast (only signed ints - Spark doesn't have unsigned integers)
+        // Second arg: target datatype as Utf8 string literal (ex : 'timestamp')
+        let string_arg =
+            Coercion::new_exact(TypeSignatureClass::Native(logical_string()));
+
+        // Spark only supports signed integers, so we explicitly list them
+        let signed_int_signatures = [
+            logical_int8(),
+            logical_int16(),
+            logical_int32(),
+            logical_int64(),
+        ]
+        .map(|int_type| {
+            TypeSignature::Coercible(vec![
+                Coercion::new_exact(TypeSignatureClass::Native(int_type)),
+                string_arg.clone(),
+            ])
+        });
+
+        Self {
+            signature: Signature::new(
+                TypeSignature::OneOf(Vec::from(signed_int_signatures)),
+                Volatility::Stable,
+            ),
+            timezone: config
+                .execution
+                .time_zone
+                .as_ref()
+                .map(|tz| Arc::from(tz.as_str()))
+                .or_else(|| Some(Arc::from("UTC"))),
+        }
+    }
+}
+
+/// Parse target type string into a DataType
+fn parse_target_type(type_str: &str, timezone: Option<Arc<str>>) -> Result<DataType> {
+    match type_str.to_lowercase().as_str() {
+        // further data type support in future
+        "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, timezone)),
+        other => exec_err!(
+            "Unsupported spark_cast target type '{}'. Supported types: timestamp",
+            other
+        ),
+    }
+}
+
+/// Extract target type string from scalar arguments
+fn get_target_type_from_scalar_args(
+    scalar_args: &[Option<&ScalarValue>],
+    timezone: Option<Arc<str>>,
+) -> Result<DataType> {
+    let type_arg = scalar_args.get(1).and_then(|opt| *opt);
+
+    match type_arg {
+        Some(ScalarValue::Utf8(Some(s)))
+        | Some(ScalarValue::LargeUtf8(Some(s)))
+        | Some(ScalarValue::Utf8View(Some(s))) => parse_target_type(s, timezone),
+        _ => exec_err!(
+            "spark_cast requires second argument to be a string of target data type ex: timestamp"
+        ),
+    }
+}
+
+fn cast_int_to_timestamp<T: ArrowPrimitiveType>(
+    array: &ArrayRef,
+    timezone: Option<Arc<str>>,
+) -> Result<ArrayRef>
+where
+    T::Native: Into<i64>,
+{
+    let arr = array.as_primitive::<T>();
+    let mut builder = TimestampMicrosecondBuilder::with_capacity(arr.len());
+
+    for i in 0..arr.len() {
+        if arr.is_null(i) {
+            builder.append_null();
+        } else {
+            // spark saturates to i64 min/max
+            let micros = secs_to_micros(arr.value(i).into());
+            builder.append_value(micros);
+        }
+    }
+
+    Ok(Arc::new(builder.finish().with_timezone_opt(timezone)))
+}
+
+impl ScalarUDFImpl for SparkCast {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "spark_cast"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be used instead")
+    }
+
+    fn with_updated_config(&self, config: &ConfigOptions) -> Option<ScalarUDF> {
+        Some(ScalarUDF::from(Self::new_with_config(config)))
+    }
+
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
+        let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
+        let return_type = get_target_type_from_scalar_args(
+            args.scalar_arguments,
+            self.timezone.clone(),
+        )?;
+        Ok(Arc::new(Field::new(self.name(), return_type, nullable)))
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let target_type = args.return_field.data_type();
+        match target_type {
+            DataType::Timestamp(TimeUnit::Microsecond, tz) => {
+                cast_to_timestamp(&args.args[0], tz.clone())
+            }
+            other => exec_err!("Unsupported spark_cast target type: {:?}", other),
+        }
+    }
+}
+
+/// Cast value to timestamp internal function
+fn cast_to_timestamp(
+    input: &ColumnarValue,
+    timezone: Option<Arc<str>>,
+) -> Result<ColumnarValue> {
+    match input {
+        ColumnarValue::Array(array) => match array.data_type() {
+            DataType::Null => Ok(ColumnarValue::Array(Arc::new(
+                arrow::array::TimestampMicrosecondArray::new_null(array.len())
+                    .with_timezone_opt(timezone),
+            ))),
+            DataType::Int8 => Ok(ColumnarValue::Array(
+                cast_int_to_timestamp::<Int8Type>(array, timezone)?,
+            )),
+            DataType::Int16 => Ok(ColumnarValue::Array(cast_int_to_timestamp::<
+                Int16Type,
+            >(array, timezone)?)),
+            DataType::Int32 => Ok(ColumnarValue::Array(cast_int_to_timestamp::<
+                Int32Type,
+            >(array, timezone)?)),
+            DataType::Int64 => Ok(ColumnarValue::Array(cast_int_to_timestamp::<
+                Int64Type,
+            >(array, timezone)?)),
+            other => exec_err!("Unsupported cast from {:?} to timestamp", other),
+        },
+        ColumnarValue::Scalar(scalar) => {
+            let micros = match scalar {
+                ScalarValue::Null
+                | ScalarValue::Int8(None)
+                | ScalarValue::Int16(None)
+                | ScalarValue::Int32(None)
+                | ScalarValue::Int64(None) => None,
+                ScalarValue::Int8(Some(v)) => Some(secs_to_micros((*v).into())),
+                ScalarValue::Int16(Some(v)) => Some(secs_to_micros((*v).into())),
+                ScalarValue::Int32(Some(v)) => Some(secs_to_micros((*v).into())),
+                ScalarValue::Int64(Some(v)) => Some(secs_to_micros(*v)),
+                other => {
+                    return exec_err!("Unsupported cast from {:?} to timestamp", other);
+                }
+            };
+            Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
+                micros, timezone,
+            )))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Int8Array, Int16Array, Int32Array, Int64Array};
+    use arrow::datatypes::TimestampMicrosecondType;
+
+    // helpers to make testing easier
+    fn make_args(input: ColumnarValue, target_type: &str) -> ScalarFunctionArgs {
make_args_with_timezone(input, target_type, Some("UTC")) + } + + fn make_args_with_timezone( + input: ColumnarValue, + target_type: &str, + timezone: Option<&str>, + ) -> ScalarFunctionArgs { + let return_field = Arc::new(Field::new( + "result", + DataType::Timestamp( + TimeUnit::Microsecond, + Some(Arc::from(timezone.unwrap())), + ), + true, + )); + let mut config = ConfigOptions::default(); + if let Some(tz) = timezone { + config.execution.time_zone = Some(tz.to_string()); + } + ScalarFunctionArgs { + args: vec![ + input, + ColumnarValue::Scalar(ScalarValue::Utf8(Some(target_type.to_string()))), + ], + arg_fields: vec![], + number_rows: 0, + return_field, + config_options: Arc::new(config), + } + } + + fn assert_scalar_timestamp(result: ColumnarValue, expected: i64) { + assert_scalar_timestamp_with_tz(result, expected, "UTC"); + } + + fn assert_scalar_timestamp_with_tz( + result: ColumnarValue, + expected: i64, + expected_tz: &str, + ) { + match result { + ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + Some(val), + Some(tz), + )) => { + assert_eq!(val, expected); + assert_eq!(tz.as_ref(), expected_tz); + } + _ => { + panic!( + "Expected scalar timestamp with value {expected} and {expected_tz} timezone" + ) + } + } + } + + fn assert_scalar_null(result: ColumnarValue) { + assert_scalar_null_with_tz(result, "UTC"); + } + + fn assert_scalar_null_with_tz(result: ColumnarValue, expected_tz: &str) { + match result { + ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, Some(tz))) => { + assert_eq!(tz.as_ref(), expected_tz); + } + _ => panic!("Expected null scalar timestamp with {expected_tz} timezone"), + } + } + + #[test] + fn test_cast_int8_array_to_timestamp() { + let array: ArrayRef = Arc::new(Int8Array::from(vec![ + Some(0), + Some(1), + Some(-1), + Some(127), + Some(-128), + None, + ])); + + let cast = SparkCast::new(); + let args = make_args(ColumnarValue::Array(array), "timestamp"); + let result = cast.invoke_with_args(args).unwrap(); + + 
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = result_array.as_primitive::<TimestampMicrosecondType>();
+                assert_eq!(ts_array.value(0), 0);
+                assert_eq!(ts_array.value(1), 1_000_000);
+                assert_eq!(ts_array.value(2), -1_000_000);
+                assert_eq!(ts_array.value(3), 127_000_000);
+                assert_eq!(ts_array.value(4), -128_000_000);
+                assert!(ts_array.is_null(5));
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_int16_array_to_timestamp() {
+        let array: ArrayRef = Arc::new(Int16Array::from(vec![
+            Some(0),
+            Some(32767),
+            Some(-32768),
+            None,
+        ]));
+
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Array(array), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = result_array.as_primitive::<TimestampMicrosecondType>();
+                assert_eq!(ts_array.value(0), 0);
+                assert_eq!(ts_array.value(1), 32_767_000_000);
+                assert_eq!(ts_array.value(2), -32_768_000_000);
+                assert!(ts_array.is_null(3));
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_int32_array_to_timestamp() {
+        let array: ArrayRef =
+            Arc::new(Int32Array::from(vec![Some(0), Some(1704067200), None]));
+
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Array(array), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = result_array.as_primitive::<TimestampMicrosecondType>();
+                assert_eq!(ts_array.value(0), 0);
+                assert_eq!(ts_array.value(1), 1_704_067_200_000_000);
+                assert!(ts_array.is_null(2));
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_int64_array_overflow() {
+        let array: ArrayRef =
+            Arc::new(Int64Array::from(vec![Some(i64::MAX), Some(i64::MIN)]));
+
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Array(array), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = result_array.as_primitive::<TimestampMicrosecondType>();
+                // saturating_mul clamps to i64::MAX/MIN
+                assert_eq!(ts_array.value(0), i64::MAX);
+                assert_eq!(ts_array.value(1), i64::MIN);
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_int64_array_to_timestamp() {
+        let array: ArrayRef = Arc::new(Int64Array::from(vec![
+            Some(0),
+            Some(1704067200),
+            Some(-86400),
+            None,
+        ]));
+
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Array(array), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = result_array.as_primitive::<TimestampMicrosecondType>();
+                assert_eq!(ts_array.value(0), 0);
+                assert_eq!(ts_array.value(1), 1_704_067_200_000_000);
+                assert_eq!(ts_array.value(2), -86_400_000_000); // -1 day
+                assert!(ts_array.is_null(3));
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_scalar_int8() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int8(Some(100))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_timestamp(result, 100_000_000);
+    }
+
+    #[test]
+    fn test_cast_scalar_int16() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int16(Some(100))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_timestamp(result, 100_000_000);
+    }
+
+    #[test]
+    fn test_cast_scalar_int32() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int32(Some(1704067200))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_timestamp(result, 1_704_067_200_000_000);
+    }
+
+    #[test]
+    fn test_cast_scalar_int64() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int64(Some(1704067200))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_timestamp(result, 1_704_067_200_000_000);
+    }
+
+    #[test]
+    fn test_cast_scalar_negative() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int32(Some(-86400))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        // -86400 seconds = -1 day before epoch
+        assert_scalar_timestamp(result, -86_400_000_000);
+    }
+
+    #[test]
+    fn test_cast_scalar_null() {
+        let cast = SparkCast::new();
+        let args =
+            make_args(ColumnarValue::Scalar(ScalarValue::Int64(None)), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_null(result);
+    }
+
+    #[test]
+    fn test_cast_scalar_int64_overflow() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int64(Some(i64::MAX))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        // saturating_mul clamps to i64::MAX
+        assert_scalar_timestamp(result, i64::MAX);
+    }
+
+    #[test]
+    fn test_unsupported_target_type() {
+        let cast = SparkCast::new();
+        // invoke_with_args uses return_field which would be set correctly by planning
+        // For this test, we need to check return_field_from_args
+        let arg_fields: Vec<FieldRef> =
+            vec![Arc::new(Field::new("a", DataType::Int64, true))];
+        let target_type = ScalarValue::Utf8(Some("string".to_string()));
+        let scalar_arguments: Vec<Option<&ScalarValue>> = vec![None, Some(&target_type)];
+        let return_field_args = ReturnFieldArgs {
+            arg_fields: &arg_fields,
+            scalar_arguments: &scalar_arguments,
+        };
+        let result = cast.return_field_from_args(return_field_args);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Unsupported spark_cast target type")
+        );
+    }
+
+    #[test]
+    fn test_unsupported_source_type() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024-01-01".to_string()))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Unsupported cast from")
+        );
+    }
+
+    #[test]
+    fn test_cast_null_to_timestamp() {
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Scalar(ScalarValue::Null), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_null(result);
+    }
+
+    #[test]
+    fn test_cast_null_array_to_timestamp() {
+        let array: ArrayRef = Arc::new(arrow::array::NullArray::new(3));
+
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Array(array), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = result_array.as_primitive::<TimestampMicrosecondType>();
+                assert_eq!(ts_array.len(), 3);
+                assert!(ts_array.is_null(0));
+                assert!(ts_array.is_null(1));
+                assert!(ts_array.is_null(2));
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_int_to_timestamp_with_timezones() {
+        // Test with various timezones like Comet does
+        let timezones = [
+            "UTC",
+            "America/New_York",
+            "America/Los_Angeles",
+            "Europe/London",
+            "Asia/Tokyo",
+            "Australia/Sydney",
+        ];
+
+        let cast = SparkCast::new();
+        let test_value: i64 = 1704067200; // 2024-01-01 00:00:00 UTC
+        let expected_micros = test_value * MICROS_PER_SECOND;
+
+        for tz in timezones {
+            // scalar
+            let args = make_args_with_timezone(
+                ColumnarValue::Scalar(ScalarValue::Int64(Some(test_value))),
+                "timestamp",
+                Some(tz),
+            );
+            let result = cast.invoke_with_args(args).unwrap();
+            assert_scalar_timestamp_with_tz(result, expected_micros, tz);
+
+            // array input
+            let array: ArrayRef =
+                Arc::new(Int64Array::from(vec![Some(test_value), None]));
+            let args = make_args_with_timezone(
+                ColumnarValue::Array(array),
+                "timestamp",
+                Some(tz),
+            );
+            let result = cast.invoke_with_args(args).unwrap();
+
+            match result {
+                ColumnarValue::Array(result_array) => {
+                    let ts_array =
+                        result_array.as_primitive::<TimestampMicrosecondType>();
+                    assert_eq!(ts_array.value(0), expected_micros);
+                    assert!(ts_array.is_null(1));
+                    assert_eq!(ts_array.timezone(), Some(tz));
+                }
+                _ => panic!("Expected array result for timezone {tz}"),
+            }
+        }
+    }
+
+    #[test]
+    fn test_cast_int_to_timestamp_default_timezone() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        // Defaults to UTC
+        assert_scalar_timestamp_with_tz(result, 0, "UTC");
+    }
+}
diff --git a/datafusion/spark/src/function/conversion/mod.rs b/datafusion/spark/src/function/conversion/mod.rs
index a87df9a2c87a0..e8a89fa8c0616 100644
--- a/datafusion/spark/src/function/conversion/mod.rs
+++ b/datafusion/spark/src/function/conversion/mod.rs
@@ -15,11 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod cast;
+
 use datafusion_expr::ScalarUDF;
+use datafusion_functions::make_udf_function_with_config;
 use std::sync::Arc;
 
-pub mod expr_fn {}
+make_udf_function_with_config!(cast::SparkCast, spark_cast);
+
+pub mod expr_fn {
+    use datafusion_functions::export_functions;
+
+    export_functions!((
+        spark_cast,
+        "Casts given value to the specified type following Spark-compatible semantics",
+        @config arg1 arg2
+    ));
+}
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![]
+    use datafusion_common::config::ConfigOptions;
+    let config = ConfigOptions::default();
+    vec![spark_cast(&config)]
 }
diff --git a/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt
new file mode 100644
index 0000000000000..8b775337297a8
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt
@@ -0,0 +1,252 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test spark_cast from int8 to timestamp +query P +SELECT spark_cast(arrow_cast(0, 'Int8'), 'timestamp'); +---- +1970-01-01T00:00:00Z + +query P +SELECT spark_cast(arrow_cast(1, 'Int8'), 'timestamp'); +---- +1970-01-01T00:00:01Z + +query P +SELECT spark_cast(arrow_cast(-1, 'Int8'), 'timestamp'); +---- +1969-12-31T23:59:59Z + +# Test spark_cast from int16 to timestamp +query P +SELECT spark_cast(arrow_cast(0, 'Int16'), 'timestamp'); +---- +1970-01-01T00:00:00Z + +query P +SELECT spark_cast(arrow_cast(3600, 'Int16'), 'timestamp'); +---- +1970-01-01T01:00:00Z + +# Test spark_cast from int32 to timestamp +query P +SELECT spark_cast(arrow_cast(0, 'Int32'), 'timestamp'); +---- +1970-01-01T00:00:00Z + +query P +SELECT spark_cast(arrow_cast(1704067200, 'Int32'), 'timestamp'); +---- +2024-01-01T00:00:00Z + +# Test spark_cast from int64 to timestamp +query P +SELECT spark_cast(0::bigint, 'timestamp'); +---- +1970-01-01T00:00:00Z + +query P +SELECT spark_cast(1704067200::bigint, 'timestamp'); +---- +2024-01-01T00:00:00Z + +# Test NULL handling +query P +SELECT spark_cast(arrow_cast(NULL, 'Int8'), 'timestamp'); +---- +NULL + +query P +SELECT spark_cast(arrow_cast(NULL, 'Int16'), 'timestamp'); +---- +NULL + +query P +SELECT spark_cast(arrow_cast(NULL, 'Int32'), 'timestamp'); +---- +NULL + +query P +SELECT spark_cast(NULL::bigint, 'timestamp'); +---- +NULL + +# Test untyped NULL +query P +SELECT 
spark_cast(NULL, 'timestamp'); +---- +NULL + +# Test Int8 boundary values +query P +SELECT spark_cast(arrow_cast(127, 'Int8'), 'timestamp'); +---- +1970-01-01T00:02:07Z + +query P +SELECT spark_cast(arrow_cast(-128, 'Int8'), 'timestamp'); +---- +1969-12-31T23:57:52Z + +# Test Int16 boundary values +query P +SELECT spark_cast(arrow_cast(32767, 'Int16'), 'timestamp'); +---- +1970-01-01T09:06:07Z + +query P +SELECT spark_cast(arrow_cast(-32768, 'Int16'), 'timestamp'); +---- +1969-12-31T14:53:52Z + +# Test Int64 negative value +query P +SELECT spark_cast(-86400::bigint, 'timestamp'); +---- +1969-12-31T00:00:00Z + +# Test unsupported source type - should error +statement error +SELECT spark_cast('2024-01-01', 'timestamp'); + +# Test unsupported target type - should error +statement error +SELECT spark_cast(100, 'string'); + +# Test with different session timezones to verify simplify() picks up config + +# America/Los_Angeles (PST/PDT - has DST) +statement ok +SET datafusion.execution.time_zone = 'America/Los_Angeles'; + +# Epoch in PST (UTC-8) +query P +SELECT spark_cast(0::bigint, 'timestamp'); +---- +1969-12-31T16:00:00-08:00 + +# 2024-01-01 00:00:00 UTC in PST (winter, UTC-8) +query P +SELECT spark_cast(1704067200::bigint, 'timestamp'); +---- +2023-12-31T16:00:00-08:00 + +# America/Phoenix (MST - no DST, always UTC-7) +statement ok +SET datafusion.execution.time_zone = 'America/Phoenix'; + +# Epoch in Phoenix (UTC-7) +query P +SELECT spark_cast(0::bigint, 'timestamp'); +---- +1969-12-31T17:00:00-07:00 + +# 2024-01-01 00:00:00 UTC in Phoenix (still UTC-7, no DST) +query P +SELECT spark_cast(1704067200::bigint, 'timestamp'); +---- +2023-12-31T17:00:00-07:00 + +# Test with different timezones - LA (has DST) +statement ok +SET datafusion.execution.time_zone = 'America/Los_Angeles'; + +query P +SELECT spark_cast(1710054000::bigint, 'timestamp'); +---- +2024-03-09T23:00:00-08:00 + +query P +SELECT spark_cast(1710057600::bigint, 'timestamp'); +---- 
+2024-03-10T00:00:00-08:00 + +# Phoenix has no DST - always UTC-7 +statement ok +SET datafusion.execution.time_zone = 'America/Phoenix'; + +query P +SELECT spark_cast(1710054000::bigint, 'timestamp'); +---- +2024-03-10T00:00:00-07:00 + +query P +SELECT spark_cast(1710057600::bigint, 'timestamp'); +---- +2024-03-10T01:00:00-07:00 + +# Reset to default UTC +statement ok +SET datafusion.execution.time_zone = 'UTC'; + +############################# +# Array Tests +############################# + +# Create test table with 4 int columns: null, min, max, regular value +statement ok +CREATE TABLE int_test AS SELECT + arrow_cast(column1, 'Int8') as i8_col, + arrow_cast(column2, 'Int16') as i16_col, + arrow_cast(column3, 'Int32') as i32_col, + column4::bigint as i64_col +FROM (VALUES + (NULL, NULL, NULL, NULL), + (-128, -32768, -2147483648, -86400), + (127, 32767, 2147483647, 86400), + (100, 3600, 1710054000, 1710054000) +); + +# Test in UTC +query PPPP +SELECT spark_cast(i8_col, 'timestamp'), spark_cast(i16_col, 'timestamp'), spark_cast(i32_col, 'timestamp'), spark_cast(i64_col, 'timestamp') FROM int_test; +---- +NULL NULL NULL NULL +1969-12-31T23:57:52Z 1969-12-31T14:53:52Z 1901-12-13T20:45:52Z 1969-12-31T00:00:00Z +1970-01-01T00:02:07Z 1970-01-01T09:06:07Z 2038-01-19T03:14:07Z 1970-01-02T00:00:00Z +1970-01-01T00:01:40Z 1970-01-01T01:00:00Z 2024-03-10T07:00:00Z 2024-03-10T07:00:00Z + +# Test in America/Los_Angeles (PST - has DST) +statement ok +SET datafusion.execution.time_zone = 'America/Los_Angeles'; + +query PPPP +SELECT spark_cast(i8_col, 'timestamp'), spark_cast(i16_col, 'timestamp'), spark_cast(i32_col, 'timestamp'), spark_cast(i64_col, 'timestamp') FROM int_test; +---- +NULL NULL NULL NULL +1969-12-31T15:57:52-08:00 1969-12-31T06:53:52-08:00 1901-12-13T12:45:52-08:00 1969-12-30T16:00:00-08:00 +1969-12-31T16:02:07-08:00 1970-01-01T01:06:07-08:00 2038-01-18T19:14:07-08:00 1970-01-01T16:00:00-08:00 +1969-12-31T16:01:40-08:00 1969-12-31T17:00:00-08:00 
2024-03-09T23:00:00-08:00 2024-03-09T23:00:00-08:00 + +# Test in America/Phoenix (MST - no DST, always UTC-7) +statement ok +SET datafusion.execution.time_zone = 'America/Phoenix'; + +query PPPP +SELECT spark_cast(i8_col, 'timestamp'), spark_cast(i16_col, 'timestamp'), spark_cast(i32_col, 'timestamp'), spark_cast(i64_col, 'timestamp') FROM int_test; +---- +NULL NULL NULL NULL +1969-12-31T16:57:52-07:00 1969-12-31T07:53:52-07:00 1901-12-13T13:45:52-07:00 1969-12-30T17:00:00-07:00 +1969-12-31T17:02:07-07:00 1970-01-01T02:06:07-07:00 2038-01-18T20:14:07-07:00 1970-01-01T17:00:00-07:00 +1969-12-31T17:01:40-07:00 1969-12-31T18:00:00-07:00 2024-03-10T00:00:00-07:00 2024-03-10T00:00:00-07:00 + +# Reset and cleanup +statement ok +SET datafusion.execution.time_zone = 'UTC'; + +statement ok +DROP TABLE int_test;