From 953154ed9896e224afb386b9d3d235cefbb14b5b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Jan 2026 14:18:08 -0700 Subject: [PATCH 1/8] tests --- .../comet/CometHashExpressionSuite.scala | 414 ++++++++++++++++++ 1 file changed, 414 insertions(+) create mode 100644 spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala diff --git a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala new file mode 100644 index 0000000000..d04317edb2 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.scalactic.source.Position +import org.scalatest.Tag + +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +/** + * Test suite for Spark murmur3 hash function compatibility between Spark and Comet. + * + * These tests verify that Comet's native implementation of murmur3 hash produces identical + * results to Spark's implementation for all supported data types. 
+ */ +class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + + override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit + pos: Position): Unit = { + super.test(testName, testTags: _*) { + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_AUTO) { + testFun + } + } + } + + test("hash - boolean") { + withTable("t") { + sql("CREATE TABLE t(c BOOLEAN) USING parquet") + sql("INSERT INTO t VALUES (true), (false), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - byte/tinyint") { + withTable("t") { + sql("CREATE TABLE t(c TINYINT) USING parquet") + sql("INSERT INTO t VALUES (1), (0), (-1), (127), (-128), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - short/smallint") { + withTable("t") { + sql("CREATE TABLE t(c SMALLINT) USING parquet") + sql("INSERT INTO t VALUES (1), (0), (-1), (32767), (-32768), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - int") { + withTable("t") { + sql("CREATE TABLE t(c INT) USING parquet") + sql("INSERT INTO t VALUES (1), (0), (-1), (2147483647), (-2147483648), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - long/bigint") { + withTable("t") { + sql("CREATE TABLE t(c BIGINT) USING parquet") + sql( + "INSERT INTO t VALUES (1), (0), (-1), (9223372036854775807), (-9223372036854775808), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - float") { + withTable("t") { + sql("CREATE TABLE t(c FLOAT) USING parquet") + sql("INSERT INTO t VALUES (1.0), (0.0), (-0.0), (-1.0), (3.14159), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - double") { + withTable("t") { + sql("CREATE TABLE t(c DOUBLE) USING parquet") + sql("INSERT INTO t VALUES (1.0), (0.0), (-0.0), 
(-1.0), (3.14159265358979), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - string") { + withTable("t") { + sql("CREATE TABLE t(c STRING) USING parquet") + sql("INSERT INTO t VALUES ('hello'), (''), ('Spark SQL'), ('苹果手机'), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - binary") { + withTable("t") { + sql("CREATE TABLE t(c BINARY) USING parquet") + sql("INSERT INTO t VALUES (X''), (X'00'), (X'0102030405'), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - date") { + withTable("t") { + sql("CREATE TABLE t(c DATE) USING parquet") + sql( + "INSERT INTO t VALUES (DATE '2023-01-01'), (DATE '1970-01-01'), (DATE '2000-12-31'), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - timestamp") { + withTable("t") { + sql("CREATE TABLE t(c TIMESTAMP) USING parquet") + sql("""INSERT INTO t VALUES + (TIMESTAMP '2023-01-01 12:00:00'), + (TIMESTAMP '1970-01-01 00:00:00'), + (TIMESTAMP '2000-12-31 23:59:59'), + (null)""") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - decimal (precision <= 18)") { + Seq((10, 2), (18, 0), (18, 10)).foreach { case (precision, scale) => + withTable("t") { + sql(s"CREATE TABLE t(c DECIMAL($precision, $scale)) USING parquet") + sql("INSERT INTO t VALUES (1.23), (-1.23), (0.0), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + } + + test("hash - decimal (precision > 18)") { + Seq((20, 2), (38, 10)).foreach { case (precision, scale) => + withTable("t") { + sql(s"CREATE TABLE t(c DECIMAL($precision, $scale)) USING parquet") + sql("INSERT INTO t VALUES (1.23), (-1.23), (0.0), (null)") + // Large decimals may fall back to Spark, so just check the answer + checkSparkAnswer("SELECT c, hash(c) FROM t ORDER BY c") + } + } + } + + // ==================== Complex Types 
==================== + // Note: The SQL hash() expression for complex types falls back to Spark execution. + // These tests verify correctness of the hash values (used by native shuffle partitioning). + + test("hash - array of integers") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY) USING parquet") + sql("""INSERT INTO t VALUES + (array(1, 2, 3)), + (array(-1, 0, 1)), + (array()), + (null), + (array(null)), + (array(1, null, 3))""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - array of strings") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY) USING parquet") + sql("""INSERT INTO t VALUES + (array('hello', 'world')), + (array('Spark', 'SQL')), + (array('')), + (array()), + (null), + (array(null)), + (array('a', null, 'b'))""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - array of doubles") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY) USING parquet") + sql("""INSERT INTO t VALUES + (array(1.0, 2.0, 3.0)), + (array(-1.0, 0.0, 1.0)), + (array()), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - nested array (array of arrays)") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY>) USING parquet") + sql("""INSERT INTO t VALUES + (array(array(1, 2), array(3, 4))), + (array(array(), array(1))), + (array()), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - struct") { + withTable("t") { + sql("CREATE TABLE t(c STRUCT) USING parquet") + sql("""INSERT INTO t VALUES + (named_struct('a', 1, 'b', 'hello')), + (named_struct('a', -1, 'b', '')), + (named_struct('a', null, 'b', 'test')), + (named_struct('a', 42, 'b', null)), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - nested struct") { + withTable("t") { + sql("CREATE TABLE t(c STRUCT>) USING parquet") + sql("""INSERT INTO t VALUES + (named_struct('a', 1, 'b', named_struct('x', 'hello', 'y', 3.14))), + (named_struct('a', 2, 'b', named_struct('x', '', 
'y', 0.0))), + (named_struct('a', 3, 'b', null)), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - struct with array field") { + withTable("t") { + sql("CREATE TABLE t(c STRUCT>) USING parquet") + sql("""INSERT INTO t VALUES + (named_struct('a', 1, 'b', array('x', 'y'))), + (named_struct('a', 2, 'b', array())), + (named_struct('a', 3, 'b', null)), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - array of structs") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY>) USING parquet") + sql("""INSERT INTO t VALUES + (array(named_struct('a', 1, 'b', 'x'), named_struct('a', 2, 'b', 'y'))), + (array(named_struct('a', 3, 'b', ''))), + (array()), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - map") { + // Spark prohibits hash on map types by default, enable legacy mode for testing + withSQLConf("spark.sql.legacy.allowHashOnMapType" -> "true") { + withTable("t") { + sql("CREATE TABLE t(c MAP) USING parquet") + sql("""INSERT INTO t VALUES + (map('a', 1, 'b', 2)), + (map('x', -1)), + (map()), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + } + + test("hash - map with complex value type") { + // Spark prohibits hash on map types by default, enable legacy mode for testing + withSQLConf("spark.sql.legacy.allowHashOnMapType" -> "true") { + withTable("t") { + sql("CREATE TABLE t(c MAP>) USING parquet") + sql("""INSERT INTO t VALUES + (map('a', array(1, 2), 'b', array(3))), + (map('x', array())), + (map()), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + } + + test("hash - multiple primitive columns") { + withTable("t") { + sql("CREATE TABLE t(a INT, b STRING, c DOUBLE) USING parquet") + sql("""INSERT INTO t VALUES + (1, 'hello', 3.14), + (2, '', 0.0), + (null, null, null), + (-1, 'test', -1.5)""") + checkSparkAnswerAndOperator( + "SELECT hash(a, b, c), hash(c, b, a), hash(a), hash(b), hash(c) FROM t") + } + } + + test("hash - 
multiple columns with arrays") { + withTable("t") { + sql("CREATE TABLE t(a INT, b ARRAY, c STRING) USING parquet") + sql("""INSERT INTO t VALUES + (1, array(1, 2, 3), 'hello'), + (2, array(), ''), + (null, null, null), + (3, array(-1, 0, 1), 'test')""") + checkSparkAnswer("SELECT hash(a, b, c), hash(b), hash(a, c) FROM t") + } + } + + test("hash - multiple columns with structs") { + withTable("t") { + sql("CREATE TABLE t(a INT, b STRUCT) USING parquet") + sql("""INSERT INTO t VALUES + (1, named_struct('x', 10, 'y', 'hello')), + (2, named_struct('x', 20, 'y', '')), + (null, null), + (3, named_struct('x', null, 'y', 'test'))""") + checkSparkAnswer("SELECT hash(a, b), hash(b, a), hash(b) FROM t") + } + } + + test("hash - empty strings and arrays") { + withTable("t") { + sql("CREATE TABLE t(s STRING, a ARRAY) USING parquet") + sql("INSERT INTO t VALUES ('', array()), ('a', array(1))") + checkSparkAnswer("SELECT hash(s), hash(a), hash(s, a) FROM t") + } + } + + test("hash - all nulls") { + withTable("t") { + sql("CREATE TABLE t(a INT, b STRING, c ARRAY) USING parquet") + sql("INSERT INTO t VALUES (null, null, null)") + checkSparkAnswer("SELECT hash(a), hash(b), hash(c), hash(a, b, c) FROM t") + } + } + + test("hash - with custom seed") { + withTable("t") { + sql("CREATE TABLE t(c INT) USING parquet") + sql("INSERT INTO t VALUES (1), (2), (3), (null)") + // SQL hash() has no seed argument (the seed is always 42); 0, 42 and -1 are hashed as extra columns + checkSparkAnswerAndOperator( + "SELECT hash(c), hash(c, 0), hash(c, 42), hash(c, -1) FROM t ORDER BY c") + } + } + + test("hash - large array") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY) USING parquet") + // Create an array with 1000 elements + val largeArray = (1 to 1000).mkString("array(", ", ", ")") + sql(s"INSERT INTO t VALUES ($largeArray)") + checkSparkAnswer("SELECT hash(c) FROM t") + } + } + + test("hash - deeply nested structure") { + withTable("t") { + sql("""CREATE TABLE t(c STRUCT< + a: INT, + b: STRUCT< + x: STRING, + y: ARRAY> + > + >) USING
parquet""") + sql("""INSERT INTO t VALUES + (named_struct('a', 1, 'b', named_struct('x', 'hello', 'y', + array(named_struct('p', 10, 'q', 'foo'), named_struct('p', 20, 'q', 'bar'))))), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - with dictionary encoding") { + Seq(true, false).foreach { dictionary => + withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + withTable("t") { + sql("CREATE TABLE t(c STRING) USING parquet") + // Repeated values to trigger dictionary encoding + sql("INSERT INTO t VALUES ('a'), ('b'), ('a'), ('b'), ('a'), ('c'), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + } + } + + test("hash - array with dictionary encoding") { + Seq(true, false).foreach { dictionary => + withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + withTable("t") { + sql("CREATE TABLE t(c ARRAY) USING parquet") + sql("""INSERT INTO t VALUES + (array('a', 'b')), + (array('a', 'b')), + (array('c')), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + } + } +} From 72c5342fe0700fdb77958f271b0742aa9408a179 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Jan 2026 14:19:10 -0700 Subject: [PATCH 2/8] add to workflow --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + .../org/apache/comet/CometHashExpressionSuite.scala | 9 --------- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index beb5f9dcf7..9f5324b260 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -160,6 +160,7 @@ jobs: value: | org.apache.comet.CometExpressionSuite org.apache.comet.CometExpressionCoverageSuite + org.apache.comet.CometHashExpressionSuite org.apache.comet.CometTemporalExpressionSuite org.apache.comet.CometArrayExpressionSuite org.apache.comet.CometCastSuite diff --git 
a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 9a45fe022d..58ba48134d 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -123,6 +123,7 @@ jobs: value: | org.apache.comet.CometExpressionSuite org.apache.comet.CometExpressionCoverageSuite + org.apache.comet.CometHashExpressionSuite org.apache.comet.CometTemporalExpressionSuite org.apache.comet.CometArrayExpressionSuite org.apache.comet.CometCastSuite diff --git a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala index d04317edb2..9abb950c70 100644 --- a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala @@ -33,15 +33,6 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper */ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { - override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit - pos: Position): Unit = { - super.test(testName, testTags: _*) { - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_AUTO) { - testFun - } - } - } - test("hash - boolean") { withTable("t") { sql("CREATE TABLE t(c BOOLEAN) USING parquet") From b76e3ebcb1dc0ff4d6f7c9bf91a651a5c9ee7a66 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Jan 2026 14:26:38 -0700 Subject: [PATCH 3/8] save --- native/spark-expr/src/hash_funcs/murmur3.rs | 3 +- native/spark-expr/src/hash_funcs/utils.rs | 106 +++++++++++++++++- native/spark-expr/src/hash_funcs/xxhash64.rs | 3 +- .../scala/org/apache/comet/serde/hash.scala | 2 +- .../comet/CometHashExpressionSuite.scala | 37 +++--- 5 files changed, 127 insertions(+), 24 deletions(-) diff --git a/native/spark-expr/src/hash_funcs/murmur3.rs b/native/spark-expr/src/hash_funcs/murmur3.rs index 5cbd77a36b..dc0f804ab2 100644 --- 
a/native/spark-expr/src/hash_funcs/murmur3.rs +++ b/native/spark-expr/src/hash_funcs/murmur3.rs @@ -183,7 +183,8 @@ pub fn create_murmur3_hashes<'a>( arrays, hashes_buffer, spark_compatible_murmur3_hash, - create_hashes_dictionary + create_hashes_dictionary, + create_murmur3_hashes ); Ok(hashes_buffer) } diff --git a/native/spark-expr/src/hash_funcs/utils.rs b/native/spark-expr/src/hash_funcs/utils.rs index a1cee8465f..f50dd4b27d 100644 --- a/native/spark-expr/src/hash_funcs/utils.rs +++ b/native/spark-expr/src/hash_funcs/utils.rs @@ -206,6 +206,46 @@ macro_rules! hash_array_decimal { }; } +/// Hash a list array by recursively hashing each element. +/// For each row, we hash all elements in the list. +/// Spark hashes arrays by recursively hashing each element, where each +/// element's hash is computed using the previous element's hash as the seed. +/// This creates a chain: hash(elem_n, hash(elem_n-1, ... hash(elem_0, seed)...)) +#[macro_export] +macro_rules! hash_list_array { + ($array_type:ident, $offset_type:ty, $column: ident, $hashes: ident, $recursive_hash_method: ident) => { + let list_array = $column + .as_any() + .downcast_ref::<$array_type>() + .unwrap_or_else(|| { + panic!( + "Failed to downcast column to {}. 
Actual data type: {:?}.", + stringify!($array_type), + $column.data_type() + ) + }); + + let values = list_array.values(); + let offsets = list_array.offsets(); + + // For each row, hash the elements in its list + for (row_idx, hash) in $hashes.iter_mut().enumerate() { + if !list_array.is_null(row_idx) { + let start = offsets[row_idx] as usize; + let end = offsets[row_idx + 1] as usize; + let len = end - start; + // Hash each element in sequence, chaining the hash values + for elem_idx in 0..len { + let elem_array = values.slice(start + elem_idx, 1); + let mut single_hash = [*hash]; + $recursive_hash_method(&[elem_array], &mut single_hash)?; + *hash = single_hash[0]; + } + } + } + }; +} + /// Creates hash values for every row, based on the values in the /// columns. /// @@ -214,9 +254,10 @@ macro_rules! hash_array_decimal { /// /// `hash_method` is the hash function to use. /// `create_dictionary_hash_method` is the function to create hashes for dictionary arrays input. +/// `recursive_hash_method` is the function to call for recursive hashing of complex types. #[macro_export] macro_rules! create_hashes_internal { - ($arrays: ident, $hashes_buffer: ident, $hash_method: ident, $create_dictionary_hash_method: ident) => { + ($arrays: ident, $hashes_buffer: ident, $hash_method: ident, $create_dictionary_hash_method: ident, $recursive_hash_method: ident) => { use arrow::datatypes::{DataType, TimeUnit}; use arrow::array::{types::*, *}; @@ -425,6 +466,69 @@ macro_rules! 
create_hashes_internal { ))) } }, + DataType::List(_) => { + $crate::hash_list_array!(ListArray, i32, col, $hashes_buffer, $recursive_hash_method); + } + DataType::LargeList(_) => { + $crate::hash_list_array!(LargeListArray, i64, col, $hashes_buffer, $recursive_hash_method); + } + DataType::FixedSizeList(_, size) => { + let list_array = col.as_any().downcast_ref::().unwrap(); + let values = list_array.values(); + let list_size = *size as usize; + + // For each row, hash the elements in its fixed-size list + for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { + if !list_array.is_null(row_idx) { + let start = row_idx * list_size; + // Hash each element in sequence, chaining the hash values + for elem_idx in 0..list_size { + let elem_array = values.slice(start + elem_idx, 1); + let mut single_hash = [*hash]; + $recursive_hash_method(&[elem_array], &mut single_hash)?; + *hash = single_hash[0]; + } + } + } + } + DataType::Struct(_) => { + let struct_array = col.as_any().downcast_ref::().unwrap(); + // Hash each field of the struct - Spark hashes all fields recursively + let columns: Vec = struct_array.columns().to_vec(); + if !columns.is_empty() { + $recursive_hash_method(&columns, $hashes_buffer)?; + } + } + DataType::Map(_, _) => { + let map_array = col.as_any().downcast_ref::().unwrap(); + // For maps, Spark hashes by iterating through (key, value) pairs + // For each entry, hash the key then the value + let keys = map_array.keys(); + let values = map_array.values(); + let offsets = map_array.offsets(); + + // For each row, hash the key-value pairs in sequence + for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { + if !map_array.is_null(row_idx) { + let start = offsets[row_idx] as usize; + let end = offsets[row_idx + 1] as usize; + // Hash each key-value pair in sequence + for entry_idx in start..end { + // Hash the key + let key_array = keys.slice(entry_idx, 1); + let mut single_hash = [*hash]; + $recursive_hash_method(&[key_array], &mut 
single_hash)?; + *hash = single_hash[0]; + + // Hash the value + let value_array = values.slice(entry_idx, 1); + single_hash = [*hash]; + $recursive_hash_method(&[value_array], &mut single_hash)?; + *hash = single_hash[0]; + } + } + } + } _ => { // This is internal because we should have caught this before. return Err(DataFusionError::Internal(format!( diff --git a/native/spark-expr/src/hash_funcs/xxhash64.rs b/native/spark-expr/src/hash_funcs/xxhash64.rs index 2b518b88ef..caf0b490ed 100644 --- a/native/spark-expr/src/hash_funcs/xxhash64.rs +++ b/native/spark-expr/src/hash_funcs/xxhash64.rs @@ -129,7 +129,8 @@ fn create_xxhash64_hashes<'a>( arrays, hashes_buffer, spark_compatible_xxhash64, - create_xxhash64_hashes_dictionary + create_xxhash64_hashes_dictionary, + create_xxhash64_hashes ); Ok(hashes_buffer) } diff --git a/spark/src/main/scala/org/apache/comet/serde/hash.scala b/spark/src/main/scala/org/apache/comet/serde/hash.scala index 5cc689f39a..42a379e4ee 100644 --- a/spark/src/main/scala/org/apache/comet/serde/hash.scala +++ b/spark/src/main/scala/org/apache/comet/serde/hash.scala @@ -112,7 +112,7 @@ private object HashUtils { // Java BigDecimal before hashing withInfo(expr, s"Unsupported datatype: $dt (precision > 18)") return false - case dt if !supportedDataType(dt) => + case dt if !supportedDataType(dt, allowComplex = true) => withInfo(expr, s"Unsupported datatype $dt") return false case _ => diff --git a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala index 9abb950c70..635f3334cb 100644 --- a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala @@ -19,9 +19,6 @@ package org.apache.comet -import org.scalactic.source.Position -import org.scalatest.Tag - import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -162,7 
+159,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (null), (array(null)), (array(1, null, 3))""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -177,7 +174,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (null), (array(null)), (array('a', null, 'b'))""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -189,7 +186,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (array(-1.0, 0.0, 1.0)), (array()), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -201,7 +198,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (array(array(), array(1))), (array()), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -214,7 +211,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (named_struct('a', null, 'b', 'test')), (named_struct('a', 42, 'b', null)), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -226,7 +223,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (named_struct('a', 2, 'b', named_struct('x', '', 'y', 0.0))), (named_struct('a', 3, 'b', null)), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -238,7 +235,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (named_struct('a', 2, 'b', array())), (named_struct('a', 3, 'b', null)), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -250,7 +247,7 @@ class CometHashExpressionSuite extends 
CometTestBase with AdaptiveSparkPlanHelpe (array(named_struct('a', 3, 'b', ''))), (array()), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -264,7 +261,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (map('x', -1)), (map()), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } } @@ -279,7 +276,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (map('x', array())), (map()), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } } @@ -305,7 +302,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (2, array(), ''), (null, null, null), (3, array(-1, 0, 1), 'test')""") - checkSparkAnswer("SELECT hash(a, b, c), hash(b), hash(a, c) FROM t") + checkSparkAnswerAndOperator("SELECT hash(a, b, c), hash(b), hash(a, c) FROM t") } } @@ -317,7 +314,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (2, named_struct('x', 20, 'y', '')), (null, null), (3, named_struct('x', null, 'y', 'test'))""") - checkSparkAnswer("SELECT hash(a, b), hash(b, a), hash(b) FROM t") + checkSparkAnswerAndOperator("SELECT hash(a, b), hash(b, a), hash(b) FROM t") } } @@ -325,7 +322,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe withTable("t") { sql("CREATE TABLE t(s STRING, a ARRAY) USING parquet") sql("INSERT INTO t VALUES ('', array()), ('a', array(1))") - checkSparkAnswer("SELECT hash(s), hash(a), hash(s, a) FROM t") + checkSparkAnswerAndOperator("SELECT hash(s), hash(a), hash(s, a) FROM t") } } @@ -333,7 +330,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe withTable("t") { sql("CREATE TABLE t(a INT, b STRING, c ARRAY) USING parquet") sql("INSERT INTO t VALUES (null, null, null)") - 
checkSparkAnswer("SELECT hash(a), hash(b), hash(c), hash(a, b, c) FROM t") + checkSparkAnswerAndOperator("SELECT hash(a), hash(b), hash(c), hash(a, b, c) FROM t") } } @@ -353,7 +350,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe // Create an array with 1000 elements val largeArray = (1 to 1000).mkString("array(", ", ", ")") sql(s"INSERT INTO t VALUES ($largeArray)") - checkSparkAnswer("SELECT hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT hash(c) FROM t") } } @@ -370,7 +367,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (named_struct('a', 1, 'b', named_struct('x', 'hello', 'y', array(named_struct('p', 10, 'q', 'foo'), named_struct('p', 20, 'q', 'bar'))))), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -397,7 +394,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (array('a', 'b')), (array('c')), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } } From 45c679b5ae1cfde9840f27b75799778141dbc5ab Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Jan 2026 15:20:35 -0700 Subject: [PATCH 4/8] fix --- .../org/apache/comet/CometHashExpressionSuite.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala index 635f3334cb..bb2d2f5ac2 100644 --- a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala @@ -19,6 +19,9 @@ package org.apache.comet +import org.scalactic.source.Position +import org.scalatest.Tag + import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -30,6 +33,15 @@ import 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper */ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit + pos: Position): Unit = { + super.test(testName, testTags: _*) { + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_AUTO) { + testFun + } + } + } + test("hash - boolean") { withTable("t") { sql("CREATE TABLE t(c BOOLEAN) USING parquet") From 86355d83e9dbcb2500916e4fe9a4ca7a9072a9cd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 13 Jan 2026 12:15:26 -0700 Subject: [PATCH 5/8] address feedback --- native/spark-expr/src/hash_funcs/utils.rs | 70 ++++++++++++++++++++--- 1 file changed, 61 insertions(+), 9 deletions(-) diff --git a/native/spark-expr/src/hash_funcs/utils.rs b/native/spark-expr/src/hash_funcs/utils.rs index f50dd4b27d..f72d422219 100644 --- a/native/spark-expr/src/hash_funcs/utils.rs +++ b/native/spark-expr/src/hash_funcs/utils.rs @@ -228,9 +228,9 @@ macro_rules! hash_list_array { let values = list_array.values(); let offsets = list_array.offsets(); - // For each row, hash the elements in its list - for (row_idx, hash) in $hashes.iter_mut().enumerate() { - if !list_array.is_null(row_idx) { + if list_array.null_count() == 0 { + // Fast path: no nulls, skip null checks + for (row_idx, hash) in $hashes.iter_mut().enumerate() { let start = offsets[row_idx] as usize; let end = offsets[row_idx + 1] as usize; let len = end - start; @@ -242,6 +242,22 @@ macro_rules! 
hash_list_array { *hash = single_hash[0]; } } + } else { + // Slow path: array has nulls, check each row + for (row_idx, hash) in $hashes.iter_mut().enumerate() { + if !list_array.is_null(row_idx) { + let start = offsets[row_idx] as usize; + let end = offsets[row_idx + 1] as usize; + let len = end - start; + // Hash each element in sequence, chaining the hash values + for elem_idx in 0..len { + let elem_array = values.slice(start + elem_idx, 1); + let mut single_hash = [*hash]; + $recursive_hash_method(&[elem_array], &mut single_hash)?; + *hash = single_hash[0]; + } + } + } } }; } @@ -477,9 +493,9 @@ macro_rules! create_hashes_internal { let values = list_array.values(); let list_size = *size as usize; - // For each row, hash the elements in its fixed-size list - for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { - if !list_array.is_null(row_idx) { + if list_array.null_count() == 0 { + // Fast path: no nulls, skip null checks + for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { let start = row_idx * list_size; // Hash each element in sequence, chaining the hash values for elem_idx in 0..list_size { @@ -489,6 +505,20 @@ macro_rules! create_hashes_internal { *hash = single_hash[0]; } } + } else { + // Slow path: array has nulls, check each row + for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { + if !list_array.is_null(row_idx) { + let start = row_idx * list_size; + // Hash each element in sequence, chaining the hash values + for elem_idx in 0..list_size { + let elem_array = values.slice(start + elem_idx, 1); + let mut single_hash = [*hash]; + $recursive_hash_method(&[elem_array], &mut single_hash)?; + *hash = single_hash[0]; + } + } + } } } DataType::Struct(_) => { @@ -507,9 +537,9 @@ macro_rules! 
create_hashes_internal { let values = map_array.values(); let offsets = map_array.offsets(); - // For each row, hash the key-value pairs in sequence - for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { - if !map_array.is_null(row_idx) { + if map_array.null_count() == 0 { + // Fast path: no nulls, skip null checks + for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { let start = offsets[row_idx] as usize; let end = offsets[row_idx + 1] as usize; // Hash each key-value pair in sequence @@ -527,6 +557,28 @@ macro_rules! create_hashes_internal { *hash = single_hash[0]; } } + } else { + // Slow path: array has nulls, check each row + for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { + if !map_array.is_null(row_idx) { + let start = offsets[row_idx] as usize; + let end = offsets[row_idx + 1] as usize; + // Hash each key-value pair in sequence + for entry_idx in start..end { + // Hash the key + let key_array = keys.slice(entry_idx, 1); + let mut single_hash = [*hash]; + $recursive_hash_method(&[key_array], &mut single_hash)?; + *hash = single_hash[0]; + + // Hash the value + let value_array = values.slice(entry_idx, 1); + single_hash = [*hash]; + $recursive_hash_method(&[value_array], &mut single_hash)?; + *hash = single_hash[0]; + } + } + } } } _ => { From a31b2c068d5a44e26ea45fd1a6f542693a34e4d2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 13 Jan 2026 13:12:54 -0700 Subject: [PATCH 6/8] remove comment --- .../scala/org/apache/comet/CometHashExpressionSuite.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala index bb2d2f5ac2..f2c077ccd7 100644 --- a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala @@ -157,10 +157,6 @@ class CometHashExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelpe } } - // ==================== Complex Types ==================== - // Note: The SQL hash() expression for complex types falls back to Spark execution. - // These tests verify correctness of the hash values (used by native shuffle partitioning). - test("hash - array of integers") { withTable("t") { sql("CREATE TABLE t(c ARRAY<INT>) USING parquet") From 078b204750eda364ade362e97cbbb197f2535ebc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 13 Jan 2026 15:35:46 -0700 Subject: [PATCH 7/8] fuzz test and bug fix --- .../scala/org/apache/comet/serde/hash.scala | 35 ++++++++---- .../comet/CometHashExpressionSuite.scala | 53 +++++++++++++++++++ 2 files changed, 77 insertions(+), 11 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/hash.scala b/spark/src/main/scala/org/apache/comet/serde/hash.scala index 42a379e4ee..b059199735 100644 --- a/spark/src/main/scala/org/apache/comet/serde/hash.scala +++ b/spark/src/main/scala/org/apache/comet/serde/hash.scala @@ -20,7 +20,7 @@ package org.apache.comet.serde import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Murmur3Hash, Sha1, Sha2, XxHash64} -import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType, StringType} +import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, IntegerType, LongType, MapType, StringType, StructType} import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, scalarFunctionExprToProtoWithReturnType, serializeDataType, supportedDataType} @@ -106,18 +106,31 @@ object CometSha1 extends CometExpressionSerde[Sha1] { private object HashUtils { def isSupportedType(expr: Expression): Boolean = { for (child <- expr.children) { - child.dataType match { - case dt: DecimalType if dt.precision > 18 => - // Spark converts decimals with precision > 18 into - // Java BigDecimal before hashing - withInfo(expr, s"Unsupported datatype: $dt (precision > 18)") - return
false - case dt if !supportedDataType(dt, allowComplex = true) => - withInfo(expr, s"Unsupported datatype $dt") - return false - case _ => + if (!isSupportedDataType(expr, child.dataType)) { + return false } } true } + + private def isSupportedDataType(expr: Expression, dt: DataType): Boolean = { + dt match { + case d: DecimalType if d.precision > 18 => + // Spark converts decimals with precision > 18 into + // Java BigDecimal before hashing + withInfo(expr, s"Unsupported datatype: $dt (precision > 18)") + false + case s: StructType => + s.fields.forall(f => isSupportedDataType(expr, f.dataType)) + case a: ArrayType => + isSupportedDataType(expr, a.elementType) + case m: MapType => + isSupportedDataType(expr, m.keyType) && isSupportedDataType(expr, m.valueType) + case _ if !supportedDataType(dt, allowComplex = true) => + withInfo(expr, s"Unsupported datatype $dt") + false + case _ => + true + } + } } diff --git a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala index f2c077ccd7..563ee18520 100644 --- a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala @@ -19,12 +19,16 @@ package org.apache.comet +import scala.util.Random + import org.scalactic.source.Position import org.scalatest.Tag import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions} + /** * Test suite for Spark murmur3 hash function compatibility between Spark and Comet. 
* @@ -157,6 +161,35 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe } } + test("hash - array of decimal (precision > 18) falls back to Spark") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY<DECIMAL(20, 2)>) USING parquet") + sql("INSERT INTO t VALUES (array(1.23, 2.34)), (null)") + // Should fall back to Spark due to nested high-precision decimal + checkSparkAnswerAndFallbackReason("SELECT c, hash(c) FROM t", "precision > 18") + } + } + + test("hash - struct with decimal (precision > 18) falls back to Spark") { + withTable("t") { + sql("CREATE TABLE t(c STRUCT<a: INT, b: DECIMAL(20, 2)>) USING parquet") + sql("INSERT INTO t VALUES (named_struct('a', 1, 'b', 1.23)), (null)") + // Should fall back to Spark due to nested high-precision decimal + checkSparkAnswerAndFallbackReason("SELECT c, hash(c) FROM t", "precision > 18") + } + } + + test("hash - map with decimal (precision > 18) value falls back to Spark") { + withSQLConf("spark.sql.legacy.allowHashOnMapType" -> "true") { + withTable("t") { + sql("CREATE TABLE t(c MAP<STRING, DECIMAL(20, 2)>) USING parquet") + sql("INSERT INTO t VALUES (map('a', 1.23)), (null)") + // Should fall back to Spark due to nested high-precision decimal + checkSparkAnswerAndFallbackReason("SELECT c, hash(c) FROM t", "precision > 18") + } + } + } + test("hash - array of integers") { withTable("t") { sql("CREATE TABLE t(c ARRAY<INT>) USING parquet") @@ -407,4 +440,24 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe } } } + + test("hash - fuzz test") { + val r = new Random(42) + val options = SchemaGenOptions(generateStruct = true) + val schema = FuzzDataGenerator.generateNestedSchema(r, 50, 1, 2, options) + withTempPath { filename => + ParquetGenerator.makeParquetFile( + r, + spark, + filename.toString, + schema, + 1000, + DataGenOptions()) + spark.read.parquet(filename.toString).createOrReplaceTempView("t1") + for (col <- schema.fields) { + val name = col.name + checkSparkAnswer(s"select $name, hash($name) from t1 order by $name") + } +
} + } } From a50e09c18ea2c55c0ec6f235a6aa6146aaa561be Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Jan 2026 10:07:43 -0700 Subject: [PATCH 8/8] microbenchmarks --- .../CometHashExpressionBenchmark.scala | 167 ++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala index c230e44c4e..68ee3c1d1e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala @@ -70,5 +70,172 @@ object CometHashExpressionBenchmark extends CometBenchmarkBase { } } } + + runMurmur3HashBenchmarks(values) + } + + /** + * Comprehensive benchmarks for murmur3 hash function across different data types. These + * benchmarks cover primitive types, complex types (arrays, structs), and nested structures to + * measure hash performance comprehensively. 
+ */ + private def runMurmur3HashBenchmarks(values: Int): Unit = { + // Primitive type benchmarks + runPrimitiveTypeBenchmarks(values) + // Complex type benchmarks (arrays, structs) + runComplexTypeBenchmarks(values) + // Nested structure benchmarks + runNestedTypeBenchmarks(values) + } + + private def runPrimitiveTypeBenchmarks(values: Int): Unit = { + runBenchmarkWithTable("Murmur3 hash - primitive types", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST(value % 2 = 0 AS BOOLEAN) END AS c_bool, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST(value % 128 AS TINYINT) END AS c_byte, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST(value % 32768 AS SMALLINT) END AS c_short, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST(value AS INT) END AS c_int, + CASE WHEN value % 100 = 4 THEN NULL ELSE CAST(value * 1000 AS BIGINT) END AS c_long, + CASE WHEN value % 100 = 5 THEN NULL ELSE CAST(value * 1.5 AS FLOAT) END AS c_float, + CASE WHEN value % 100 = 6 THEN NULL ELSE CAST(value * 1.5 AS DOUBLE) END AS c_double, + CASE WHEN value % 100 = 7 THEN NULL ELSE CONCAT('str_', CAST(value AS STRING)) END AS c_string, + CASE WHEN value % 100 = 8 THEN NULL ELSE CAST(CONCAT('bin_', CAST(value AS STRING)) AS BINARY) END AS c_binary, + CASE WHEN value % 100 = 9 THEN NULL ELSE DATE_ADD(DATE '2020-01-01', CAST(value % 1000 AS INT)) END AS c_date, + CASE WHEN value % 100 = 10 THEN NULL ELSE CAST(value AS DECIMAL(10, 2)) END AS c_decimal + FROM $tbl + """)) + + val primitiveHashBenchmarks = List( + HashExprConfig("hash_boolean", "SELECT hash(c_bool) FROM parquetV1Table"), + HashExprConfig("hash_byte", "SELECT hash(c_byte) FROM parquetV1Table"), + HashExprConfig("hash_short", "SELECT hash(c_short) FROM parquetV1Table"), + HashExprConfig("hash_int", "SELECT hash(c_int) FROM parquetV1Table"), + HashExprConfig("hash_long", "SELECT hash(c_long) FROM parquetV1Table"), + 
HashExprConfig("hash_float", "SELECT hash(c_float) FROM parquetV1Table"), + HashExprConfig("hash_double", "SELECT hash(c_double) FROM parquetV1Table"), + HashExprConfig("hash_string", "SELECT hash(c_string) FROM parquetV1Table"), + HashExprConfig("hash_binary", "SELECT hash(c_binary) FROM parquetV1Table"), + HashExprConfig("hash_date", "SELECT hash(c_date) FROM parquetV1Table"), + HashExprConfig("hash_decimal", "SELECT hash(c_decimal) FROM parquetV1Table")) + + primitiveHashBenchmarks.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } + + private def runComplexTypeBenchmarks(values: Int): Unit = { + runBenchmarkWithTable("Murmur3 hash - complex types", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL + ELSE array(CAST(value AS INT), CAST(value + 1 AS INT), CAST(value + 2 AS INT)) + END AS c_array_int, + CASE WHEN value % 100 = 1 THEN NULL + ELSE array(CONCAT('s', CAST(value AS STRING)), CONCAT('t', CAST(value AS STRING))) + END AS c_array_string, + CASE WHEN value % 100 = 2 THEN NULL + ELSE array(CAST(value * 1.1 AS DOUBLE), CAST(value * 2.2 AS DOUBLE)) + END AS c_array_double, + CASE WHEN value % 100 = 3 THEN NULL + ELSE named_struct('a', CAST(value AS INT), 'b', CONCAT('str_', CAST(value AS STRING))) + END AS c_struct, + CASE WHEN value % 100 = 4 THEN NULL + ELSE named_struct( + 'x', CAST(value AS INT), + 'y', CAST(value * 1.5 AS DOUBLE), + 'z', CAST(value % 2 = 0 AS BOOLEAN) + ) + END AS c_struct_multi + FROM $tbl + """)) + + val complexHashBenchmarks = List( + HashExprConfig("hash_array_int", "SELECT hash(c_array_int) FROM parquetV1Table"), + HashExprConfig( + "hash_array_string", + "SELECT hash(c_array_string) FROM parquetV1Table"), + HashExprConfig( + "hash_array_double", + "SELECT hash(c_array_double) FROM parquetV1Table"), + HashExprConfig("hash_struct", "SELECT hash(c_struct) 
FROM parquetV1Table"), + HashExprConfig( + "hash_struct_multi_fields", + "SELECT hash(c_struct_multi) FROM parquetV1Table")) + + complexHashBenchmarks.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } + + private def runNestedTypeBenchmarks(values: Int): Unit = { + runBenchmarkWithTable("Murmur3 hash - nested types", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL + ELSE array( + array(CAST(value AS INT), CAST(value + 1 AS INT)), + array(CAST(value + 2 AS INT)) + ) + END AS c_nested_array, + CASE WHEN value % 100 = 1 THEN NULL + ELSE named_struct( + 'a', CAST(value AS INT), + 'b', named_struct('x', CONCAT('s', CAST(value AS STRING)), 'y', CAST(value * 1.5 AS DOUBLE)) + ) + END AS c_nested_struct, + CASE WHEN value % 100 = 2 THEN NULL + ELSE named_struct( + 'id', CAST(value AS INT), + 'items', array(CONCAT('item_', CAST(value AS STRING)), CONCAT('item_', CAST(value + 1 AS STRING))) + ) + END AS c_struct_with_array, + CASE WHEN value % 100 = 3 THEN NULL + ELSE array( + named_struct('k', CAST(value AS INT), 'v', CONCAT('val_', CAST(value AS STRING))), + named_struct('k', CAST(value + 1 AS INT), 'v', CONCAT('val_', CAST(value + 1 AS STRING))) + ) + END AS c_array_of_struct + FROM $tbl + """)) + + val nestedHashBenchmarks = List( + HashExprConfig( + "hash_nested_array", + "SELECT hash(c_nested_array) FROM parquetV1Table"), + HashExprConfig( + "hash_nested_struct", + "SELECT hash(c_nested_struct) FROM parquetV1Table"), + HashExprConfig( + "hash_struct_with_array", + "SELECT hash(c_struct_with_array) FROM parquetV1Table"), + HashExprConfig( + "hash_array_of_struct", + "SELECT hash(c_array_of_struct) FROM parquetV1Table")) + + nestedHashBenchmarks.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } } }