diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml index fd0a211b29..630d8285ce 100644 --- a/native/spark-expr/Cargo.toml +++ b/native/spark-expr/Cargo.toml @@ -80,6 +80,10 @@ harness = false name = "padding" harness = false +[[bench]] +name = "timestamp_trunc" +harness = false + [[bench]] name = "date_trunc" harness = false diff --git a/native/spark-expr/benches/timestamp_trunc.rs b/native/spark-expr/benches/timestamp_trunc.rs new file mode 100644 index 0000000000..2089a7d4ae --- /dev/null +++ b/native/spark-expr/benches/timestamp_trunc.rs @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::array::{Array, RecordBatch, TimestampMicrosecondArray}; +use arrow::datatypes::{Field, Schema}; +use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::common::ScalarValue; +use datafusion::physical_expr::expressions::{Column, Literal}; +use datafusion::physical_expr::PhysicalExpr; +use datafusion_comet_spark_expr::TimestampTruncExpr; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: usize = 10000; + +fn create_timestamp_batch(timezone: &str) -> RecordBatch { + // Generate timestamps spread across time + // Each row is 1 billion microseconds (~16.7 minutes) apart + // This gives us about 115 days of spread across 10000 rows + let mut vec: Vec> = Vec::with_capacity(NUM_ROWS); + for i in 0..NUM_ROWS { + if i % 100 == 0 { + // Add some nulls (1% of data) + vec.push(None); + } else { + vec.push(Some(i as i64 * 1_000_000_001)); + } + } + + let array = TimestampMicrosecondArray::from(vec).with_timezone(timezone); + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + array.data_type().clone(), + true, + )])); + + RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() +} + +fn make_col(name: &str, index: usize) -> Arc { + Arc::new(Column::new(name, index)) +} + +fn make_format_literal(format: &str) -> Arc { + Arc::new(Literal::new(ScalarValue::Utf8(Some(format.to_string())))) +} + +fn benchmark_utc(c: &mut Criterion) { + let timezone = "UTC"; + let batch = create_timestamp_batch(timezone); + let ts_col = make_col("ts", 0); + + let formats = vec![ + "YEAR", + "QUARTER", + "MONTH", + "WEEK", + "DAY", + "HOUR", + "MINUTE", + "SECOND", + "MILLISECOND", + "MICROSECOND", + ]; + + let mut group = c.benchmark_group("timestamp_trunc_utc"); + + for format in formats { + let expr = TimestampTruncExpr::new( + Arc::clone(&ts_col), + make_format_literal(format), + timezone.to_string(), + ); + + group.bench_function(format, |b| { + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); + } + + 
group.finish(); +} + +fn benchmark_new_york(c: &mut Criterion) { + let timezone = "America/New_York"; + let batch = create_timestamp_batch(timezone); + let ts_col = make_col("ts", 0); + + // Test key formats that showed regression in Spark benchmarks + let formats = vec!["YEAR", "MONTH", "DAY", "SECOND"]; + + let mut group = c.benchmark_group("timestamp_trunc_new_york"); + + for format in formats { + let expr = TimestampTruncExpr::new( + Arc::clone(&ts_col), + make_format_literal(format), + timezone.to_string(), + ); + + group.bench_function(format, |b| { + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); + } + + group.finish(); +} + +fn config() -> Criterion { + Criterion::default() +} + +criterion_group! { + name = benches; + config = config(); + targets = benchmark_utc, benchmark_new_york +} +criterion_main!(benches); diff --git a/native/spark-expr/src/kernels/temporal.rs b/native/spark-expr/src/kernels/temporal.rs index 2668e5095a..830ad3942e 100644 --- a/native/spark-expr/src/kernels/temporal.rs +++ b/native/spark-expr/src/kernels/temporal.rs @@ -46,6 +46,69 @@ macro_rules! return_compute_error_with { // and the beginning of the Unix Epoch (1970-01-01) const DAYS_TO_UNIX_EPOCH: i32 = 719_163; +// Microseconds per time unit - used for fast arithmetic-based truncation +const MICROS_PER_MILLISECOND: i64 = 1_000; +const MICROS_PER_SECOND: i64 = 1_000_000; +const MICROS_PER_MINUTE: i64 = 60 * MICROS_PER_SECOND; +const MICROS_PER_HOUR: i64 = 60 * MICROS_PER_MINUTE; + +/// Fast arithmetic-based timestamp truncation for sub-day formats. +/// The result is timezone-independent only if the zone's UTC offset is a whole multiple of +/// the unit: always true for SECOND and finer, but not for HOUR in e.g. Asia/Kolkata (UTC+05:30). +/// +/// For example, truncating 14:30:45.123456 to MINUTE gives 14:30:00.000000 +/// regardless of whether this is interpreted as UTC or America/New_York.
+#[inline] +fn truncate_micros_to_unit(micros: i64, unit_size: i64) -> i64 { + // For positive timestamps, simple floor division works + // For negative timestamps (before 1970), we need to round toward negative infinity + if micros >= 0 { + micros - (micros % unit_size) + } else { + // For negative numbers, we want floor division behavior + // e.g., -1 truncated to seconds should be -1_000_000, not 0 + let remainder = micros % unit_size; + if remainder == 0 { + micros + } else { + micros - remainder - unit_size + } + } +} + +/// Truncate timestamp array using fast arithmetic for sub-day formats. +/// Returns Ok(Some(array)) if the format was handled, Ok(None) if not applicable. +fn try_truncate_timestamp_arithmetic( + array: &PrimitiveArray, + format: &str, +) -> Result, SparkError> +where + T: ArrowTemporalType + ArrowNumericType, + i64: From, +{ + let unit_size = match format { + "MICROSECOND" => { + // Microsecond truncation is a no-op for microsecond timestamps + // Just copy the values directly + let result: TimestampMicrosecondArray = + array.iter().map(|opt| opt.map(i64::from)).collect(); + return Ok(Some(result)); + } + "MILLISECOND" => MICROS_PER_MILLISECOND, + "SECOND" => MICROS_PER_SECOND, + "MINUTE" => MICROS_PER_MINUTE, + "HOUR" => MICROS_PER_HOUR, + _ => return Ok(None), // Format requires timezone-aware handling + }; + + let result: TimestampMicrosecondArray = array + .iter() + .map(|opt| opt.map(|v| truncate_micros_to_unit(i64::from(v), unit_size))) + .collect(); + + Ok(Some(result)) +} + // Optimized date truncation functions that work directly with days since epoch // These avoid the overhead of converting to/from NaiveDateTime @@ -537,66 +600,48 @@ where T: ArrowTemporalType + ArrowNumericType, i64: From, { + let format_upper = format.to_uppercase(); + + // Fast path: sub-day truncations can use simple arithmetic + // These are timezone-independent since they only affect the time portion + if let Some(result) = 
try_truncate_timestamp_arithmetic(array, &format_upper)? { + return Ok(result); + } + + // Slow path: date-level truncations require timezone-aware DateTime operations let builder = TimestampMicrosecondBuilder::with_capacity(array.len()); let iter = ArrayIter::new(array); match array.data_type() { - DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => { - match format.to_uppercase().as_str() { - "YEAR" | "YYYY" | "YY" => { - as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { - as_micros_from_unix_epoch_utc(trunc_date_to_year(dt)) - }) - } - "QUARTER" => { - as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { - as_micros_from_unix_epoch_utc(trunc_date_to_quarter(dt)) - }) - } - "MONTH" | "MON" | "MM" => { - as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { - as_micros_from_unix_epoch_utc(trunc_date_to_month(dt)) - }) - } - "WEEK" => { - as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { - as_micros_from_unix_epoch_utc(trunc_date_to_week(dt)) - }) - } - "DAY" | "DD" => { - as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { - as_micros_from_unix_epoch_utc(trunc_date_to_day(dt)) - }) - } - "HOUR" => { - as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { - as_micros_from_unix_epoch_utc(trunc_date_to_hour(dt)) - }) - } - "MINUTE" => { - as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { - as_micros_from_unix_epoch_utc(trunc_date_to_minute(dt)) - }) - } - "SECOND" => { - as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { - as_micros_from_unix_epoch_utc(trunc_date_to_second(dt)) - }) - } - "MILLISECOND" => { - as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { - as_micros_from_unix_epoch_utc(trunc_date_to_ms(dt)) - }) - } - "MICROSECOND" => { - as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { - 
as_micros_from_unix_epoch_utc(trunc_date_to_microsec(dt)) - }) - } - _ => Err(SparkError::Internal(format!( - "Unsupported format: {format:?} for function 'timestamp_trunc'" - ))), + DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => match format_upper.as_str() { + "YEAR" | "YYYY" | "YY" => { + as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { + as_micros_from_unix_epoch_utc(trunc_date_to_year(dt)) + }) } - } + "QUARTER" => { + as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { + as_micros_from_unix_epoch_utc(trunc_date_to_quarter(dt)) + }) + } + "MONTH" | "MON" | "MM" => { + as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { + as_micros_from_unix_epoch_utc(trunc_date_to_month(dt)) + }) + } + "WEEK" => { + as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { + as_micros_from_unix_epoch_utc(trunc_date_to_week(dt)) + }) + } + "DAY" | "DD" => { + as_timestamp_tz_with_op::<&PrimitiveArray, T, _>(iter, builder, tz, |dt| { + as_micros_from_unix_epoch_utc(trunc_date_to_day(dt)) + }) + } + _ => Err(SparkError::Internal(format!( + "Unsupported format: {format:?} for function 'timestamp_trunc'" + ))), + }, dt => return_compute_error_with!( "Unsupported input type '{:?}' for function 'timestamp_trunc'", dt diff --git a/pr_description.md b/pr_description.md new file mode 100644 index 0000000000..d458b9dab8 --- /dev/null +++ b/pr_description.md @@ -0,0 +1,108 @@ +## Summary + +- Add a SQL-file-based test framework for expression testing with `CometSqlFileTestSuite` and `SqlFileTestParser` +- Mark queries that expose native engine bugs with `ignore` directives linked to GitHub issues +- Mark expected fallback cases with `expect_fallback` directives + +## How the SQL file test framework works + +### Overview + +`CometSqlFileTestSuite` automatically discovers `.sql` test files under `spark/src/test/resources/sql-tests/expressions/` and registers each one as a ScalaTest test case. 
This allows expression tests to be written as plain SQL files rather than Scala code. + +### SQL file format + +Each `.sql` file contains a sequence of **statements** and **queries**, separated by blank lines: + +```sql +-- Config: spark.comet.regexp.allowIncompatible=true +-- ConfigMatrix: parquet.enable.dictionary=false,true +-- MinSparkVersion: 3.5 + +statement +CREATE TABLE test(s string) USING parquet + +statement +INSERT INTO test VALUES ('hello'), (NULL) + +query +SELECT upper(s) FROM test + +-- literal arguments +query tolerance=1e-6 +SELECT ln(1.0), ln(2.718281828459045) +``` + +**File-level directives** (in comment headers): +- `-- Config: key=value` — sets a Spark SQL config for all queries in the file +- `-- ConfigMatrix: key=val1,val2,...` — runs the entire file once per value (generates combinatorial test cases) +- `-- MinSparkVersion: X.Y` — skips the test on older Spark versions + +**Record types:** +- `statement` — executes DDL/DML (CREATE TABLE, INSERT, etc.) without checking results +- `query` — executes a SELECT and compares Comet results against Spark, verifying both correctness and native execution coverage + +**Query assertion modes:** +- `query` — (**default**) checks that results match Spark AND that all operators ran natively in Comet +- `query spark_answer_only` — checks results match Spark but does not verify native execution +- `query tolerance=1e-6` — checks results match within a floating-point tolerance +- `query expect_fallback(reason)` — checks results match Spark AND verifies that Comet fell back to Spark with a reason string containing the given text +- `query ignore(reason)` — skips the query entirely (used for known bugs, with a link to the GitHub issue) + +### Test runner (`CometSqlFileTestSuite`) + +The test runner: +1. Discovers all `.sql` files recursively under `sql-tests/` +2. Parses each file using `SqlFileTestParser` +3. 
Generates test case combinations from `ConfigMatrix` (e.g., `parquet.enable.dictionary=false,true` produces two test cases per file) +4. For each test case, creates tables, runs statements, and validates queries using the specified assertion mode +5. Disables Spark's `ConstantFolding` optimizer rule so that literal-only expressions are evaluated by Comet's native engine rather than being folded away at plan time +6. Cleans up tables after each test + +### Test file organization + +Test files are organized into category subdirectories: + +``` +sql-tests/expressions/ +├── array/ (array_contains, flatten, size, ...) +├── bitwise/ (bitwise operations) +├── cast/ (type casting) +├── conditional/ (if, case_when, coalesce, predicates) +├── datetime/ (hour, date_add, trunc, ...) +├── decimal/ (decimal arithmetic) +├── hash/ (md5, sha1, sha2, hash, xxhash64) +├── map/ (get_map_value, map_from_arrays) +├── math/ (abs, sin, log, round, ...) +├── misc/ (width_bucket, scalar_subquery, ...) +├── string/ (substring, concat, like, ...) +└── struct/ (create_named_struct, json_to_structs, ...) 
+``` + +## Native engine issues discovered + +These issues were found by testing literal argument combinations with constant folding disabled: + +| Issue | Title | +|-------|-------| +| [#3336](https://github.com/apache/datafusion-comet/issues/3336) | Native engine panics on all-scalar inputs for hour, minute, second, unix_timestamp | +| [#3337](https://github.com/apache/datafusion-comet/issues/3337) | Native engine panics on all-scalar inputs for Substring and StringSpace | +| [#3338](https://github.com/apache/datafusion-comet/issues/3338) | Native engine panics with 'index out of bounds' on literal array expressions | +| [#3339](https://github.com/apache/datafusion-comet/issues/3339) | Native engine crashes on concat_ws with literal NULL separator | +| [#3340](https://github.com/apache/datafusion-comet/issues/3340) | Native engine crashes on literal sha2() with 'Unsupported argument types' | +| [#3341](https://github.com/apache/datafusion-comet/issues/3341) | Native engine panics on scalar bit_count() | +| [#3342](https://github.com/apache/datafusion-comet/issues/3342) | Native engine crashes on literal DateTrunc and TimestampTrunc | +| [#3343](https://github.com/apache/datafusion-comet/issues/3343) | Native engine crashes on all-literal RLIKE expression | +| [#3344](https://github.com/apache/datafusion-comet/issues/3344) | Native replace() returns wrong result for empty-string search | +| [#3345](https://github.com/apache/datafusion-comet/issues/3345) | array_contains returns wrong result for literal array with NULL cast | +| [#3346](https://github.com/apache/datafusion-comet/issues/3346) | array_contains returns null instead of false for empty array with literal value | + +## Pre-existing issues also linked from ignored tests + +| Issue | Title | +|-------|-------| +| [#3326](https://github.com/apache/datafusion-comet/issues/3326) | space() with negative input causes native crash | +| [#3327](https://github.com/apache/datafusion-comet/issues/3327) | 
map_from_arrays() with NULL inputs causes native crash | +| [#3331](https://github.com/apache/datafusion-comet/issues/3331) | width_bucket fails: Int32 downcast to Int64Array | +| [#3332](https://github.com/apache/datafusion-comet/issues/3332) | GetArrayItem returns incorrect results with dynamic index | + diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala index b7a14d408c..7238a4253b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala @@ -55,10 +55,10 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { spark.sql(s"select timestamp_micros(cast(value/100000 as integer)) as ts FROM $tbl")) Seq( "YEAR", - "QUARTER", "MONTH", - "WEEK", "DAY", + "WEEK", + "QUARTER", "HOUR", "MINUTE", "SECOND",