Skip to content

Commit 1016a72

Browse files
committed
feat: implement make_time
1 parent da187f2 commit 1016a72

11 files changed

Lines changed: 416 additions & 7 deletions

File tree

native/core/src/execution/columnar_to_row.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ enum TypedArray<'a> {
161161
Float64(&'a Float64Array),
162162
Date32(&'a Date32Array),
163163
TimestampMicro(&'a TimestampMicrosecondArray),
164+
Time64Nano(&'a Time64NanosecondArray),
164165
Decimal128(&'a Decimal128Array, u8), // array + precision
165166
String(&'a StringArray),
166167
LargeString(&'a LargeStringArray),
@@ -200,6 +201,9 @@ impl<'a> TypedArray<'a> {
200201
DataType::Timestamp(TimeUnit::Microsecond, _) => Ok(TypedArray::TimestampMicro(
201202
downcast_array!(array, TimestampMicrosecondArray)?,
202203
)),
204+
DataType::Time64(TimeUnit::Nanosecond) => Ok(TypedArray::Time64Nano(
205+
downcast_array!(array, Time64NanosecondArray)?,
206+
)),
203207
DataType::Decimal128(p, _) => Ok(TypedArray::Decimal128(
204208
downcast_array!(array, Decimal128Array)?,
205209
*p,
@@ -267,6 +271,7 @@ impl<'a> TypedArray<'a> {
267271
Float64,
268272
Date32,
269273
TimestampMicro,
274+
Time64Nano,
270275
Decimal128,
271276
String,
272277
LargeString,
@@ -295,6 +300,7 @@ impl<'a> TypedArray<'a> {
295300
TypedArray::Float64(arr) => arr.value(row_idx).to_bits() as i64,
296301
TypedArray::Date32(arr) => arr.value(row_idx) as i64,
297302
TypedArray::TimestampMicro(arr) => arr.value(row_idx),
303+
TypedArray::Time64Nano(arr) => arr.value(row_idx),
298304
TypedArray::Decimal128(arr, precision) if *precision <= MAX_LONG_DIGITS => {
299305
arr.value(row_idx) as i64
300306
}
@@ -317,7 +323,8 @@ impl<'a> TypedArray<'a> {
317323
| TypedArray::Float32(_)
318324
| TypedArray::Float64(_)
319325
| TypedArray::Date32(_)
320-
| TypedArray::TimestampMicro(_) => false,
326+
| TypedArray::TimestampMicro(_)
327+
| TypedArray::Time64Nano(_) => false,
321328
TypedArray::Decimal128(_, precision) => *precision > MAX_LONG_DIGITS,
322329
_ => true,
323330
}
@@ -380,6 +387,7 @@ enum TypedElements<'a> {
380387
Float64(&'a Float64Array),
381388
Date32(&'a Date32Array),
382389
TimestampMicro(&'a TimestampMicrosecondArray),
390+
Time64Nano(&'a Time64NanosecondArray),
383391
Decimal128(&'a Decimal128Array, u8),
384392
String(&'a StringArray),
385393
LargeString(&'a LargeStringArray),
@@ -418,6 +426,11 @@ impl<'a> TypedElements<'a> {
418426
return TypedElements::TimestampMicro(arr);
419427
}
420428
}
429+
DataType::Time64(TimeUnit::Nanosecond) => {
430+
if let Some(arr) = array.as_any().downcast_ref::<Time64NanosecondArray>() {
431+
return TypedElements::Time64Nano(arr);
432+
}
433+
}
421434
DataType::Decimal128(p, _) => {
422435
if let Some(arr) = array.as_any().downcast_ref::<Decimal128Array>() {
423436
return TypedElements::Decimal128(arr, *p);
@@ -442,6 +455,7 @@ impl<'a> TypedElements<'a> {
442455
TypedElements::Int32(_) | TypedElements::Date32(_) | TypedElements::Float32(_) => 4,
443456
TypedElements::Int64(_)
444457
| TypedElements::TimestampMicro(_)
458+
| TypedElements::Time64Nano(_)
445459
| TypedElements::Float64(_) => 8,
446460
TypedElements::Decimal128(_, p) if *p <= MAX_LONG_DIGITS => 8,
447461
_ => 8, // Variable-length uses 8 bytes for offset+length
@@ -460,6 +474,7 @@ impl<'a> TypedElements<'a> {
460474
| TypedElements::Float64(_)
461475
| TypedElements::Date32(_)
462476
| TypedElements::TimestampMicro(_)
477+
| TypedElements::Time64Nano(_)
463478
)
464479
}
465480

@@ -479,6 +494,7 @@ impl<'a> TypedElements<'a> {
479494
Float64,
480495
Date32,
481496
TimestampMicro,
497+
Time64Nano,
482498
Decimal128,
483499
String,
484500
LargeString,
@@ -502,7 +518,8 @@ impl<'a> TypedElements<'a> {
502518
| TypedElements::Float32(_)
503519
| TypedElements::Float64(_)
504520
| TypedElements::Date32(_)
505-
| TypedElements::TimestampMicro(_) => true,
521+
| TypedElements::TimestampMicro(_)
522+
| TypedElements::Time64Nano(_) => true,
506523
TypedElements::Decimal128(_, p) => *p <= MAX_LONG_DIGITS,
507524
_ => false,
508525
}
@@ -521,6 +538,7 @@ impl<'a> TypedElements<'a> {
521538
TypedElements::Float64(arr) => arr.value(idx).to_bits() as i64,
522539
TypedElements::Date32(arr) => arr.value(idx) as i64,
523540
TypedElements::TimestampMicro(arr) => arr.value(idx),
541+
TypedElements::Time64Nano(arr) => arr.value(idx),
524542
TypedElements::Decimal128(arr, _) => arr.value(idx) as i64,
525543
_ => 0, // Should not be called for variable-length types
526544
}

native/core/src/execution/serde.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ pub fn to_arrow_datatype(dt_value: &DataType) -> ArrowDataType {
9696
}
9797
DataTypeId::TimestampNtz => ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
9898
DataTypeId::Date => ArrowDataType::Date32,
99+
DataTypeId::Time => ArrowDataType::Time64(TimeUnit::Nanosecond),
99100
DataTypeId::Null => ArrowDataType::Null,
100101
DataTypeId::List => match dt_value
101102
.type_info

native/proto/src/proto/types.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ message DataType {
5959
LIST = 14;
6060
MAP = 15;
6161
STRUCT = 16;
62+
TIME = 17;
6263
}
6364
DataTypeId type_id = 1;
6465

native/spark-expr/src/comet_scalar_funcs.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex,
2727
spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArrayPositionFunc, SparkArraysOverlap,
2828
SparkContains, SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate,
29-
SparkSecondsToTimestamp, SparkSizeFunc,
29+
SparkMakeTime, SparkSecondsToTimestamp, SparkSizeFunc,
3030
};
3131
use arrow::datatypes::DataType;
3232
use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -214,6 +214,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
214214
Arc::new(ScalarUDF::new_from_impl(SparkDateFromUnixDate::default())),
215215
Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
216216
Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())),
217+
Arc::new(ScalarUDF::new_from_impl(SparkMakeTime::default())),
217218
Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())),
218219
Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
219220
]
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{Array, Decimal128Array, Int32Array, Time64NanosecondArray};
19+
use arrow::compute::cast;
20+
use arrow::datatypes::{DataType, TimeUnit};
21+
use datafusion::common::{utils::take_function_args, DataFusionError, Result};
22+
use datafusion::logical_expr::{
23+
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
24+
};
25+
use std::any::Any;
26+
use std::sync::Arc;
27+
28+
const MICROS_PER_SECOND: i128 = 1_000_000;
29+
const NANOS_PER_MICRO: i64 = 1_000;
30+
const NANOS_PER_SECOND: i64 = 1_000_000_000;
31+
32+
#[derive(Debug, PartialEq, Eq, Hash)]
33+
pub struct SparkMakeTime {
34+
signature: Signature,
35+
}
36+
37+
impl SparkMakeTime {
38+
pub fn new() -> Self {
39+
Self {
40+
signature: Signature::any(3, Volatility::Immutable),
41+
}
42+
}
43+
}
44+
45+
impl Default for SparkMakeTime {
46+
fn default() -> Self {
47+
Self::new()
48+
}
49+
}
50+
51+
/// Converts hours, minutes, and fractional seconds (Decimal(16,6)) to nanoseconds from midnight.
52+
/// Returns an error for invalid inputs (matching Spark's always-throw behavior).
53+
fn make_time(hours: i32, minutes: i32, secs_and_micros_unscaled: i128) -> Result<i64> {
54+
let full_secs = secs_and_micros_unscaled.div_euclid(MICROS_PER_SECOND);
55+
let frac_micros = secs_and_micros_unscaled.rem_euclid(MICROS_PER_SECOND);
56+
57+
if full_secs > i32::MAX as i128 || full_secs < 0 {
58+
return Err(DataFusionError::Execution(format!(
59+
"Invalid value for SecondOfMinute (valid values 0 - 59): {}",
60+
secs_and_micros_unscaled / MICROS_PER_SECOND
61+
)));
62+
}
63+
64+
let secs = full_secs as i32;
65+
let nanos = (frac_micros as i64) * NANOS_PER_MICRO;
66+
67+
if !(0..=23).contains(&hours) {
68+
return Err(DataFusionError::Execution(format!(
69+
"Invalid value for HourOfDay (valid values 0 - 23): {hours}"
70+
)));
71+
}
72+
if !(0..=59).contains(&minutes) {
73+
return Err(DataFusionError::Execution(format!(
74+
"Invalid value for MinuteOfHour (valid values 0 - 59): {minutes}"
75+
)));
76+
}
77+
if !(0..=59).contains(&secs) {
78+
return Err(DataFusionError::Execution(format!(
79+
"Invalid value for SecondOfMinute (valid values 0 - 59): {secs}"
80+
)));
81+
}
82+
83+
let total_nanos =
84+
hours as i64 * 3_600 * NANOS_PER_SECOND + minutes as i64 * 60 * NANOS_PER_SECOND + secs as i64 * NANOS_PER_SECOND + nanos;
85+
86+
Ok(total_nanos)
87+
}
88+
89+
impl ScalarUDFImpl for SparkMakeTime {
90+
fn as_any(&self) -> &dyn Any {
91+
self
92+
}
93+
94+
fn name(&self) -> &str {
95+
"make_time"
96+
}
97+
98+
fn signature(&self) -> &Signature {
99+
&self.signature
100+
}
101+
102+
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
103+
Ok(DataType::Time64(TimeUnit::Nanosecond))
104+
}
105+
106+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
107+
let [hours, minutes, secs_and_micros] = take_function_args(self.name(), args.args)?;
108+
109+
let num_rows = [&hours, &minutes, &secs_and_micros]
110+
.iter()
111+
.find_map(|arg| match arg {
112+
ColumnarValue::Array(array) => Some(array.len()),
113+
ColumnarValue::Scalar(_) => None,
114+
})
115+
.unwrap_or(1);
116+
117+
let hours_arr = hours.into_array(num_rows)?;
118+
let minutes_arr = minutes.into_array(num_rows)?;
119+
let secs_arr = secs_and_micros.into_array(num_rows)?;
120+
121+
let hours_arr = cast_to_int32(&hours_arr)?;
122+
let minutes_arr = cast_to_int32(&minutes_arr)?;
123+
124+
let hours_array = hours_arr.as_any().downcast_ref::<Int32Array>().ok_or_else(|| {
125+
DataFusionError::Execution("make_time: failed to cast hours to Int32".to_string())
126+
})?;
127+
128+
let minutes_array =
129+
minutes_arr
130+
.as_any()
131+
.downcast_ref::<Int32Array>()
132+
.ok_or_else(|| {
133+
DataFusionError::Execution(
134+
"make_time: failed to cast minutes to Int32".to_string(),
135+
)
136+
})?;
137+
138+
let secs_array = secs_arr
139+
.as_any()
140+
.downcast_ref::<Decimal128Array>()
141+
.ok_or_else(|| {
142+
DataFusionError::Execution(
143+
"make_time: expected Decimal128 for seconds argument".to_string(),
144+
)
145+
})?;
146+
147+
let len = hours_array.len();
148+
let mut builder = Time64NanosecondArray::builder(len);
149+
150+
for i in 0..len {
151+
if hours_array.is_null(i) || minutes_array.is_null(i) || secs_array.is_null(i) {
152+
builder.append_null();
153+
} else {
154+
let h = hours_array.value(i);
155+
let m = minutes_array.value(i);
156+
let s = secs_array.value(i);
157+
158+
let nanos = make_time(h, m, s)?;
159+
builder.append_value(nanos);
160+
}
161+
}
162+
163+
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
164+
}
165+
}
166+
167+
fn cast_to_int32(arr: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
168+
if arr.data_type() == &DataType::Int32 {
169+
Ok(Arc::clone(arr))
170+
} else {
171+
cast(arr.as_ref(), &DataType::Int32)
172+
.map_err(|e| DataFusionError::Execution(format!("Failed to cast to Int32: {e}")))
173+
}
174+
}
175+
176+
#[cfg(test)]
177+
mod tests {
178+
use super::*;
179+
180+
#[test]
181+
fn test_make_time_valid() {
182+
// Midnight
183+
assert_eq!(make_time(0, 0, 0).unwrap(), 0);
184+
// 1 hour
185+
assert_eq!(make_time(1, 0, 0).unwrap(), 3_600_000_000_000);
186+
// 1 minute
187+
assert_eq!(make_time(0, 1, 0).unwrap(), 60_000_000_000);
188+
// 1 second (unscaled: 1_000_000)
189+
assert_eq!(make_time(0, 0, 1_000_000).unwrap(), 1_000_000_000);
190+
// 1.5 seconds (unscaled: 1_500_000)
191+
assert_eq!(make_time(0, 0, 1_500_000).unwrap(), 1_500_000_000);
192+
// 23:59:59.999999 (unscaled: 59_999_999)
193+
assert_eq!(
194+
make_time(23, 59, 59_999_999).unwrap(),
195+
86_399_999_999_000
196+
);
197+
// 12:30:45.123456 (unscaled: 45_123_456)
198+
assert_eq!(
199+
make_time(12, 30, 45_123_456).unwrap(),
200+
12 * 3_600_000_000_000 + 30 * 60_000_000_000 + 45_123_456_000
201+
);
202+
}
203+
204+
#[test]
205+
fn test_make_time_invalid_hours() {
206+
assert!(make_time(24, 0, 0).is_err());
207+
assert!(make_time(25, 0, 0).is_err());
208+
assert!(make_time(-1, 0, 0).is_err());
209+
}
210+
211+
#[test]
212+
fn test_make_time_invalid_minutes() {
213+
assert!(make_time(0, 60, 0).is_err());
214+
assert!(make_time(0, -1, 0).is_err());
215+
}
216+
217+
#[test]
218+
fn test_make_time_invalid_seconds() {
219+
// 60 seconds (unscaled: 60_000_000)
220+
assert!(make_time(0, 0, 60_000_000).is_err());
221+
// 100.5 seconds (unscaled: 100_500_000)
222+
assert!(make_time(0, 0, 100_500_000).is_err());
223+
// negative seconds (unscaled: -1_000_000)
224+
assert!(make_time(0, 0, -1_000_000).is_err());
225+
}
226+
227+
#[test]
228+
fn test_make_time_overflow_seconds() {
229+
// Very large value that overflows i32
230+
let large = (i32::MAX as i128 + 1) * MICROS_PER_SECOND;
231+
assert!(make_time(0, 0, large).is_err());
232+
}
233+
}

native/spark-expr/src/datetime_funcs/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod date_trunc;
2121
mod extract_date_part;
2222
mod hours;
2323
mod make_date;
24+
mod make_time;
2425
mod seconds_to_timestamp;
2526
mod timestamp_trunc;
2627
mod unix_timestamp;
@@ -33,6 +34,7 @@ pub use extract_date_part::SparkMinute;
3334
pub use extract_date_part::SparkSecond;
3435
pub use hours::SparkHoursTransform;
3536
pub use make_date::SparkMakeDate;
37+
pub use make_time::SparkMakeTime;
3638
pub use seconds_to_timestamp::SparkSecondsToTimestamp;
3739
pub use timestamp_trunc::TimestampTruncExpr;
3840
pub use unix_timestamp::SparkUnixTimestamp;

native/spark-expr/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ pub use comet_scalar_funcs::{
7474
pub use csv_funcs::*;
7575
pub use datetime_funcs::{
7676
SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkHour, SparkHoursTransform,
77-
SparkMakeDate, SparkMinute, SparkSecond, SparkSecondsToTimestamp, SparkUnixTimestamp,
78-
TimestampTruncExpr,
77+
SparkMakeDate, SparkMakeTime, SparkMinute, SparkSecond, SparkSecondsToTimestamp,
78+
SparkUnixTimestamp, TimestampTruncExpr,
7979
};
8080
pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult};
8181
pub use hash_funcs::*;

0 commit comments

Comments
 (0)