Skip to content

Commit c8ba456

Browse files
committed
CI Fix (ii).
1 parent 3c7238f commit c8ba456

1 file changed

Lines changed: 168 additions & 91 deletions

File tree

datafusion/functions/src/datetime/date_part.rs

Lines changed: 168 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,23 @@ use arrow::datatypes::DataType::{
2727
Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
2828
};
2929
use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
30-
use arrow::datatypes::{ArrowTimestampType, DataType, Field, FieldRef, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType};
31-
use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
30+
use arrow::datatypes::{
31+
ArrowTimestampType, DataType, Field, FieldRef, TimeUnit, TimestampMicrosecondType,
32+
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
33+
};
34+
3235
use datafusion_common::cast::as_primitive_array;
3336
use datafusion_common::types::{logical_date, NativeType};
34-
use std::ops::Add;
3537

38+
use super::adjust_to_local_time;
3639
use datafusion_common::{
3740
cast::{
3841
as_date32_array, as_date64_array, as_int32_array, as_time32_millisecond_array,
3942
as_time32_second_array, as_time64_microsecond_array, as_time64_nanosecond_array,
4043
as_timestamp_microsecond_array, as_timestamp_millisecond_array,
4144
as_timestamp_nanosecond_array, as_timestamp_second_array,
4245
},
43-
exec_err, internal_datafusion_err, internal_err, not_impl_err,
46+
exec_err, internal_err, not_impl_err,
4447
types::logical_string,
4548
utils::take_function_args,
4649
Result, ScalarValue,
@@ -128,7 +131,7 @@ impl DatePartFunc {
128131
],
129132
Volatility::Immutable,
130133
),
131-
aliases: vec![String::from("datepart")],
134+
aliases: vec![String::from("datepart"), String::from("extract")],
132135
}
133136
}
134137
}
@@ -199,12 +202,17 @@ impl ScalarUDFImpl for DatePartFunc {
199202
};
200203

201204
let (is_timezone_aware, tz_str_opt) = match array.data_type() {
202-
Timestamp(_, Some(tz_str)) => (true, Some(tz_str.clone())),
205+
Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))),
203206
_ => (false, None),
204207
};
205208

206-
// Adjust timestamps for extraction
207-
let array = if is_timezone_aware {
209+
let part_trim = part_normalization(&part);
210+
let is_epoch = is_epoch(&part);
211+
212+
// Epoch is timezone-independent - it always returns seconds since 1970-01-01 UTC
213+
let array = if is_epoch {
214+
array
215+
} else if is_timezone_aware {
208216
// For timezone-aware timestamps, extract in their own timezone
209217
let tz_str = tz_str_opt.as_ref().unwrap();
210218
let tz = match tz_str.parse::<Tz>() {
@@ -223,39 +231,42 @@ impl ScalarUDFImpl for DatePartFunc {
223231
adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)?
224232
}
225233
Second => adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
226-
_ => array,
227234
},
228235
_ => array,
229236
}
230237
} else if let Timestamp(time_unit, None) = array.data_type() {
231238
// For naive timestamps, interpret in session timezone
232-
let tz = match config.execution.time_zone.parse::<Tz>() {
233-
Ok(tz) => tz,
234-
Err(_) => return exec_err!("Invalid timezone"),
235-
};
236-
match time_unit {
237-
Nanosecond => {
238-
adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)?
239-
}
240-
Microsecond => {
241-
adjust_timestamp_array::<TimestampMicrosecondType>(&array, tz)?
242-
}
243-
Millisecond => {
244-
adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)?
239+
// If no timezone is configured, treat as UTC
240+
if let Some(tz_str) = &config.execution.time_zone {
241+
let tz = match tz_str.parse::<Tz>() {
242+
Ok(tz) => tz,
243+
Err(_) => return exec_err!("Invalid timezone"),
244+
};
245+
246+
match time_unit {
247+
Nanosecond => {
248+
adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)?
249+
}
250+
Microsecond => {
251+
adjust_timestamp_array::<TimestampMicrosecondType>(&array, tz)?
252+
}
253+
Millisecond => {
254+
adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)?
255+
}
256+
Second => adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
245257
}
246-
Second => adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
247-
_ => array,
258+
} else {
259+
// If no timezone is configured, return the array as-is (UTC interpretation)
260+
array
248261
}
249262
} else {
250263
array
251264
};
252265

253-
let part_trim = part_normalization(&part);
254-
255266
// using IntervalUnit here means we hand off all the work of supporting plurals (like "seconds")
256267
// and synonyms ( like "ms,msec,msecond,millisecond") to Arrow
257-
let mut arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) {
258-
match interval_unit {
268+
let arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) {
269+
let extracted = match interval_unit {
259270
IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?,
260271
IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?,
261272
IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?,
@@ -266,8 +277,39 @@ impl ScalarUDFImpl for DatePartFunc {
266277
IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?,
267278
IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?,
268279
IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), Nanosecond)?,
269-
// century and decade are not supported by `DatePart`
270280
_ => return exec_err!("Date part '{part}' not supported"),
281+
};
282+
283+
// For fixed offsets (like +04:00, -05:30), apply the offset to extract values.
284+
// Named timezones (like 'America/New_York') are handled by adjust_to_local_time
285+
// and DST is already applied via chrono.
286+
if is_timezone_aware {
287+
let tz_str = tz_str_opt.as_ref().unwrap().as_ref();
288+
if is_fixed_offset(tz_str) {
289+
if let Some(offset_info) = extract_offset_components(tz_str) {
290+
match interval_unit {
291+
IntervalUnit::Hour => apply_hour_offset(
292+
extracted.as_ref(),
293+
offset_info.hours,
294+
offset_info.minutes,
295+
)?,
296+
IntervalUnit::Minute => apply_minute_offset(
297+
extracted.as_ref(),
298+
offset_info.minutes,
299+
)?,
300+
IntervalUnit::Day => {
301+
apply_day_offset(extracted.as_ref(), offset_info.hours)?
302+
}
303+
_ => extracted,
304+
}
305+
} else {
306+
extracted
307+
}
308+
} else {
309+
extracted
310+
}
311+
} else {
312+
extracted
271313
}
272314
} else {
273315
// special cases that can be extracted (in postgres) but are not interval units
@@ -281,8 +323,6 @@ impl ScalarUDFImpl for DatePartFunc {
281323
}
282324
};
283325

284-
285-
286326
Ok(if is_scalar {
287327
ColumnarValue::Scalar(ScalarValue::try_from_array(arr.as_ref(), 0)?)
288328
} else {
@@ -299,54 +339,6 @@ impl ScalarUDFImpl for DatePartFunc {
299339
}
300340
}
301341

302-
fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
303-
fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
304-
where
305-
F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
306-
{
307-
match converter(ts) {
308-
MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
309-
"Ambiguous timestamp. Do you mean {:?} or {:?}",
310-
earliest,
311-
latest
312-
),
313-
MappedLocalTime::None => exec_err!(
314-
"The local time does not exist because there is a gap in the local time."
315-
),
316-
MappedLocalTime::Single(date_time) => Ok(date_time),
317-
}
318-
}
319-
320-
let date_time = match T::UNIT {
321-
Nanosecond => Utc.timestamp_nanos(ts),
322-
Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
323-
Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
324-
Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
325-
};
326-
327-
let offset_seconds: i64 = tz
328-
.offset_from_utc_datetime(&date_time.naive_utc())
329-
.fix()
330-
.local_minus_utc() as i64;
331-
332-
let adjusted_date_time = date_time.add(
333-
TimeDelta::try_seconds(offset_seconds)
334-
.ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?,
335-
);
336-
337-
// convert back to i64
338-
match T::UNIT {
339-
Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(|| {
340-
internal_datafusion_err!(
341-
"Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807"
342-
)
343-
}),
344-
Microsecond => Ok(adjusted_date_time.timestamp_micros()),
345-
Millisecond => Ok(adjusted_date_time.timestamp_millis()),
346-
Second => Ok(adjusted_date_time.timestamp()),
347-
}
348-
}
349-
350342
fn adjust_timestamp_array<T: ArrowTimestampType>(
351343
array: &ArrayRef,
352344
tz: Tz,
@@ -370,18 +362,108 @@ fn is_epoch(part: &str) -> bool {
370362
matches!(part.to_lowercase().as_str(), "epoch")
371363
}
372364

373-
// Try to remove quote if exist, if the quote is invalid, return original string and let the downstream function handle the error
365+
/// Returns `true` when `tz_str` begins with an explicit sign (`+` or `-`),
/// i.e. it is spelled as a fixed UTC offset such as `+04:00` rather than as
/// a named zone such as `America/New_York`.
///
/// Only the leading character is inspected; the remainder of the string is
/// not validated here.
fn is_fixed_offset(tz_str: &str) -> bool {
    // Both sign characters are ASCII, so checking the first byte is
    // equivalent to checking the first character.
    matches!(tz_str.as_bytes().first(), Some(b'+' | b'-'))
}
369+
370+
/// Signed components of a fixed UTC offset.
///
/// Both fields carry the sign of the offset, so `-05:30` is represented as
/// `hours == -5` and `minutes == -30`. Keeping the sign on `minutes` as well
/// means offsets such as `-00:30` are not mistaken for positive ones.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct OffsetInfo {
    hours: i32,
    minutes: i32,
}

/// Splits a fixed-offset timezone string into its signed hour and minute
/// components.
///
/// Accepted spellings are `±HH:MM`, `±HHMM` and `±HH` (minutes default to 0).
///
/// Returns `None` for anything else: a missing sign, non-digit characters, a
/// wrong or missing separator, trailing characters, or a minute field of 60
/// or more. The input is inspected byte-wise, so arbitrary (including
/// non-ASCII) strings are rejected instead of panicking on a character
/// boundary.
fn extract_offset_components(tz_str: &str) -> Option<OffsetInfo> {
    /// Converts two ASCII digit bytes into the number they spell.
    fn two_digits(tens: u8, ones: u8) -> Option<i32> {
        (tens.is_ascii_digit() && ones.is_ascii_digit())
            .then(|| i32::from(tens - b'0') * 10 + i32::from(ones - b'0'))
    }

    let (sign, digits) = match tz_str.as_bytes().split_first()? {
        (b'+', rest) => (1, rest),
        (b'-', rest) => (-1, rest),
        _ => return None,
    };

    // Slice patterns pin the exact length, so trailing characters and
    // unexpected separators fall through to the rejecting arm.
    let (hours, minutes) = match digits {
        [h1, h2, b':', m1, m2] | [h1, h2, m1, m2] => {
            (two_digits(*h1, *h2)?, two_digits(*m1, *m2)?)
        }
        [h1, h2] => (two_digits(*h1, *h2)?, 0),
        _ => return None,
    };

    if minutes >= 60 {
        return None;
    }

    Some(OffsetInfo {
        hours: sign * hours,
        minutes: sign * minutes,
    })
}
396+
397+
// Applies the timezone offset to hour values in an array.
398+
fn apply_hour_offset(
399+
array: &dyn Array,
400+
offset_hours: i32,
401+
offset_minutes: i32,
402+
) -> Result<ArrayRef> {
403+
let hour_array = as_int32_array(array)?;
404+
let result: Int32Array = hour_array
405+
.iter()
406+
.map(|hour| {
407+
hour.map(|h| {
408+
let mut adjusted = h + offset_hours;
409+
if offset_minutes.abs() >= 30 {
410+
adjusted += if offset_minutes > 0 { 1 } else { -1 };
411+
}
412+
((adjusted % 24) + 24) % 24
413+
})
414+
})
415+
.collect();
416+
Ok(Arc::new(result))
417+
}
418+
419+
// Applies the timezone offset to minute values in an array.
420+
fn apply_minute_offset(array: &dyn Array, offset_minutes: i32) -> Result<ArrayRef> {
421+
let minute_array = as_int32_array(array)?;
422+
let result: Int32Array = minute_array
423+
.iter()
424+
.map(|minute| {
425+
minute.map(|m| {
426+
let adjusted = m + offset_minutes;
427+
((adjusted % 60) + 60) % 60
428+
})
429+
})
430+
.collect();
431+
Ok(Arc::new(result))
432+
}
433+
434+
// Applies the timezone offset to day values in an array.
435+
fn apply_day_offset(array: &dyn Array, offset_hours: i32) -> Result<ArrayRef> {
436+
let day_array = as_int32_array(array)?;
437+
let result: Int32Array = day_array
438+
.iter()
439+
.map(|day| {
440+
day.map(|d| {
441+
if offset_hours >= 24 {
442+
d + (offset_hours / 24)
443+
} else if offset_hours <= -24 {
444+
d + (offset_hours / 24)
445+
} else if offset_hours > 0 {
446+
d + 1
447+
} else if offset_hours < 0 {
448+
d - 1
449+
} else {
450+
d
451+
}
452+
})
453+
})
454+
.collect();
455+
Ok(Arc::new(result))
456+
}
457+
458+
/// Strips one leading and one trailing quote character (`'` or `"`) from
/// `part`.
///
/// The quotes are only removed as a pair: if either the leading or the
/// trailing quote is missing, `part` is returned unchanged so that the
/// caller can report the invalid value itself. The opening and closing
/// quote characters are not required to match each other.
fn part_normalization(part: &str) -> &str {
    let is_quote = |c: char| matches!(c, '\'' | '"');
    match part.strip_prefix(is_quote) {
        Some(unprefixed) => unprefixed.strip_suffix(is_quote).unwrap_or(part),
        None => part,
    }
}
379464

380-
/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
381-
/// result to a total number of seconds, milliseconds, microseconds or
382-
/// nanoseconds
465+
// Converts seconds to i32 with the specified time unit.
383466
fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
384-
// Nanosecond is neither supported in Postgres nor DuckDB, to avoid dealing
385467
// To avoid dealing with overflow and precision issues, nanosecond is not supported
386468
if unit == Nanosecond {
387469
return not_impl_err!("Date part {unit:?} not supported");
@@ -402,7 +484,6 @@ fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
402484
};
403485

404486
let secs = date_part(array, DatePart::Second)?;
405-
// This assumes array is primitive and not a dictionary
406487
let secs = as_int32_array(secs.as_ref())?;
407488
let subsecs = date_part(array, DatePart::Nanosecond)?;
408489
let subsecs = as_int32_array(subsecs.as_ref())?;
@@ -430,11 +511,8 @@ fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
430511
}
431512
}
432513

433-
/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
434-
/// result to a total number of seconds, milliseconds, microseconds or
435-
/// nanoseconds
436-
///
437-
/// Given epoch return f64, this is a duplicated function to optimize for f64 type
514+
// Converts seconds to f64 with the specified time unit.
515+
// Used for Interval and Duration types that need floating-point precision.
438516
fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
439517
let sf = match unit {
440518
Second => 1_f64,
@@ -443,7 +521,6 @@ fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
443521
Nanosecond => 1_000_000_000_f64,
444522
};
445523
let secs = date_part(array, DatePart::Second)?;
446-
// This assumes array is primitive and not a dictionary
447524
let secs = as_int32_array(secs.as_ref())?;
448525
let subsecs = date_part(array, DatePart::Nanosecond)?;
449526
let subsecs = as_int32_array(subsecs.as_ref())?;

0 commit comments

Comments
 (0)