From 953154ed9896e224afb386b9d3d235cefbb14b5b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Jan 2026 14:18:08 -0700 Subject: [PATCH 1/9] tests --- .../comet/CometHashExpressionSuite.scala | 414 ++++++++++++++++++ 1 file changed, 414 insertions(+) create mode 100644 spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala diff --git a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala new file mode 100644 index 0000000000..d04317edb2 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.scalactic.source.Position +import org.scalatest.Tag + +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +/** + * Test suite for Spark murmur3 hash function compatibility between Spark and Comet. + * + * These tests verify that Comet's native implementation of murmur3 hash produces identical + * results to Spark's implementation for all supported data types. 
+ */ +class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + + override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit + pos: Position): Unit = { + super.test(testName, testTags: _*) { + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_AUTO) { + testFun + } + } + } + + test("hash - boolean") { + withTable("t") { + sql("CREATE TABLE t(c BOOLEAN) USING parquet") + sql("INSERT INTO t VALUES (true), (false), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - byte/tinyint") { + withTable("t") { + sql("CREATE TABLE t(c TINYINT) USING parquet") + sql("INSERT INTO t VALUES (1), (0), (-1), (127), (-128), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - short/smallint") { + withTable("t") { + sql("CREATE TABLE t(c SMALLINT) USING parquet") + sql("INSERT INTO t VALUES (1), (0), (-1), (32767), (-32768), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - int") { + withTable("t") { + sql("CREATE TABLE t(c INT) USING parquet") + sql("INSERT INTO t VALUES (1), (0), (-1), (2147483647), (-2147483648), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - long/bigint") { + withTable("t") { + sql("CREATE TABLE t(c BIGINT) USING parquet") + sql( + "INSERT INTO t VALUES (1), (0), (-1), (9223372036854775807), (-9223372036854775808), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - float") { + withTable("t") { + sql("CREATE TABLE t(c FLOAT) USING parquet") + sql("INSERT INTO t VALUES (1.0), (0.0), (-0.0), (-1.0), (3.14159), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - double") { + withTable("t") { + sql("CREATE TABLE t(c DOUBLE) USING parquet") + sql("INSERT INTO t VALUES (1.0), (0.0), (-0.0), 
(-1.0), (3.14159265358979), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - string") { + withTable("t") { + sql("CREATE TABLE t(c STRING) USING parquet") + sql("INSERT INTO t VALUES ('hello'), (''), ('Spark SQL'), ('苹果手机'), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - binary") { + withTable("t") { + sql("CREATE TABLE t(c BINARY) USING parquet") + sql("INSERT INTO t VALUES (X''), (X'00'), (X'0102030405'), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - date") { + withTable("t") { + sql("CREATE TABLE t(c DATE) USING parquet") + sql( + "INSERT INTO t VALUES (DATE '2023-01-01'), (DATE '1970-01-01'), (DATE '2000-12-31'), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - timestamp") { + withTable("t") { + sql("CREATE TABLE t(c TIMESTAMP) USING parquet") + sql("""INSERT INTO t VALUES + (TIMESTAMP '2023-01-01 12:00:00'), + (TIMESTAMP '1970-01-01 00:00:00'), + (TIMESTAMP '2000-12-31 23:59:59'), + (null)""") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + + test("hash - decimal (precision <= 18)") { + Seq((10, 2), (18, 0), (18, 10)).foreach { case (precision, scale) => + withTable("t") { + sql(s"CREATE TABLE t(c DECIMAL($precision, $scale)) USING parquet") + sql("INSERT INTO t VALUES (1.23), (-1.23), (0.0), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + } + + test("hash - decimal (precision > 18)") { + Seq((20, 2), (38, 10)).foreach { case (precision, scale) => + withTable("t") { + sql(s"CREATE TABLE t(c DECIMAL($precision, $scale)) USING parquet") + sql("INSERT INTO t VALUES (1.23), (-1.23), (0.0), (null)") + // Large decimals may fall back to Spark, so just check the answer + checkSparkAnswer("SELECT c, hash(c) FROM t ORDER BY c") + } + } + } + + // ==================== Complex Types 
==================== + // Note: The SQL hash() expression for complex types falls back to Spark execution. + // These tests verify correctness of the hash values (used by native shuffle partitioning). + + test("hash - array of integers") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY) USING parquet") + sql("""INSERT INTO t VALUES + (array(1, 2, 3)), + (array(-1, 0, 1)), + (array()), + (null), + (array(null)), + (array(1, null, 3))""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - array of strings") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY) USING parquet") + sql("""INSERT INTO t VALUES + (array('hello', 'world')), + (array('Spark', 'SQL')), + (array('')), + (array()), + (null), + (array(null)), + (array('a', null, 'b'))""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - array of doubles") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY) USING parquet") + sql("""INSERT INTO t VALUES + (array(1.0, 2.0, 3.0)), + (array(-1.0, 0.0, 1.0)), + (array()), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - nested array (array of arrays)") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY>) USING parquet") + sql("""INSERT INTO t VALUES + (array(array(1, 2), array(3, 4))), + (array(array(), array(1))), + (array()), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - struct") { + withTable("t") { + sql("CREATE TABLE t(c STRUCT) USING parquet") + sql("""INSERT INTO t VALUES + (named_struct('a', 1, 'b', 'hello')), + (named_struct('a', -1, 'b', '')), + (named_struct('a', null, 'b', 'test')), + (named_struct('a', 42, 'b', null)), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - nested struct") { + withTable("t") { + sql("CREATE TABLE t(c STRUCT>) USING parquet") + sql("""INSERT INTO t VALUES + (named_struct('a', 1, 'b', named_struct('x', 'hello', 'y', 3.14))), + (named_struct('a', 2, 'b', named_struct('x', '', 
'y', 0.0))), + (named_struct('a', 3, 'b', null)), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - struct with array field") { + withTable("t") { + sql("CREATE TABLE t(c STRUCT>) USING parquet") + sql("""INSERT INTO t VALUES + (named_struct('a', 1, 'b', array('x', 'y'))), + (named_struct('a', 2, 'b', array())), + (named_struct('a', 3, 'b', null)), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - array of structs") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY>) USING parquet") + sql("""INSERT INTO t VALUES + (array(named_struct('a', 1, 'b', 'x'), named_struct('a', 2, 'b', 'y'))), + (array(named_struct('a', 3, 'b', ''))), + (array()), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - map") { + // Spark prohibits hash on map types by default, enable legacy mode for testing + withSQLConf("spark.sql.legacy.allowHashOnMapType" -> "true") { + withTable("t") { + sql("CREATE TABLE t(c MAP) USING parquet") + sql("""INSERT INTO t VALUES + (map('a', 1, 'b', 2)), + (map('x', -1)), + (map()), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + } + + test("hash - map with complex value type") { + // Spark prohibits hash on map types by default, enable legacy mode for testing + withSQLConf("spark.sql.legacy.allowHashOnMapType" -> "true") { + withTable("t") { + sql("CREATE TABLE t(c MAP>) USING parquet") + sql("""INSERT INTO t VALUES + (map('a', array(1, 2), 'b', array(3))), + (map('x', array())), + (map()), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + } + + test("hash - multiple primitive columns") { + withTable("t") { + sql("CREATE TABLE t(a INT, b STRING, c DOUBLE) USING parquet") + sql("""INSERT INTO t VALUES + (1, 'hello', 3.14), + (2, '', 0.0), + (null, null, null), + (-1, 'test', -1.5)""") + checkSparkAnswerAndOperator( + "SELECT hash(a, b, c), hash(c, b, a), hash(a), hash(b), hash(c) FROM t") + } + } + + test("hash - 
multiple columns with arrays") { + withTable("t") { + sql("CREATE TABLE t(a INT, b ARRAY, c STRING) USING parquet") + sql("""INSERT INTO t VALUES + (1, array(1, 2, 3), 'hello'), + (2, array(), ''), + (null, null, null), + (3, array(-1, 0, 1), 'test')""") + checkSparkAnswer("SELECT hash(a, b, c), hash(b), hash(a, c) FROM t") + } + } + + test("hash - multiple columns with structs") { + withTable("t") { + sql("CREATE TABLE t(a INT, b STRUCT) USING parquet") + sql("""INSERT INTO t VALUES + (1, named_struct('x', 10, 'y', 'hello')), + (2, named_struct('x', 20, 'y', '')), + (null, null), + (3, named_struct('x', null, 'y', 'test'))""") + checkSparkAnswer("SELECT hash(a, b), hash(b, a), hash(b) FROM t") + } + } + + test("hash - empty strings and arrays") { + withTable("t") { + sql("CREATE TABLE t(s STRING, a ARRAY) USING parquet") + sql("INSERT INTO t VALUES ('', array()), ('a', array(1))") + checkSparkAnswer("SELECT hash(s), hash(a), hash(s, a) FROM t") + } + } + + test("hash - all nulls") { + withTable("t") { + sql("CREATE TABLE t(a INT, b STRING, c ARRAY) USING parquet") + sql("INSERT INTO t VALUES (null, null, null)") + checkSparkAnswer("SELECT hash(a), hash(b), hash(c), hash(a, b, c) FROM t") + } + } + + test("hash - with custom seed") { + withTable("t") { + sql("CREATE TABLE t(c INT) USING parquet") + sql("INSERT INTO t VALUES (1), (2), (3), (null)") + // Spark's hash() has no seed argument; 0, 42 and -1 are hashed as extra literal inputs + checkSparkAnswerAndOperator( + "SELECT hash(c), hash(c, 0), hash(c, 42), hash(c, -1) FROM t ORDER BY c") + } + } + + test("hash - large array") { + withTable("t") { + sql("CREATE TABLE t(c ARRAY) USING parquet") + // Create an array with 1000 elements + val largeArray = (1 to 1000).mkString("array(", ", ", ")") + sql(s"INSERT INTO t VALUES ($largeArray)") + checkSparkAnswer("SELECT hash(c) FROM t") + } + } + + test("hash - deeply nested structure") { + withTable("t") { + sql("""CREATE TABLE t(c STRUCT< + a: INT, + b: STRUCT< + x: STRING, + y: ARRAY> + > + >) USING 
parquet""") + sql("""INSERT INTO t VALUES + (named_struct('a', 1, 'b', named_struct('x', 'hello', 'y', + array(named_struct('p', 10, 'q', 'foo'), named_struct('p', 20, 'q', 'bar'))))), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + + test("hash - with dictionary encoding") { + Seq(true, false).foreach { dictionary => + withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + withTable("t") { + sql("CREATE TABLE t(c STRING) USING parquet") + // Repeated values to trigger dictionary encoding + sql("INSERT INTO t VALUES ('a'), ('b'), ('a'), ('b'), ('a'), ('c'), (null)") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t ORDER BY c") + } + } + } + } + + test("hash - array with dictionary encoding") { + Seq(true, false).foreach { dictionary => + withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + withTable("t") { + sql("CREATE TABLE t(c ARRAY) USING parquet") + sql("""INSERT INTO t VALUES + (array('a', 'b')), + (array('a', 'b')), + (array('c')), + (null)""") + checkSparkAnswer("SELECT c, hash(c) FROM t") + } + } + } + } +} From 72c5342fe0700fdb77958f271b0742aa9408a179 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Jan 2026 14:19:10 -0700 Subject: [PATCH 2/9] add to workflow --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + .../org/apache/comet/CometHashExpressionSuite.scala | 9 --------- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index beb5f9dcf7..9f5324b260 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -160,6 +160,7 @@ jobs: value: | org.apache.comet.CometExpressionSuite org.apache.comet.CometExpressionCoverageSuite + org.apache.comet.CometHashExpressionSuite org.apache.comet.CometTemporalExpressionSuite org.apache.comet.CometArrayExpressionSuite org.apache.comet.CometCastSuite diff --git 
a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 9a45fe022d..58ba48134d 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -123,6 +123,7 @@ jobs: value: | org.apache.comet.CometExpressionSuite org.apache.comet.CometExpressionCoverageSuite + org.apache.comet.CometHashExpressionSuite org.apache.comet.CometTemporalExpressionSuite org.apache.comet.CometArrayExpressionSuite org.apache.comet.CometCastSuite diff --git a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala index d04317edb2..9abb950c70 100644 --- a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala @@ -33,15 +33,6 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper */ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { - override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit - pos: Position): Unit = { - super.test(testName, testTags: _*) { - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_AUTO) { - testFun - } - } - } - test("hash - boolean") { withTable("t") { sql("CREATE TABLE t(c BOOLEAN) USING parquet") From b76e3ebcb1dc0ff4d6f7c9bf91a651a5c9ee7a66 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Jan 2026 14:26:38 -0700 Subject: [PATCH 3/9] save --- native/spark-expr/src/hash_funcs/murmur3.rs | 3 +- native/spark-expr/src/hash_funcs/utils.rs | 106 +++++++++++++++++- native/spark-expr/src/hash_funcs/xxhash64.rs | 3 +- .../scala/org/apache/comet/serde/hash.scala | 2 +- .../comet/CometHashExpressionSuite.scala | 37 +++--- 5 files changed, 127 insertions(+), 24 deletions(-) diff --git a/native/spark-expr/src/hash_funcs/murmur3.rs b/native/spark-expr/src/hash_funcs/murmur3.rs index 5cbd77a36b..dc0f804ab2 100644 --- 
a/native/spark-expr/src/hash_funcs/murmur3.rs +++ b/native/spark-expr/src/hash_funcs/murmur3.rs @@ -183,7 +183,8 @@ pub fn create_murmur3_hashes<'a>( arrays, hashes_buffer, spark_compatible_murmur3_hash, - create_hashes_dictionary + create_hashes_dictionary, + create_murmur3_hashes ); Ok(hashes_buffer) } diff --git a/native/spark-expr/src/hash_funcs/utils.rs b/native/spark-expr/src/hash_funcs/utils.rs index a1cee8465f..f50dd4b27d 100644 --- a/native/spark-expr/src/hash_funcs/utils.rs +++ b/native/spark-expr/src/hash_funcs/utils.rs @@ -206,6 +206,46 @@ macro_rules! hash_array_decimal { }; } +/// Hash a list array by recursively hashing each element. +/// For each row, we hash all elements in the list. +/// Spark hashes arrays by recursively hashing each element, where each +/// element's hash is computed using the previous element's hash as the seed. +/// This creates a chain: hash(elem_n, hash(elem_n-1, ... hash(elem_0, seed)...)) +#[macro_export] +macro_rules! hash_list_array { + ($array_type:ident, $offset_type:ty, $column: ident, $hashes: ident, $recursive_hash_method: ident) => { + let list_array = $column + .as_any() + .downcast_ref::<$array_type>() + .unwrap_or_else(|| { + panic!( + "Failed to downcast column to {}. 
Actual data type: {:?}.", + stringify!($array_type), + $column.data_type() + ) + }); + + let values = list_array.values(); + let offsets = list_array.offsets(); + + // For each row, hash the elements in its list + for (row_idx, hash) in $hashes.iter_mut().enumerate() { + if !list_array.is_null(row_idx) { + let start = offsets[row_idx] as usize; + let end = offsets[row_idx + 1] as usize; + let len = end - start; + // Hash each element in sequence, chaining the hash values + for elem_idx in 0..len { + let elem_array = values.slice(start + elem_idx, 1); + let mut single_hash = [*hash]; + $recursive_hash_method(&[elem_array], &mut single_hash)?; + *hash = single_hash[0]; + } + } + } + }; +} + /// Creates hash values for every row, based on the values in the /// columns. /// @@ -214,9 +254,10 @@ macro_rules! hash_array_decimal { /// /// `hash_method` is the hash function to use. /// `create_dictionary_hash_method` is the function to create hashes for dictionary arrays input. +/// `recursive_hash_method` is the function to call for recursive hashing of complex types. #[macro_export] macro_rules! create_hashes_internal { - ($arrays: ident, $hashes_buffer: ident, $hash_method: ident, $create_dictionary_hash_method: ident) => { + ($arrays: ident, $hashes_buffer: ident, $hash_method: ident, $create_dictionary_hash_method: ident, $recursive_hash_method: ident) => { use arrow::datatypes::{DataType, TimeUnit}; use arrow::array::{types::*, *}; @@ -425,6 +466,69 @@ macro_rules! 
create_hashes_internal { ))) } }, + DataType::List(_) => { + $crate::hash_list_array!(ListArray, i32, col, $hashes_buffer, $recursive_hash_method); + } + DataType::LargeList(_) => { + $crate::hash_list_array!(LargeListArray, i64, col, $hashes_buffer, $recursive_hash_method); + } + DataType::FixedSizeList(_, size) => { + let list_array = col.as_any().downcast_ref::().unwrap(); + let values = list_array.values(); + let list_size = *size as usize; + + // For each row, hash the elements in its fixed-size list + for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { + if !list_array.is_null(row_idx) { + let start = row_idx * list_size; + // Hash each element in sequence, chaining the hash values + for elem_idx in 0..list_size { + let elem_array = values.slice(start + elem_idx, 1); + let mut single_hash = [*hash]; + $recursive_hash_method(&[elem_array], &mut single_hash)?; + *hash = single_hash[0]; + } + } + } + } + DataType::Struct(_) => { + let struct_array = col.as_any().downcast_ref::().unwrap(); + // Hash each field of the struct - Spark hashes all fields recursively + let columns: Vec = struct_array.columns().to_vec(); + if !columns.is_empty() { + $recursive_hash_method(&columns, $hashes_buffer)?; + } + } + DataType::Map(_, _) => { + let map_array = col.as_any().downcast_ref::().unwrap(); + // For maps, Spark hashes by iterating through (key, value) pairs + // For each entry, hash the key then the value + let keys = map_array.keys(); + let values = map_array.values(); + let offsets = map_array.offsets(); + + // For each row, hash the key-value pairs in sequence + for (row_idx, hash) in $hashes_buffer.iter_mut().enumerate() { + if !map_array.is_null(row_idx) { + let start = offsets[row_idx] as usize; + let end = offsets[row_idx + 1] as usize; + // Hash each key-value pair in sequence + for entry_idx in start..end { + // Hash the key + let key_array = keys.slice(entry_idx, 1); + let mut single_hash = [*hash]; + $recursive_hash_method(&[key_array], &mut 
single_hash)?; + *hash = single_hash[0]; + + // Hash the value + let value_array = values.slice(entry_idx, 1); + single_hash = [*hash]; + $recursive_hash_method(&[value_array], &mut single_hash)?; + *hash = single_hash[0]; + } + } + } + } _ => { // This is internal because we should have caught this before. return Err(DataFusionError::Internal(format!( diff --git a/native/spark-expr/src/hash_funcs/xxhash64.rs b/native/spark-expr/src/hash_funcs/xxhash64.rs index 2b518b88ef..caf0b490ed 100644 --- a/native/spark-expr/src/hash_funcs/xxhash64.rs +++ b/native/spark-expr/src/hash_funcs/xxhash64.rs @@ -129,7 +129,8 @@ fn create_xxhash64_hashes<'a>( arrays, hashes_buffer, spark_compatible_xxhash64, - create_xxhash64_hashes_dictionary + create_xxhash64_hashes_dictionary, + create_xxhash64_hashes ); Ok(hashes_buffer) } diff --git a/spark/src/main/scala/org/apache/comet/serde/hash.scala b/spark/src/main/scala/org/apache/comet/serde/hash.scala index 5cc689f39a..42a379e4ee 100644 --- a/spark/src/main/scala/org/apache/comet/serde/hash.scala +++ b/spark/src/main/scala/org/apache/comet/serde/hash.scala @@ -112,7 +112,7 @@ private object HashUtils { // Java BigDecimal before hashing withInfo(expr, s"Unsupported datatype: $dt (precision > 18)") return false - case dt if !supportedDataType(dt) => + case dt if !supportedDataType(dt, allowComplex = true) => withInfo(expr, s"Unsupported datatype $dt") return false case _ => diff --git a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala index 9abb950c70..635f3334cb 100644 --- a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala @@ -19,9 +19,6 @@ package org.apache.comet -import org.scalactic.source.Position -import org.scalatest.Tag - import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -162,7 
+159,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (null), (array(null)), (array(1, null, 3))""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -177,7 +174,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (null), (array(null)), (array('a', null, 'b'))""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -189,7 +186,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (array(-1.0, 0.0, 1.0)), (array()), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -201,7 +198,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (array(array(), array(1))), (array()), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -214,7 +211,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (named_struct('a', null, 'b', 'test')), (named_struct('a', 42, 'b', null)), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -226,7 +223,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (named_struct('a', 2, 'b', named_struct('x', '', 'y', 0.0))), (named_struct('a', 3, 'b', null)), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -238,7 +235,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (named_struct('a', 2, 'b', array())), (named_struct('a', 3, 'b', null)), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -250,7 +247,7 @@ class CometHashExpressionSuite extends 
CometTestBase with AdaptiveSparkPlanHelpe (array(named_struct('a', 3, 'b', ''))), (array()), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -264,7 +261,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (map('x', -1)), (map()), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } } @@ -279,7 +276,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (map('x', array())), (map()), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } } @@ -305,7 +302,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (2, array(), ''), (null, null, null), (3, array(-1, 0, 1), 'test')""") - checkSparkAnswer("SELECT hash(a, b, c), hash(b), hash(a, c) FROM t") + checkSparkAnswerAndOperator("SELECT hash(a, b, c), hash(b), hash(a, c) FROM t") } } @@ -317,7 +314,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (2, named_struct('x', 20, 'y', '')), (null, null), (3, named_struct('x', null, 'y', 'test'))""") - checkSparkAnswer("SELECT hash(a, b), hash(b, a), hash(b) FROM t") + checkSparkAnswerAndOperator("SELECT hash(a, b), hash(b, a), hash(b) FROM t") } } @@ -325,7 +322,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe withTable("t") { sql("CREATE TABLE t(s STRING, a ARRAY) USING parquet") sql("INSERT INTO t VALUES ('', array()), ('a', array(1))") - checkSparkAnswer("SELECT hash(s), hash(a), hash(s, a) FROM t") + checkSparkAnswerAndOperator("SELECT hash(s), hash(a), hash(s, a) FROM t") } } @@ -333,7 +330,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe withTable("t") { sql("CREATE TABLE t(a INT, b STRING, c ARRAY) USING parquet") sql("INSERT INTO t VALUES (null, null, null)") - 
checkSparkAnswer("SELECT hash(a), hash(b), hash(c), hash(a, b, c) FROM t") + checkSparkAnswerAndOperator("SELECT hash(a), hash(b), hash(c), hash(a, b, c) FROM t") } } @@ -353,7 +350,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe // Create an array with 1000 elements val largeArray = (1 to 1000).mkString("array(", ", ", ")") sql(s"INSERT INTO t VALUES ($largeArray)") - checkSparkAnswer("SELECT hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT hash(c) FROM t") } } @@ -370,7 +367,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (named_struct('a', 1, 'b', named_struct('x', 'hello', 'y', array(named_struct('p', 10, 'q', 'foo'), named_struct('p', 20, 'q', 'bar'))))), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } @@ -397,7 +394,7 @@ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe (array('a', 'b')), (array('c')), (null)""") - checkSparkAnswer("SELECT c, hash(c) FROM t") + checkSparkAnswerAndOperator("SELECT c, hash(c) FROM t") } } } From b354f4ddd631426bfef15e2997e5e2bfdae0ed1b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Jan 2026 15:09:02 -0700 Subject: [PATCH 4/9] support complex types as hash-partitioning keys --- .../shuffle/CometShuffleExchangeExec.scala | 16 +++++++-- .../comet/exec/CometNativeShuffleSuite.scala | 34 +++++++++++++++++-- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 1805711d01..0ebd9935f2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -260,14 +260,24 @@ object 
CometShuffleExchangeExec * Determine which data types are supported as partition columns in native shuffle. * * For HashPartitioning this defines the key that determines how data should be collocated for - * operations like `groupByKey`, `reduceByKey`, or `join`. Native code does not support - * hashing complex types, see hash_funcs/utils.rs + * operations like `groupByKey`, `reduceByKey`, or `join`. Native code supports hashing both + * primitive and complex types. */ def supportedHashPartitioningDataType(dt: DataType): Boolean = dt match { case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | - _: TimestampNTZType | _: DecimalType | _: DateType => + _: TimestampNTZType | _: DateType => true + case dt: DecimalType => + // High precision decimals (> 18) fall back to Spark because Spark converts them + // to Java BigDecimal before hashing, which has different behavior than native hashing + dt.precision <= 18 + case StructType(fields) => + fields.nonEmpty && fields.forall(f => supportedHashPartitioningDataType(f.dataType)) + case ArrayType(elementType, _) => + supportedHashPartitioningDataType(elementType) + case MapType(keyType, valueType, _) => + supportedHashPartitioningDataType(keyType) && supportedHashPartitioningDataType(valueType) case _ => false } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index a682ff91a5..cba302569c 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -100,8 +100,8 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper .filter($"_3" > 10) .repartition(numPartitions, $"_2") - // Partitioning on nested array falls back to Spark - checkShuffleAnswer(df, 0) + // Partitioning on 
nested array works with native shuffle + checkShuffleAnswer(df, 1) df = sql("SELECT * FROM tbl") .filter($"_3" > 10) @@ -116,6 +116,36 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } } + test("native shuffle with struct as partition key") { + Seq(10, 201).foreach { numPartitions => + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion") { + withParquetTable((0 until 50).map(i => (i, (i % 10, s"value_${i % 10}"), i + 1)), "tbl") { + val df = sql("SELECT * FROM tbl") + .filter($"_3" > 10) + .repartition(numPartitions, $"_2") + + // Partitioning on struct works with native shuffle + checkShuffleAnswer(df, 1) + } + } + } + } + + test("native shuffle with map as partition key") { + Seq(10, 201).foreach { numPartitions => + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion") { + withParquetTable((0 until 50).map(i => (i, Map("key" -> (i % 10)), i + 1)), "tbl") { + val df = sql("SELECT * FROM tbl") + .filter($"_3" > 10) + .repartition(numPartitions, $"_2") + + // Partitioning on map works with native shuffle + checkShuffleAnswer(df, 1) + } + } + } + } + test("hash-based native shuffle") { withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) From da042a88bb1d5952d81bddb03574ccfaafca3895 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Jan 2026 17:46:58 -0700 Subject: [PATCH 5/9] fix --- .../scala/org/apache/comet/exec/CometNativeShuffleSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index cba302569c..1fae78166f 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -68,7 +68,9 @@ class CometNativeShuffleSuite extends 
CometTestBase with AdaptiveSparkPlanHelper withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 1000) - var allTypes: Seq[Int] = (1 to 20) + // Exclude _17 which is DECIMAL(38, 37) - high precision decimals are not supported + // as partitioning keys in native shuffle + var allTypes: Seq[Int] = (1 to 20).filterNot(_ == 17) allTypes.map(i => s"_$i").foreach { c => withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString, From 630e54845a3f571ba08b78336a7ca52157e524cd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Jan 2026 17:57:50 -0700 Subject: [PATCH 6/9] fix --- .../comet/execution/shuffle/CometShuffleExchangeExec.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 0ebd9935f2..f49892d236 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -268,10 +268,6 @@ object CometShuffleExchangeExec _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType | _: DateType => true - case dt: DecimalType => - // High precision decimals (> 18) fall back to Spark because Spark converts them - // to Java BigDecimal before hashing, which has different behavior than native hashing - dt.precision <= 18 case StructType(fields) => fields.nonEmpty && fields.forall(f => supportedHashPartitioningDataType(f.dataType)) case ArrayType(elementType, _) => From 2f96e97363f3b66accc34324de235e4560a1c775 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 13 Jan 2026 07:21:33 -0700 Subject: [PATCH 7/9] fix regressions --- docs/source/contributor-guide/jvm_shuffle.md | 
6 +++--- docs/source/contributor-guide/native_shuffle.md | 6 +++--- docs/source/user-guide/latest/tuning.md | 4 ++-- .../scala/org/apache/comet/CometFuzzTestSuite.scala | 10 ++-------- .../org/apache/comet/CometHashExpressionSuite.scala | 12 ++++++++++++ 5 files changed, 22 insertions(+), 16 deletions(-) diff --git a/docs/source/contributor-guide/jvm_shuffle.md b/docs/source/contributor-guide/jvm_shuffle.md index e011651d2c..285693a7b9 100644 --- a/docs/source/contributor-guide/jvm_shuffle.md +++ b/docs/source/contributor-guide/jvm_shuffle.md @@ -49,9 +49,9 @@ JVM shuffle (`CometColumnarExchange`) is used instead of native shuffle (`CometE 3. **Unsupported partitioning type**: Native shuffle only supports `HashPartitioning`, `RangePartitioning`, and `SinglePartition`. JVM shuffle additionally supports `RoundRobinPartitioning`. -4. **Unsupported partition key types**: For `HashPartitioning` and `RangePartitioning`, native shuffle - only supports primitive types as partition keys. Complex types (struct, array, map) cannot be used - as partition keys in native shuffle, though they are fully supported as data columns in both implementations. +4. **Unsupported partition key types**: For `RangePartitioning`, native shuffle only supports primitive + types as partition keys. Complex types (struct, array, map) are supported as hash partition keys in + native shuffle. ## Input Handling diff --git a/docs/source/contributor-guide/native_shuffle.md b/docs/source/contributor-guide/native_shuffle.md index e3d2dea473..7a316383c7 100644 --- a/docs/source/contributor-guide/native_shuffle.md +++ b/docs/source/contributor-guide/native_shuffle.md @@ -55,9 +55,9 @@ Native shuffle (`CometExchange`) is selected when all of the following condition `RoundRobinPartitioning` requires JVM shuffle. -4. **Supported partition key types**: For `HashPartitioning` and `RangePartitioning`, partition - keys must be primitive types. 
Complex types (struct, array, map) as partition keys require - JVM shuffle. Note that complex types are fully supported as data columns in native shuffle. +4. **Supported partition key types**: For `HashPartitioning`, both primitive and complex types + (struct, array, map) are supported as partition keys. For `RangePartitioning`, only primitive + types are supported as partition keys. ## Architecture diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 5939e89ef3..e69cc5ed5d 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -141,8 +141,8 @@ back to Spark for shuffle operations. #### Native Shuffle Comet provides a fully native shuffle implementation, which generally provides the best performance. Native shuffle -supports `HashPartitioning`, `RangePartitioning` and `SinglePartitioning` but currently only supports primitive type -partitioning keys. Columns that are not partitioning keys may contain complex types like maps, structs, and arrays. +supports `HashPartitioning`, `RangePartitioning` and `SinglePartition`. Complex types (structs, arrays, and maps) +are supported as hash partition keys. Range partitioning only supports primitive types as partition keys.
#### Columnar (JVM) Shuffle diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index 833314a5c6..5202e57ddd 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -179,7 +179,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase { df.createOrReplaceTempView("t1") val columns = df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name) for (col <- columns) { - // DISTRIBUTE BY is equivalent to df.repartition($col) and uses + // DISTRIBUTE BY is equivalent to df.repartition($col) val sql = s"SELECT $col FROM t1 DISTRIBUTE BY $col" val df = spark.sql(sql) df.collect() @@ -191,13 +191,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase { // native_comet does not support reading complex types 0 case _ => - CometConf.COMET_SHUFFLE_MODE.get() match { - case "jvm" => - 1 - case "native" => - // native shuffle does not support complex types as partitioning keys - 0 - } + 1 } assert(cometShuffleExchanges.length == expectedNumCometShuffles) } diff --git a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala index 635f3334cb..bb2d2f5ac2 100644 --- a/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometHashExpressionSuite.scala @@ -19,6 +19,9 @@ package org.apache.comet +import org.scalactic.source.Position +import org.scalatest.Tag + import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -30,6 +33,15 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper */ class CometHashExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit + pos: Position): Unit = { 
+ super.test(testName, testTags: _*) { + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_AUTO) { + testFun + } + } + } + test("hash - boolean") { withTable("t") { sql("CREATE TABLE t(c BOOLEAN) USING parquet") From 37a020c8b255f3e4b22c420c3d0dd733fed57831 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 13 Jan 2026 07:26:36 -0700 Subject: [PATCH 8/9] fix regressions --- .../sql/comet/execution/shuffle/CometShuffleExchangeExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index f49892d236..a8216a27f2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -266,7 +266,7 @@ object CometShuffleExchangeExec def supportedHashPartitioningDataType(dt: DataType): Boolean = dt match { case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | - _: TimestampNTZType | _: DateType => + _: TimestampNTZType | _: DecimalType | _: DateType => true case StructType(fields) => fields.nonEmpty && fields.forall(f => supportedHashPartitioningDataType(f.dataType)) From 89b81ffb96a4cc9366ce995b2faa94c6d83998a0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Jan 2026 20:19:18 -0700 Subject: [PATCH 9/9] format --- .../spark-4.0/org/apache/comet/shims/CometExprShim.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index fc3db183b3..2eae878ae8 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ 
b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -20,6 +20,8 @@ package org.apache.comet.shims import org.apache.spark.sql.catalyst.expressions._ +// Import MapSort for Spark 4.0 support +import org.apache.spark.sql.catalyst.expressions.MapSort import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.types.StringTypeWithCollation @@ -55,6 +57,11 @@ trait CometExprShim extends CommonStringExprs { inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { expr match { + // MapSort is used by Spark 4.0+ to make maps comparable for partitioning. + // For hash partitioning, we can just use the underlying map expression. + case MapSort(child) => + exprToProtoInternal(child, inputs, binding) + case s: StaticInvoke if s.staticObject == classOf[StringDecode] && s.dataType.isInstanceOf[StringType] &&