Skip to content

Commit f46a4a4

Browse files
authored
added support for MapFromEntries (#21720)
## Which issue does this PR close? Closes # (none: prerequisite for apache/datafusion-comet#2706; follow-up to #17779 and #19274). ## Rationale for this change Spark's default mapKeyDedupPolicy is EXCEPTION, raising SparkRuntimeException with error class DUPLICATED_MAP_KEY on duplicate map keys. The existing Spark map_from_entries / map_from_arrays UDFs silently kept the last occurrence, forcing downstream engines like datafusion-comet to fall back to Spark. ## What changes are included in this PR? Duplicate map keys in Spark map_from_entries, map_from_arrays, and str_to_map now raise [DUPLICATED_MAP_KEY] Duplicate map key {key} was found, matching Spark's default behaviour and error class. ## Are these changes tested? Yes, via sqllogictest assertions covering the new error across the affected Spark map UDFs. ## Are there any user-facing changes? Yes. Duplicate keys now raise [DUPLICATED_MAP_KEY] under the default policy instead of silently collapsing to the last occurrence. No new config keys, no API changes. --------- Co-authored-by: Krishna Sudarshan J <75199111+KrishnaSudarshan7@users.noreply.github.com>
1 parent a8761a6 commit f46a4a4

11 files changed

Lines changed: 545 additions & 88 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,53 @@ impl Display for SpillCompression {
456456
}
457457
}
458458

459+
/// Policy for handling duplicate keys in Spark-compatible map-construction
460+
/// functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors
461+
/// Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961).
462+
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
463+
pub enum MapKeyDedupPolicy {
464+
/// Raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key.
465+
#[default]
466+
Exception,
467+
/// Keep the last occurrence of each duplicate key.
468+
LastWin,
469+
}
470+
471+
impl FromStr for MapKeyDedupPolicy {
472+
type Err = DataFusionError;
473+
474+
fn from_str(s: &str) -> Result<Self, Self::Err> {
475+
match s.to_ascii_uppercase().as_str() {
476+
"EXCEPTION" => Ok(Self::Exception),
477+
"LAST_WIN" => Ok(Self::LastWin),
478+
other => Err(DataFusionError::Configuration(format!(
479+
"Invalid MapKeyDedupPolicy: {other}. Expected one of: EXCEPTION, LAST_WIN"
480+
))),
481+
}
482+
}
483+
}
484+
485+
impl ConfigField for MapKeyDedupPolicy {
486+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
487+
v.some(key, self, description)
488+
}
489+
490+
fn set(&mut self, _: &str, value: &str) -> Result<()> {
491+
*self = MapKeyDedupPolicy::from_str(value)?;
492+
Ok(())
493+
}
494+
}
495+
496+
impl Display for MapKeyDedupPolicy {
497+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
498+
let str = match self {
499+
Self::Exception => "EXCEPTION",
500+
Self::LastWin => "LAST_WIN",
501+
};
502+
write!(f, "{str}")
503+
}
504+
}
505+
459506
impl From<SpillCompression> for Option<CompressionType> {
460507
fn from(c: SpillCompression) -> Self {
461508
match c {
@@ -1499,6 +1546,24 @@ impl<'a> TryFrom<&'a FormatOptions> for arrow::util::display::FormatOptions<'a>
14991546
}
15001547
}
15011548

1549+
config_namespace! {
1550+
/// Options controlling DataFusion's Spark-compatibility layer (functions
1551+
/// under `datafusion/spark`). Keys here mirror their `spark.sql.*`
1552+
/// equivalents in Apache Spark.
1553+
pub struct SparkOptions {
1554+
/// Policy for handling duplicate keys in Spark-compatible map-construction
1555+
/// functions (`map_from_arrays`, `map_from_entries`, `str_to_map`).
1556+
///
1557+
/// Mirrors Spark's
1558+
/// [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961):
1559+
/// - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key.
1560+
/// - `LAST_WIN`: keep the last occurrence of each duplicate key.
1561+
///
1562+
/// Values are case-insensitive.
1563+
pub map_key_dedup_policy: MapKeyDedupPolicy, default = MapKeyDedupPolicy::Exception
1564+
}
1565+
}
1566+
15021567
/// A key value pair, with a corresponding description
15031568
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
15041569
pub struct ConfigEntry {
@@ -1530,6 +1595,8 @@ pub struct ConfigOptions {
15301595
pub extensions: Extensions,
15311596
/// Formatting options when printing batches
15321597
pub format: FormatOptions,
1598+
/// Spark-compatibility options (functions under `datafusion/spark`)
1599+
pub spark: SparkOptions,
15331600
}
15341601

15351602
impl ConfigField for ConfigOptions {
@@ -1540,6 +1607,7 @@ impl ConfigField for ConfigOptions {
15401607
self.explain.visit(v, "datafusion.explain", "");
15411608
self.sql_parser.visit(v, "datafusion.sql_parser", "");
15421609
self.format.visit(v, "datafusion.format", "");
1610+
self.spark.visit(v, "datafusion.spark", "");
15431611
}
15441612

15451613
fn set(&mut self, key: &str, value: &str) -> Result<()> {
@@ -1552,6 +1620,7 @@ impl ConfigField for ConfigOptions {
15521620
"explain" => self.explain.set(rem, value),
15531621
"sql_parser" => self.sql_parser.set(rem, value),
15541622
"format" => self.format.set(rem, value),
1623+
"spark" => self.spark.set(rem, value),
15551624
_ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
15561625
}
15571626
}
@@ -1591,6 +1660,7 @@ impl ConfigField for ConfigOptions {
15911660
"explain" => self.explain.reset(rem),
15921661
"sql_parser" => self.sql_parser.reset(rem),
15931662
"format" => self.format.reset(rem),
1663+
"spark" => self.spark.reset(rem),
15941664
other => _config_err!("Config value \"{other}\" not found on ConfigOptions"),
15951665
}
15961666
}

datafusion/spark/src/function/map/map_from_arrays.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::function::map::utils::{
2222
use arrow::array::{Array, ArrayRef, NullArray};
2323
use arrow::compute::kernels::cast;
2424
use arrow::datatypes::{DataType, Field, FieldRef};
25+
use datafusion_common::config::MapKeyDedupPolicy;
2526
use datafusion_common::utils::take_function_args;
2627
use datafusion_common::{Result, internal_err};
2728
use datafusion_expr::{
@@ -81,11 +82,16 @@ impl ScalarUDFImpl for MapFromArrays {
8182
}
8283

8384
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
84-
make_scalar_function(map_from_arrays_inner, vec![])(&args.args)
85+
let last_value_wins =
86+
args.config_options.spark.map_key_dedup_policy == MapKeyDedupPolicy::LastWin;
87+
make_scalar_function(
88+
move |args: &[ArrayRef]| map_from_arrays_inner(args, last_value_wins),
89+
vec![],
90+
)(&args.args)
8591
}
8692
}
8793

88-
fn map_from_arrays_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
94+
fn map_from_arrays_inner(args: &[ArrayRef], last_value_wins: bool) -> Result<ArrayRef> {
8995
let [keys, values] = take_function_args("map_from_arrays", args)?;
9096

9197
if *keys.data_type() == DataType::Null || *values.data_type() == DataType::Null {
@@ -105,6 +111,7 @@ fn map_from_arrays_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
105111
&get_list_offsets(values)?,
106112
keys.nulls(),
107113
values.nulls(),
114+
last_value_wins,
108115
)
109116
}
110117

datafusion/spark/src/function/map/map_from_entries.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::function::map::utils::{
2424
use arrow::array::{Array, ArrayRef, NullBufferBuilder, StructArray};
2525
use arrow::buffer::NullBuffer;
2626
use arrow::datatypes::{DataType, Field, FieldRef};
27+
use datafusion_common::config::MapKeyDedupPolicy;
2728
use datafusion_common::utils::take_function_args;
2829
use datafusion_common::{Result, exec_err, internal_err};
2930
use datafusion_expr::{
@@ -101,11 +102,16 @@ impl ScalarUDFImpl for MapFromEntries {
101102
}
102103

103104
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
104-
make_scalar_function(map_from_entries_inner, vec![])(&args.args)
105+
let last_value_wins =
106+
args.config_options.spark.map_key_dedup_policy == MapKeyDedupPolicy::LastWin;
107+
make_scalar_function(
108+
move |args: &[ArrayRef]| map_from_entries_inner(args, last_value_wins),
109+
vec![],
110+
)(&args.args)
105111
}
106112
}
107113

108-
fn map_from_entries_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
114+
fn map_from_entries_inner(args: &[ArrayRef], last_value_wins: bool) -> Result<ArrayRef> {
109115
let [entries] = take_function_args("map_from_entries", args)?;
110116
let entries_offsets = get_list_offsets(entries)?;
111117
let entries_values = get_list_values(entries)?;
@@ -148,6 +154,7 @@ fn map_from_entries_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
148154
&entries_offsets,
149155
None,
150156
res_nulls.as_ref(),
157+
last_value_wins,
151158
)
152159
}
153160

datafusion/spark/src/function/map/str_to_map.rs

Lines changed: 81 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::collections::HashSet;
18+
use std::collections::{HashMap, HashSet};
1919
use std::sync::Arc;
2020

2121
use arrow::array::{
@@ -33,6 +33,7 @@ use datafusion_expr::{
3333
};
3434

3535
use crate::function::map::utils::map_type_from_key_value_types;
36+
use datafusion_common::config::MapKeyDedupPolicy;
3637

3738
const DEFAULT_PAIR_DELIM: &str = ",";
3839
const DEFAULT_KV_DELIM: &str = ":";
@@ -48,11 +49,10 @@ const DEFAULT_KV_DELIM: &str = ":";
4849
/// - keyValueDelim: Delimiter between key and value (default: ':')
4950
///
5051
/// # Duplicate Key Handling
51-
/// Uses EXCEPTION behavior (Spark 3.0+ default): errors on duplicate keys.
52-
/// See `spark.sql.mapKeyDedupPolicy`:
53-
/// <https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4502-L4511>
54-
///
55-
/// TODO: Support configurable `spark.sql.mapKeyDedupPolicy` (LAST_WIN) in a follow-up PR.
52+
/// Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4502-L4511),
53+
/// wired through DataFusion's `datafusion.spark.map_key_dedup_policy`:
54+
/// - `EXCEPTION` (default): error on duplicate keys.
55+
/// - `LAST_WIN`: keep the last occurrence of each duplicate key.
5656
#[derive(Debug, PartialEq, Eq, Hash)]
5757
pub struct SparkStrToMap {
5858
signature: Signature,
@@ -102,22 +102,32 @@ impl ScalarUDFImpl for SparkStrToMap {
102102
}
103103

104104
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
105+
let last_value_wins =
106+
args.config_options.spark.map_key_dedup_policy == MapKeyDedupPolicy::LastWin;
105107
let arrays: Vec<ArrayRef> = ColumnarValue::values_to_arrays(&args.args)?;
106-
let result = str_to_map_inner(&arrays)?;
108+
let result = str_to_map_inner(&arrays, last_value_wins)?;
107109
Ok(ColumnarValue::Array(result))
108110
}
109111
}
110112

111-
fn str_to_map_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
113+
fn str_to_map_inner(args: &[ArrayRef], last_value_wins: bool) -> Result<ArrayRef> {
112114
match args.len() {
113115
1 => match args[0].data_type() {
114-
DataType::Utf8 => str_to_map_impl(as_string_array(&args[0])?, None, None),
115-
DataType::LargeUtf8 => {
116-
str_to_map_impl(as_large_string_array(&args[0])?, None, None)
117-
}
118-
DataType::Utf8View => {
119-
str_to_map_impl(as_string_view_array(&args[0])?, None, None)
116+
DataType::Utf8 => {
117+
str_to_map_impl(as_string_array(&args[0])?, None, None, last_value_wins)
120118
}
119+
DataType::LargeUtf8 => str_to_map_impl(
120+
as_large_string_array(&args[0])?,
121+
None,
122+
None,
123+
last_value_wins,
124+
),
125+
DataType::Utf8View => str_to_map_impl(
126+
as_string_view_array(&args[0])?,
127+
None,
128+
None,
129+
last_value_wins,
130+
),
121131
other => exec_err!(
122132
"Unsupported data type {other:?} for str_to_map, \
123133
expected Utf8, LargeUtf8, or Utf8View"
@@ -128,16 +138,19 @@ fn str_to_map_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
128138
as_string_array(&args[0])?,
129139
Some(as_string_array(&args[1])?),
130140
None,
141+
last_value_wins,
131142
),
132143
(DataType::LargeUtf8, DataType::LargeUtf8) => str_to_map_impl(
133144
as_large_string_array(&args[0])?,
134145
Some(as_large_string_array(&args[1])?),
135146
None,
147+
last_value_wins,
136148
),
137149
(DataType::Utf8View, DataType::Utf8View) => str_to_map_impl(
138150
as_string_view_array(&args[0])?,
139151
Some(as_string_view_array(&args[1])?),
140152
None,
153+
last_value_wins,
141154
),
142155
(t1, t2) => exec_err!(
143156
"Unsupported data types ({t1:?}, {t2:?}) for str_to_map, \
@@ -153,19 +166,22 @@ fn str_to_map_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
153166
as_string_array(&args[0])?,
154167
Some(as_string_array(&args[1])?),
155168
Some(as_string_array(&args[2])?),
169+
last_value_wins,
156170
),
157171
(DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => {
158172
str_to_map_impl(
159173
as_large_string_array(&args[0])?,
160174
Some(as_large_string_array(&args[1])?),
161175
Some(as_large_string_array(&args[2])?),
176+
last_value_wins,
162177
)
163178
}
164179
(DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => {
165180
str_to_map_impl(
166181
as_string_view_array(&args[0])?,
167182
Some(as_string_view_array(&args[1])?),
168183
Some(as_string_view_array(&args[2])?),
184+
last_value_wins,
169185
)
170186
}
171187
(t1, t2, t3) => exec_err!(
@@ -181,6 +197,7 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>(
181197
text_array: V,
182198
pair_delim_array: Option<V>,
183199
kv_delim_array: Option<V>,
200+
last_value_wins: bool,
184201
) -> Result<ArrayRef> {
185202
let num_rows = text_array.len();
186203

@@ -206,6 +223,10 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>(
206223
);
207224

208225
let mut seen_keys = HashSet::new();
226+
// LAST_WIN buffers pairs to support in-place value overwrite at the key's
227+
// first-seen position — matches Spark's `ArrayBasedMapBuilder`.
228+
let mut pairs: Vec<(&str, Option<&str>)> = Vec::new();
229+
let mut key_positions: HashMap<&str, usize> = HashMap::new();
209230
for row_idx in 0..num_rows {
210231
if combined_nulls.as_ref().is_some_and(|n| n.is_null(row_idx)) {
211232
map_builder.append(false)?;
@@ -226,31 +247,56 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>(
226247
continue;
227248
}
228249

229-
seen_keys.clear();
230-
for pair in text.split(pair_delim) {
231-
if pair.is_empty() {
232-
continue;
250+
if last_value_wins {
251+
pairs.clear();
252+
key_positions.clear();
253+
for pair in text.split(pair_delim) {
254+
if pair.is_empty() {
255+
continue;
256+
}
257+
let mut kv_iter = pair.splitn(2, kv_delim);
258+
let key = kv_iter.next().unwrap_or("");
259+
let value = kv_iter.next();
260+
match key_positions.get(key) {
261+
Some(&idx) => pairs[idx].1 = value,
262+
None => {
263+
key_positions.insert(key, pairs.len());
264+
pairs.push((key, value));
265+
}
266+
}
267+
}
268+
for (key, value) in &pairs {
269+
map_builder.keys().append_value(key);
270+
match value {
271+
Some(v) => map_builder.values().append_value(v),
272+
None => map_builder.values().append_null(),
273+
}
233274
}
275+
} else {
276+
seen_keys.clear();
277+
for pair in text.split(pair_delim) {
278+
if pair.is_empty() {
279+
continue;
280+
}
234281

235-
let mut kv_iter = pair.splitn(2, kv_delim);
236-
let key = kv_iter.next().unwrap_or("");
237-
let value = kv_iter.next();
282+
let mut kv_iter = pair.splitn(2, kv_delim);
283+
let key = kv_iter.next().unwrap_or("");
284+
let value = kv_iter.next();
238285

239-
// TODO: Support LAST_WIN policy via spark.sql.mapKeyDedupPolicy config
240-
// EXCEPTION policy: error on duplicate keys (Spark 3.0+ default)
241-
if !seen_keys.insert(key) {
242-
return exec_err!(
243-
"Duplicate map key '{key}' was found, please check the input data. \
244-
If you want to remove the duplicated keys, you can set \
245-
spark.sql.mapKeyDedupPolicy to \"LAST_WIN\" so that the key \
246-
inserted at last takes precedence."
247-
);
248-
}
286+
if !seen_keys.insert(key) {
287+
return exec_err!(
288+
"[DUPLICATED_MAP_KEY] Duplicate map key '{key}' was found, \
289+
please check the input data. To allow duplicate keys with \
290+
last-value-wins semantics, set \
291+
`datafusion.spark.map_key_dedup_policy` to `LAST_WIN`."
292+
);
293+
}
249294

250-
map_builder.keys().append_value(key);
251-
match value {
252-
Some(v) => map_builder.values().append_value(v),
253-
None => map_builder.values().append_null(),
295+
map_builder.keys().append_value(key);
296+
match value {
297+
Some(v) => map_builder.values().append_value(v),
298+
None => map_builder.values().append_null(),
299+
}
254300
}
255301
}
256302
map_builder.append(true)?;

0 commit comments

Comments
 (0)