Skip to content

Commit d2ada84

Browse files
authored
feat: add MapSort expression support for Spark 4.0 (#4076)
1 parent 95c6a1b commit d2ada84

12 files changed

Lines changed: 965 additions & 68 deletions

File tree

docs/source/user-guide/latest/compatibility/expressions/map.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,16 @@ under the License.
1919

2020
# Map Expressions
2121

22+
## MapSort (Spark 4.0+)
23+
24+
Spark 4.0 inserts `MapSort` to normalize map-typed values when they appear in shuffle hash partitioning
25+
keys, in `try_element_at`, and in other contexts where map ordering must be deterministic. Comet
26+
runs `MapSort` natively, so map shuffle and group-by-on-map stay on Comet under Spark 4.0.
27+
28+
When `spark.comet.exec.strictFloatingPoint=true`, `MapSort` falls back to Spark for maps whose
29+
keys contain `Float` or `Double` (consistent with `SortOrder` and `SortArray`). Arrow's sort uses
30+
IEEE total ordering for floating-point, which differs from Spark's `Double.compare` semantics for
31+
`NaN` and `-0.0`.
32+
2233
<!--BEGIN:EXPR_COMPAT[map]-->
2334
<!--END:EXPR_COMPAT-->

native/spark-expr/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,7 @@ harness = false
112112
[[bench]]
113113
name = "cast_non_int_numeric_timestamp"
114114
harness = false
115+
116+
[[bench]]
117+
name = "map_sort"
118+
harness = false
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for spark_map_sort.
19+
20+
use arrow::array::builder::{Int32Builder, MapBuilder, StringBuilder};
21+
use arrow::array::{ArrayRef, MapArray, MapFieldNames};
22+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
23+
use datafusion::physical_plan::ColumnarValue;
24+
use datafusion_comet_spark_expr::spark_map_sort;
25+
use std::hint::black_box;
26+
use std::sync::Arc;
27+
28+
/// Number of map rows in each benchmark input array.
const BATCH_SIZE: usize = 8192;
29+
30+
fn map_field_names() -> MapFieldNames {
31+
MapFieldNames {
32+
entry: "entries".into(),
33+
key: "key".into(),
34+
value: "value".into(),
35+
}
36+
}
37+
38+
/// Build a MapArray with `BATCH_SIZE` rows where each map has `entries_per_map` entries.
39+
/// Keys are integers in reverse order so every map needs a real sort.
40+
fn build_int_key_map(entries_per_map: usize) -> MapArray {
41+
let mut builder = MapBuilder::new(
42+
Some(map_field_names()),
43+
Int32Builder::new(),
44+
Int32Builder::new(),
45+
);
46+
for row in 0..BATCH_SIZE {
47+
for entry_idx in 0..entries_per_map {
48+
// Reverse order so input is unsorted; vary across rows so different maps differ.
49+
let key = (entries_per_map - entry_idx) as i32 + (row % 7) as i32;
50+
let value = entry_idx as i32;
51+
builder.keys().append_value(key);
52+
builder.values().append_value(value);
53+
}
54+
builder.append(true).unwrap();
55+
}
56+
builder.finish()
57+
}
58+
59+
/// Same shape as `build_int_key_map` but with string keys.
60+
fn build_string_key_map(entries_per_map: usize) -> MapArray {
61+
let mut builder = MapBuilder::new(
62+
Some(map_field_names()),
63+
StringBuilder::new(),
64+
Int32Builder::new(),
65+
);
66+
for row in 0..BATCH_SIZE {
67+
for entry_idx in 0..entries_per_map {
68+
let key = format!("key_{:04}", entries_per_map - entry_idx + (row % 7));
69+
let value = entry_idx as i32;
70+
builder.keys().append_value(&key);
71+
builder.values().append_value(value);
72+
}
73+
builder.append(true).unwrap();
74+
}
75+
builder.finish()
76+
}
77+
78+
fn bench_map_sort(c: &mut Criterion) {
79+
let mut group = c.benchmark_group("spark_map_sort");
80+
81+
for entries in [4usize, 16, 64] {
82+
let int_map: ArrayRef = Arc::new(build_int_key_map(entries));
83+
group.bench_with_input(
84+
BenchmarkId::new("int_keys", entries),
85+
&int_map,
86+
|b, array| {
87+
let args = vec![ColumnarValue::Array(Arc::clone(array))];
88+
b.iter(|| black_box(spark_map_sort(black_box(&args)).unwrap()));
89+
},
90+
);
91+
92+
let string_map: ArrayRef = Arc::new(build_string_key_map(entries));
93+
group.bench_with_input(
94+
BenchmarkId::new("string_keys", entries),
95+
&string_map,
96+
|b, array| {
97+
let args = vec![ColumnarValue::Array(Arc::clone(array))];
98+
b.iter(|| black_box(spark_map_sort(black_box(&args)).unwrap()));
99+
},
100+
);
101+
}
102+
103+
group.finish();
104+
}
105+
106+
// Collect `bench_map_sort` into the `benches` group and hand that group to
// `criterion_main!` (this bench target is declared with `harness = false`).
criterion_group!(benches, bench_map_sort);
107+
criterion_main!(benches);

native/spark-expr/src/comet_scalar_funcs.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use crate::hash_funcs::*;
19+
use crate::map_funcs::spark_map_sort;
1920
use crate::math_funcs::abs::abs;
2021
use crate::math_funcs::checked_arithmetic::{checked_add, checked_div, checked_mul, checked_sub};
2122
use crate::math_funcs::log::spark_log;
@@ -191,6 +192,10 @@ pub fn create_comet_physical_fun_with_eval_mode(
191192
let func = Arc::new(crate::string_funcs::spark_get_json_object);
192193
make_comet_scalar_udf!("get_json_object", func, without data_type)
193194
}
195+
"map_sort" => {
196+
let func = Arc::new(spark_map_sort);
197+
make_comet_scalar_udf!("spark_map_sort", func, without data_type)
198+
}
194199
_ => registry.udf(fun_name).map_err(|e| {
195200
DataFusionError::Execution(format!(
196201
"Function {fun_name} not found in the registry: {e}",

native/spark-expr/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ pub use bloom_filter::{BloomFilterAgg, BloomFilterMightContain};
5757

5858
mod conditional_funcs;
5959
mod conversion_funcs;
60+
mod map_funcs;
61+
pub use map_funcs::spark_map_sort;
6062
mod math_funcs;
6163
mod nondetermenistic_funcs;
6264

0 commit comments

Comments
 (0)