diff --git a/datafusion/spark/src/function/array/arrays_zip.rs b/datafusion/spark/src/function/array/arrays_zip.rs new file mode 100644 index 0000000000000..95f0c337cd22c --- /dev/null +++ b/datafusion/spark/src/function/array/arrays_zip.rs @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, ArrayRef, AsArray, ListArray, StructArray}; +use arrow::datatypes::{DataType, Field, Fields}; +use datafusion_common::cast::as_list_array; +use datafusion_common::{Result, ScalarValue, exec_err}; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion_functions_nested::arrays_zip::ArraysZip; +use std::sync::Arc; + +/// Spark-compatible `arrays_zip` function. +/// +/// Delegates to DataFusion's `arrays_zip` and renames the inner struct fields +/// to use 0-based ordinals (`0`, `1`, `2`, ...) instead of DataFusion's 1-based +/// ordinals, matching Spark's [`arrays_zip`] semantics. +/// +/// [`arrays_zip`]: https://spark.apache.org/docs/latest/api/sql/index.html#arrays_zip +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkArraysZip { + signature: Signature, +} + +impl Default for SparkArraysZip { + fn default() -> Self { + Self::new() + } +} + +impl SparkArraysZip { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for SparkArraysZip { + fn name(&self) -> &str { + "arrays_zip" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + let inner = ArraysZip::new().return_type(arg_types)?; + rename_return_type_zero_based(&inner) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let number_rows = args.number_rows; + let result = ArraysZip::new().invoke_with_args(args)?; + match result { + ColumnarValue::Array(arr) => { + let renamed = rename_list_struct_fields_zero_based(&arr)?; + Ok(ColumnarValue::Array(renamed)) + } + ColumnarValue::Scalar(scalar) => { + let arr = scalar.to_array_of_size(number_rows.max(1))?; + let renamed = rename_list_struct_fields_zero_based(&arr)?; + let new_scalar = ScalarValue::try_from_array(&renamed, 0)?; + Ok(ColumnarValue::Scalar(new_scalar)) + } + } + } +} + +/// Rename struct fields inside a `List>` data type to use 0-based ordinals. +fn rename_return_type_zero_based(data_type: &DataType) -> Result { + let DataType::List(list_field) = data_type else { + return exec_err!("arrays_zip expected List return type, got {data_type}"); + }; + let DataType::Struct(fields) = list_field.data_type() else { + return exec_err!( + "arrays_zip expected List> return type, got {data_type}" + ); + }; + + let new_struct = DataType::Struct(zero_based_fields(fields)); + Ok(DataType::List(Arc::new(Field::new( + list_field.name(), + new_struct, + list_field.is_nullable(), + )))) +} + +/// Rebuild a `List>` array so that the inner struct fields use 0-based +/// ordinal names. The underlying column data and null buffers are reused; only +/// the schema is replaced. +fn rename_list_struct_fields_zero_based(array: &dyn Array) -> Result { + let list = as_list_array(array)?; + let struct_array = list.values().as_struct(); + let new_fields = zero_based_fields(struct_array.fields()); + + let new_struct = StructArray::try_new( + new_fields, + struct_array.columns().to_vec(), + struct_array.nulls().cloned(), + )?; + + let new_list_field = + Arc::new(Field::new_list_field(new_struct.data_type().clone(), true)); + let new_list = ListArray::try_new( + new_list_field, + list.offsets().clone(), + Arc::new(new_struct), + list.nulls().cloned(), + )?; + Ok(Arc::new(new_list)) +} + +fn zero_based_fields(fields: &Fields) -> Fields { + fields + .iter() + .enumerate() + .map(|(i, f)| { + Arc::new(Field::new( + i.to_string(), + f.data_type().clone(), + f.is_nullable(), + )) + }) + .collect::>() + .into() +} diff --git a/datafusion/spark/src/function/array/mod.rs b/datafusion/spark/src/function/array/mod.rs index 6c16e05361641..d15bf4c64489f 100644 --- a/datafusion/spark/src/function/array/mod.rs +++ b/datafusion/spark/src/function/array/mod.rs @@ -16,6 +16,7 @@ // under the License. pub mod array_contains; +pub mod arrays_zip; pub mod repeat; pub mod shuffle; pub mod slice; @@ -30,6 +31,7 @@ make_udf_function!(spark_array::SparkArray, array); make_udf_function!(shuffle::SparkShuffle, shuffle); make_udf_function!(repeat::SparkArrayRepeat, array_repeat); make_udf_function!(slice::SparkSlice, slice); +make_udf_function!(arrays_zip::SparkArraysZip, arrays_zip); pub mod expr_fn { use datafusion_functions::export_functions; @@ -55,6 +57,11 @@ pub mod expr_fn { "Returns a slice of the array from the start index with the given length.", array start length )); + export_functions!(( + arrays_zip, + "Returns an array of structs created by combining the elements of each input array at the same index. If the arrays have different lengths, shorter arrays are padded with NULLs.", + args + )); } pub fn functions() -> Vec> { @@ -64,5 +71,6 @@ pub fn functions() -> Vec> { shuffle(), array_repeat(), slice(), + arrays_zip(), ] } diff --git a/datafusion/sqllogictest/test_files/spark/array/arrays_zip.slt b/datafusion/sqllogictest/test_files/spark/array/arrays_zip.slt new file mode 100644 index 0000000000000..d1569fadea5c7 --- /dev/null +++ b/datafusion/sqllogictest/test_files/spark/array/arrays_zip.slt @@ -0,0 +1,131 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +## Spark `arrays_zip` returns a list of structs whose fields use 0-based ordinal +## names (`0`, `1`, ...), in contrast to DataFusion's 1-based ordinals. +## +## Scenarios below mirror Apache Spark's +## `DataFrameFunctionsSuite#"dataframe arrays_zip function"` and +## `"SPARK-24633: arrays_zip splits input processing correctly"`. + +# Spark docs example: arrays_zip(array(1, 2, 3), array(2, 3, 4), array(3, 4, 5)) +query ? +SELECT arrays_zip(array(1, 2, 3), array(2, 3, 4), array(3, 4, 5)); +---- +[{0: 1, 1: 2, 2: 3}, {0: 2, 1: 3, 2: 4}, {0: 3, 1: 4, 2: 5}] + +# Spark df1: equal-length integer arrays. +# Seq(9001, 9002, 9003), Seq(4, 5, 6) +query ? +SELECT arrays_zip(array(9001, 9002, 9003), array(4, 5, 6)); +---- +[{0: 9001, 1: 4}, {0: 9002, 1: 5}, {0: 9003, 1: 6}] + +# Spark df2: three arrays with mixed element types (string, boolean, int). +# Seq("a", "b"), Seq(true, false), Seq(10, 11) +query ? +SELECT arrays_zip(array('a', 'b'), array(true, false), array(10, 11)); +---- +[{0: a, 1: true, 2: 10}, {0: b, 1: false, 2: 11}] + +# Spark df3: shorter first array padded with NULL. +# Seq("a", "b"), Seq(4, 5, 6) +query ? +SELECT arrays_zip(array('a', 'b'), array(4, 5, 6)); +---- +[{0: a, 1: 4}, {0: b, 1: 5}, {0: NULL, 1: 6}] + +# Spark df4: NULL inside an array plus shorter second array. +# Seq("a", "b", null), Seq(4L) +query ? +SELECT arrays_zip(array('a', 'b', NULL), array(arrow_cast(4, 'Int64'))); +---- +[{0: a, 1: 4}, {0: b, 1: NULL}, {0: NULL, 1: NULL}] + +# Spark df5: four arrays exercising single-element, single-null, empty, and all-null cases. +# Seq(-1), Seq(null), Seq(), Seq(null, null) +query ? +SELECT arrays_zip( + array(-1), + array(arrow_cast(NULL, 'Int32')), + arrow_cast(make_array(), 'List(Int32)'), + array(arrow_cast(NULL, 'Int32'), arrow_cast(NULL, 'Int32')) +); +---- +[{0: -1, 1: NULL, 2: NULL, 3: NULL}, {0: NULL, 1: NULL, 2: NULL, 3: NULL}] + +# Spark df7: nested arrays zipped with doubles. +# Seq(Seq(1, 2, 3), Seq(4, 5)), Seq(1.1, 2.2) +query ? +SELECT arrays_zip(array(array(1, 2, 3), array(4, 5)), array(1.1, 2.2)); +---- +[{0: [1, 2, 3], 1: 1.1}, {0: [4, 5], 1: 2.2}] + +# SPARK-24633: arrays_zip with many single-element arrays merges into one row. +# (0 to 5).map(x => array(id + x)) on spark.range(1) → Row(Seq(Row(0, 1, 2, 3, 4, 5))) +query ? +SELECT arrays_zip(array(0), array(1), array(2), array(3), array(4), array(5)); +---- +[{0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5}] + +# Both arguments are NULL list → result is NULL. +query ? +SELECT arrays_zip(arrow_cast(NULL, 'List(Int32)'), arrow_cast(NULL, 'List(Int32)')); +---- +NULL + +# Single argument: still produces a 0-based struct. +query ? +SELECT arrays_zip(array(1, 2, 3)); +---- +[{0: 1}, {0: 2}, {0: 3}] + +# Column-level: multiple rows with different lengths. +query ? +SELECT arrays_zip(a, b) +FROM (VALUES ([1, 2], [10, 20]), ([3, 4, 5], [30]), ([6], [60, 70])) AS t(a, b); +---- +[{0: 1, 1: 10}, {0: 2, 1: 20}] +[{0: 3, 1: 30}, {0: 4, 1: NULL}, {0: 5, 1: NULL}] +[{0: 6, 1: 60}, {0: NULL, 1: 70}] + +# Column-level: NULL rows in the input. +query ? +SELECT arrays_zip(a, b) +FROM (VALUES ([1, 2], [10, 20]), (null, [30, 40]), ([5, 6], null)) AS t(a, b); +---- +[{0: 1, 1: 10}, {0: 2, 1: 20}] +[{0: NULL, 1: 30}, {0: NULL, 1: 40}] +[{0: 5, 1: NULL}, {0: 6, 1: NULL}] + +# LargeList inputs are accepted (DataFusion-specific list flavor). +query ? +SELECT arrays_zip( + arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'), + arrow_cast(make_array(10, 20, 30), 'LargeList(Int64)') +); +---- +[{0: 1, 1: 10}, {0: 2, 1: 20}, {0: 3, 1: 30}] + +# FixedSizeList inputs are accepted (DataFusion-specific list flavor). +query ? +SELECT arrays_zip( + arrow_cast(make_array(1, 2, 3), 'FixedSizeList(3, Int64)'), + arrow_cast(make_array(10, 20, 30), 'FixedSizeList(3, Int64)') +); +---- +[{0: 1, 1: 10}, {0: 2, 1: 20}, {0: 3, 1: 30}]