Skip to content

Commit ab8f9e5

Browse files
authored
Merge branch 'main' into feat/levenshtein-expression
2 parents 58dccd9 + 442d3fb commit ab8f9e5

286 files changed

Lines changed: 27884 additions & 20 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/pr_build_linux.yml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,10 @@ jobs:
9797
- name: "Spark 4.0, JDK 21"
9898
java_version: "21"
9999
maven_opts: "-Pspark-4.0"
100-
# Spark 4.1 is intentionally absent: the lint job invokes -Psemanticdb,
101-
# but semanticdb-scalac_2.13.17 is not yet published, so we cannot
102-
# currently run scalafix against the spark-4.1 profile.
100+
# Spark 4.1 and 4.2 are intentionally absent: the lint job invokes -Psemanticdb,
101+
# but semanticdb-scalac for those Scala patch versions (2.13.17 / 2.13.18) is not
102+
# yet published, so we cannot currently run scalafix against the spark-4.1 or
103+
# spark-4.2 profiles.
103104
fail-fast: false
104105
steps:
105106
- uses: runs-on/action@742bf56072eb4845a0f94b3394673e4903c90ff0 # v2.1.0
@@ -305,6 +306,11 @@ jobs:
305306
java_version: "17"
306307
maven_opts: "-Pspark-4.1"
307308
scan_impl: "auto"
309+
310+
- name: "Spark 4.2, JDK 17"
311+
java_version: "17"
312+
maven_opts: "-Pspark-4.2"
313+
scan_impl: "auto"
308314
suite:
309315
- name: "fuzz"
310316
value: |

.github/workflows/pr_build_macos.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,12 @@ jobs:
144144
# runtime; the scala-2.13 profile would override it back to 2.13.16 and break.
145145
maven_opts: "-Pspark-4.1"
146146

147+
- name: "Spark 4.2, JDK 17, Scala 2.13"
148+
java_version: "17"
149+
# The spark-4.2 profile pins Scala to 2.13.18 to match Spark 4.2.0-preview4's
150+
# runtime; the scala-2.13 profile would override it back to 2.13.16 and break.
151+
maven_opts: "-Pspark-4.2"
152+
147153
suite:
148154
- name: "fuzz"
149155
value: |

docs/source/contributor-guide/spark_expressions_support.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@
210210
- [ ] second
211211
- [ ] timestamp_micros
212212
- [ ] timestamp_millis
213-
- [ ] timestamp_seconds
213+
- [x] timestamp_seconds
214214
- [ ] to_date
215215
- [ ] to_timestamp
216216
- [ ] to_timestamp_ltz

native/spark-expr/src/comet_scalar_funcs.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex,
2727
spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArrayPositionFunc, SparkArraysOverlap,
2828
SparkContains, SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate,
29-
SparkSizeFunc,
29+
SparkSecondsToTimestamp, SparkSizeFunc,
3030
};
3131
use arrow::datatypes::DataType;
3232
use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -218,6 +218,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
218218
Arc::new(ScalarUDF::new_from_impl(SparkDateFromUnixDate::default())),
219219
Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
220220
Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())),
221+
Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())),
221222
Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
222223
]
223224
}

native/spark-expr/src/datetime_funcs/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod date_trunc;
2121
mod extract_date_part;
2222
mod hours;
2323
mod make_date;
24+
mod seconds_to_timestamp;
2425
mod timestamp_trunc;
2526
mod unix_timestamp;
2627

@@ -32,5 +33,6 @@ pub use extract_date_part::SparkMinute;
3233
pub use extract_date_part::SparkSecond;
3334
pub use hours::SparkHoursTransform;
3435
pub use make_date::SparkMakeDate;
36+
pub use seconds_to_timestamp::SparkSecondsToTimestamp;
3537
pub use timestamp_trunc::TimestampTruncExpr;
3638
pub use unix_timestamp::SparkUnixTimestamp;
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{
19+
Array, Float32Array, Float64Array, Int32Array, Int64Array, TimestampMicrosecondArray,
20+
};
21+
use arrow::compute::try_unary;
22+
use arrow::datatypes::{DataType, TimeUnit};
23+
use datafusion::common::{utils::take_function_args, DataFusionError, Result, ScalarValue};
24+
use datafusion::logical_expr::{
25+
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
26+
};
27+
use std::any::Any;
28+
use std::sync::Arc;
29+
30+
/// Number of microseconds in one second; the factor used in this module to
/// scale epoch seconds into microsecond-precision timestamp values.
const MICROS_PER_SECOND: i64 = 1_000_000;
31+
32+
/// Spark-compatible seconds_to_timestamp (timestamp_seconds) function.
33+
/// Converts seconds since Unix epoch to a timestamp.
34+
#[derive(Debug, PartialEq, Eq, Hash)]
35+
pub struct SparkSecondsToTimestamp {
36+
signature: Signature,
37+
aliases: Vec<String>,
38+
}
39+
40+
impl SparkSecondsToTimestamp {
41+
pub fn new() -> Self {
42+
Self {
43+
signature: Signature::one_of(
44+
vec![
45+
TypeSignature::Exact(vec![DataType::Int32]),
46+
TypeSignature::Exact(vec![DataType::Int64]),
47+
TypeSignature::Exact(vec![DataType::Float32]),
48+
TypeSignature::Exact(vec![DataType::Float64]),
49+
],
50+
Volatility::Immutable,
51+
),
52+
aliases: vec!["timestamp_seconds".to_string()],
53+
}
54+
}
55+
}
56+
57+
impl Default for SparkSecondsToTimestamp {
58+
fn default() -> Self {
59+
Self::new()
60+
}
61+
}
62+
63+
impl ScalarUDFImpl for SparkSecondsToTimestamp {
64+
fn as_any(&self) -> &dyn Any {
65+
self
66+
}
67+
68+
fn name(&self) -> &str {
69+
"seconds_to_timestamp"
70+
}
71+
72+
fn signature(&self) -> &Signature {
73+
&self.signature
74+
}
75+
76+
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
77+
Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
78+
}
79+
80+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
81+
let [seconds] = take_function_args(self.name(), args.args)?;
82+
83+
match seconds {
84+
ColumnarValue::Array(arr) => {
85+
// Handle Int32 input — no overflow possible since i32 * 1_000_000 fits in i64
86+
if let Some(int_array) = arr.as_any().downcast_ref::<Int32Array>() {
87+
let result: TimestampMicrosecondArray =
88+
try_unary(int_array, |s| Ok((s as i64) * MICROS_PER_SECOND))?;
89+
return Ok(ColumnarValue::Array(Arc::new(result)));
90+
}
91+
92+
// Handle Int64 input — error on overflow to match Spark's Math.multiplyExact
93+
if let Some(int_array) = arr.as_any().downcast_ref::<Int64Array>() {
94+
let result: TimestampMicrosecondArray = try_unary(int_array, |s| {
95+
s.checked_mul(MICROS_PER_SECOND).ok_or_else(|| {
96+
arrow::error::ArrowError::ComputeError("long overflow".to_string())
97+
})
98+
})?;
99+
return Ok(ColumnarValue::Array(Arc::new(result)));
100+
}
101+
102+
// Handle Float32 input — cast to f64 and use Float64 path
103+
if let Some(float_array) = arr.as_any().downcast_ref::<Float32Array>() {
104+
let result: arrow::array::TimestampMicrosecondArray = float_array
105+
.iter()
106+
.map(|opt| {
107+
opt.and_then(|s| {
108+
let s = s as f64;
109+
if s.is_nan() || s.is_infinite() {
110+
None
111+
} else {
112+
Some((s * (MICROS_PER_SECOND as f64)) as i64)
113+
}
114+
})
115+
})
116+
.collect();
117+
return Ok(ColumnarValue::Array(Arc::new(result)));
118+
}
119+
120+
// Handle Float64 input — NaN and Infinity return null per Spark behavior
121+
if let Some(float_array) = arr.as_any().downcast_ref::<Float64Array>() {
122+
let result: arrow::array::TimestampMicrosecondArray = float_array
123+
.iter()
124+
.map(|opt| {
125+
opt.and_then(|s| {
126+
if s.is_nan() || s.is_infinite() {
127+
None
128+
} else {
129+
Some((s * (MICROS_PER_SECOND as f64)) as i64)
130+
}
131+
})
132+
})
133+
.collect();
134+
return Ok(ColumnarValue::Array(Arc::new(result)));
135+
}
136+
137+
Err(DataFusionError::Execution(format!(
138+
"seconds_to_timestamp expects Int32, Int64, Float32 or Float64 input, got {:?}",
139+
arr.data_type()
140+
)))
141+
}
142+
ColumnarValue::Scalar(scalar) => {
143+
let ts_micros = match &scalar {
144+
ScalarValue::Int32(Some(s)) => Some((*s as i64) * MICROS_PER_SECOND),
145+
ScalarValue::Int64(Some(s)) => {
146+
Some(s.checked_mul(MICROS_PER_SECOND).ok_or_else(|| {
147+
DataFusionError::ArrowError(
148+
Box::new(arrow::error::ArrowError::ComputeError(
149+
"long overflow".to_string(),
150+
)),
151+
None,
152+
)
153+
})?)
154+
}
155+
ScalarValue::Float32(Some(s)) => {
156+
let s = *s as f64;
157+
if s.is_nan() || s.is_infinite() {
158+
None
159+
} else {
160+
Some((s * (MICROS_PER_SECOND as f64)) as i64)
161+
}
162+
}
163+
ScalarValue::Float64(Some(s)) => {
164+
if s.is_nan() || s.is_infinite() {
165+
None
166+
} else {
167+
Some((s * (MICROS_PER_SECOND as f64)) as i64)
168+
}
169+
}
170+
ScalarValue::Int32(None)
171+
| ScalarValue::Int64(None)
172+
| ScalarValue::Float32(None)
173+
| ScalarValue::Float64(None)
174+
| ScalarValue::Null => None,
175+
_ => {
176+
return Err(DataFusionError::Execution(format!(
177+
"seconds_to_timestamp expects numeric scalar input, got {:?}",
178+
scalar.data_type()
179+
)))
180+
}
181+
};
182+
Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
183+
ts_micros, None,
184+
)))
185+
}
186+
}
187+
}
188+
189+
fn aliases(&self) -> &[String] {
190+
&self.aliases
191+
}
192+
}

native/spark-expr/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ pub use comet_scalar_funcs::{
7474
pub use csv_funcs::*;
7575
pub use datetime_funcs::{
7676
SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkHour, SparkHoursTransform,
77-
SparkMakeDate, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
77+
SparkMakeDate, SparkMinute, SparkSecond, SparkSecondsToTimestamp, SparkUnixTimestamp,
78+
TimestampTruncExpr,
7879
};
7980
pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult};
8081
pub use hash_funcs::*;

spark/pom.xml

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,28 @@ under the License.
300300
</profile>
301301
<profile>
302302
<id>spark-4.2</id>
303-
<!-- 4.2 preview profile is build-only; no Iceberg or Jetty test dependencies are wired up. -->
303+
<dependencies>
304+
<!-- iceberg-spark-runtime-4.2 is not yet published; reuse the 4.0 runtime -->
305+
<dependency>
306+
<groupId>org.apache.iceberg</groupId>
307+
<artifactId>iceberg-spark-runtime-4.0_${scala.binary.version}</artifactId>
308+
<version>1.10.0</version>
309+
<scope>test</scope>
310+
</dependency>
311+
<!-- Jetty 11.x for Spark 4.2 (jakarta.servlet); matches Spark 4.2.0-preview4's jetty.version -->
312+
<dependency>
313+
<groupId>org.eclipse.jetty</groupId>
314+
<artifactId>jetty-server</artifactId>
315+
<version>11.0.26</version>
316+
<scope>test</scope>
317+
</dependency>
318+
<dependency>
319+
<groupId>org.eclipse.jetty</groupId>
320+
<artifactId>jetty-servlet</artifactId>
321+
<version>11.0.26</version>
322+
<scope>test</scope>
323+
</dependency>
324+
</dependencies>
304325
</profile>
305326
<profile>
306327
<id>generate-docs</id>

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ object CometSparkSessionExtensions extends Logging {
169169
org.apache.spark.SPARK_VERSION >= "4.1"
170170
}
171171

172+
/**
 * Whether the running Spark version compares greater than or equal to "4.2".
 *
 * NOTE(review): this looks like a lexicographic String comparison on
 * SPARK_VERSION, so a two-digit minor version such as "4.10.0" would sort
 * below "4.2" -- confirm this is acceptable before such a release exists.
 */
def isSpark42Plus: Boolean = {
173+
org.apache.spark.SPARK_VERSION >= "4.2"
174+
}
175+
172176
/**
173177
* Whether we should override Spark memory configuration for Comet. This only returns true when
174178
* Comet native execution is enabled and/or Comet shuffle is enabled and Comet doesn't use

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
219219
classOf[Minute] -> CometMinute,
220220
classOf[NextDay] -> CometNextDay,
221221
classOf[Second] -> CometSecond,
222+
classOf[SecondsToTimestamp] -> CometSecondsToTimestamp,
222223
classOf[TruncDate] -> CometTruncDate,
223224
classOf[TruncTimestamp] -> CometTruncTimestamp,
224225
classOf[UnixTimestamp] -> CometUnixTimestamp,

0 commit comments

Comments
 (0)