Skip to content

Commit fc7d090

Browse files
authored
Spark date part (#19823)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #19822 . - Part of #15914 ## Rationale for this change The current date_part function in DataFusion has a few differences from the Spark implementation: - day-of-week parts are 1-indexed in Spark but 0-indexed in DataFusion - Spark supports a few more aliases for certain parts Full list of Spark-supported aliases: https://github.com/apache/spark/blob/a03bedb6c1281c5263a42bfd20608d2ee005ab05/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L3356-L3371 ## What changes are included in this PR? New date_part function in the spark crate. ## Are these changes tested? Yes, with SLT. ## Are there any user-facing changes? Yes.
1 parent 8b179d9 commit fc7d090

File tree

7 files changed

+436
-109
lines changed

7 files changed

+436
-109
lines changed

datafusion/functions/src/datetime/date_part.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ impl ScalarUDFImpl for DatePartFunc {
152152

153153
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
154154
let [field, _] = take_function_args(self.name(), args.scalar_arguments)?;
155+
let nullable = args.arg_fields[1].is_nullable();
155156

156157
field
157158
.and_then(|sv| {
@@ -160,9 +161,9 @@ impl ScalarUDFImpl for DatePartFunc {
160161
.filter(|s| !s.is_empty())
161162
.map(|part| {
162163
if is_epoch(part) {
163-
Field::new(self.name(), DataType::Float64, true)
164+
Field::new(self.name(), DataType::Float64, nullable)
164165
} else {
165-
Field::new(self.name(), DataType::Int32, true)
166+
Field::new(self.name(), DataType::Int32, nullable)
166167
}
167168
})
168169
})
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::datatypes::{DataType, Field, FieldRef};
19+
use datafusion_common::types::logical_date;
20+
use datafusion_common::{
21+
Result, ScalarValue, internal_err, types::logical_string, utils::take_function_args,
22+
};
23+
use datafusion_expr::expr::ScalarFunction;
24+
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext};
25+
use datafusion_expr::{
26+
Coercion, ColumnarValue, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl,
27+
Signature, TypeSignature, TypeSignatureClass, Volatility,
28+
};
29+
use std::{any::Any, sync::Arc};
30+
31+
/// Wrapper around datafusion date_part function to handle
32+
/// Spark behavior returning day of the week 1-indexed instead of 0-indexed and different part aliases.
33+
/// <https://spark.apache.org/docs/latest/api/sql/index.html#date_part>
34+
#[derive(Debug, PartialEq, Eq, Hash)]
35+
pub struct SparkDatePart {
36+
signature: Signature,
37+
aliases: Vec<String>,
38+
}
39+
40+
impl Default for SparkDatePart {
41+
fn default() -> Self {
42+
Self::new()
43+
}
44+
}
45+
46+
impl SparkDatePart {
47+
pub fn new() -> Self {
48+
Self {
49+
signature: Signature::one_of(
50+
vec![
51+
TypeSignature::Coercible(vec![
52+
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
53+
Coercion::new_exact(TypeSignatureClass::Timestamp),
54+
]),
55+
TypeSignature::Coercible(vec![
56+
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
57+
Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
58+
]),
59+
],
60+
Volatility::Immutable,
61+
),
62+
aliases: vec![String::from("datepart")],
63+
}
64+
}
65+
}
66+
67+
impl ScalarUDFImpl for SparkDatePart {
68+
fn as_any(&self) -> &dyn Any {
69+
self
70+
}
71+
72+
fn name(&self) -> &str {
73+
"date_part"
74+
}
75+
76+
fn aliases(&self) -> &[String] {
77+
&self.aliases
78+
}
79+
80+
fn signature(&self) -> &Signature {
81+
&self.signature
82+
}
83+
84+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
85+
internal_err!("Use return_field_from_args in this case instead.")
86+
}
87+
88+
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
89+
let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
90+
91+
Ok(Arc::new(Field::new(self.name(), DataType::Int32, nullable)))
92+
}
93+
94+
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
95+
internal_err!("spark date_part should have been simplified to standard date_part")
96+
}
97+
98+
fn simplify(
99+
&self,
100+
args: Vec<Expr>,
101+
_info: &SimplifyContext,
102+
) -> Result<ExprSimplifyResult> {
103+
let [part_expr, date_expr] = take_function_args(self.name(), args)?;
104+
105+
let part = match part_expr.as_literal() {
106+
Some(ScalarValue::Utf8(Some(v)))
107+
| Some(ScalarValue::Utf8View(Some(v)))
108+
| Some(ScalarValue::LargeUtf8(Some(v))) => v.to_lowercase(),
109+
_ => {
110+
return internal_err!(
111+
"First argument of `DATE_PART` must be non-null scalar Utf8"
112+
);
113+
}
114+
};
115+
116+
// Map Spark-specific date part aliases to datafusion ones
117+
let part = match part.as_str() {
118+
"yearofweek" | "year_iso" => "isoyear",
119+
"dayofweek" => "dow",
120+
"dayofweek_iso" | "dow_iso" => "isodow",
121+
other => other,
122+
};
123+
124+
let part_expr = Expr::Literal(ScalarValue::new_utf8(part), None);
125+
126+
let date_part_expr = Expr::ScalarFunction(ScalarFunction::new_udf(
127+
datafusion_functions::datetime::date_part(),
128+
vec![part_expr, date_expr],
129+
));
130+
131+
match part {
132+
// Add 1 for day-of-week parts to convert 0-indexed to 1-indexed
133+
"dow" | "isodow" => Ok(ExprSimplifyResult::Simplified(
134+
date_part_expr + Expr::Literal(ScalarValue::Int32(Some(1)), None),
135+
)),
136+
_ => Ok(ExprSimplifyResult::Simplified(date_part_expr)),
137+
}
138+
}
139+
}

datafusion/spark/src/function/datetime/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
pub mod date_add;
19+
pub mod date_part;
1920
pub mod date_sub;
2021
pub mod extract;
2122
pub mod last_day;
@@ -36,6 +37,7 @@ make_udf_function!(last_day::SparkLastDay, last_day);
3637
make_udf_function!(make_dt_interval::SparkMakeDtInterval, make_dt_interval);
3738
make_udf_function!(make_interval::SparkMakeInterval, make_interval);
3839
make_udf_function!(next_day::SparkNextDay, next_day);
40+
make_udf_function!(date_part::SparkDatePart, date_part);
3941

4042
pub mod expr_fn {
4143
use datafusion_functions::export_functions;
@@ -83,6 +85,11 @@ pub mod expr_fn {
8385
"Returns the first date which is later than start_date and named as indicated. The function returns NULL if at least one of the input parameters is NULL.",
8486
arg1 arg2
8587
));
88+
export_functions!((
89+
date_part,
90+
"Extracts a part of the date or time from a date, time, or timestamp expression.",
91+
arg1 arg2
92+
));
8693
}
8794

8895
pub fn functions() -> Vec<Arc<ScalarUDF>> {
@@ -96,5 +103,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
96103
make_dt_interval(),
97104
make_interval(),
98105
next_day(),
106+
date_part(),
99107
]
100108
}

datafusion/spark/src/lib.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,24 @@
9292
//! let expr = sha2(col("my_data"), lit(256));
9393
//! ```
9494
//!
95+
//! # Example: using the Spark expression planner
96+
//!
97+
//! The [`planner::SparkFunctionPlanner`] provides Spark-compatible expression
98+
//! planning, such as mapping SQL `EXTRACT` expressions to Spark's `date_part`
99+
//! function. To use it, register it with your session context:
100+
//!
101+
//! ```ignore
102+
//! use std::sync::Arc;
103+
//! use datafusion::prelude::SessionContext;
104+
//! use datafusion_spark::planner::SparkFunctionPlanner;
105+
//!
106+
//! let mut ctx = SessionContext::new();
107+
//! // Register the Spark expression planner
108+
//! ctx.register_expr_planner(Arc::new(SparkFunctionPlanner))?;
109+
//! // Now EXTRACT expressions will use Spark semantics
110+
//! let df = ctx.sql("SELECT EXTRACT(YEAR FROM timestamp_col) FROM my_table").await?;
111+
//! ```
112+
//!
95113
//![`Expr`]: datafusion_expr::Expr
96114
97115
pub mod function;

datafusion/spark/src/planner.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,15 @@ use datafusion_expr::planner::{ExprPlanner, PlannerResult};
2323
pub struct SparkFunctionPlanner;
2424

2525
impl ExprPlanner for SparkFunctionPlanner {
26+
fn plan_extract(
27+
&self,
28+
args: Vec<Expr>,
29+
) -> datafusion_common::Result<PlannerResult<Vec<Expr>>> {
30+
Ok(PlannerResult::Planned(Expr::ScalarFunction(
31+
ScalarFunction::new_udf(crate::function::datetime::date_part(), args),
32+
)))
33+
}
34+
2635
fn plan_substring(
2736
&self,
2837
args: Vec<Expr>,

0 commit comments

Comments
 (0)