Skip to content

Commit 70469a2

Browse files
committed
[SPARK-55934][PYTHON][TEST][FOLLOWUP] Fix MAP_ARROW_ITER bench UDF return type
### What changes were proposed in this pull request? Fix the default `ret_type` resolution in `_MapArrowIterBenchMixin._write_scenario` in `python/benchmarks/bench_eval_type.py`. The three benchmark UDFs (`identity_udf`, `sort_udf`, `filter_udf`) all return whole `pa.RecordBatch`es with the input row schema, so their declared return type should be the inner row `StructType`, not the first nested field's data type. Before: ```python ret_type = schema.fields[0].dataType.fields[0].dataType # first column's type ``` After: ```python ret_type = schema.fields[0].dataType # the row's StructType ``` ### Why are the changes needed? `mapInArrow` UDFs are contractually `(Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]`, with the user-supplied `schema` describing the output rows as a whole. The benchmark UDFs return full batches but were declaring just one column's type, which is semantically inconsistent with the API. This did not surface as an error because `worker.py` discards `return_type` for `SQL_MAP_ARROW_ITER_UDF` in `read_single_udf` (returns `None`) and only checks `Iterator[pa.RecordBatch]` structurally via `verify_return_type`. Schema-level validation is currently absent, so the mismatched type was tolerated. If the worker ever adds schema validation for this eval type, the previous declaration would break the benchmark. ### Does this PR introduce _any_ user-facing change? No. Test-only change in the benchmark module. ### How was this patch tested? - Confirmed the new default resolves to a 5-field `StructType` matching the input row schema for `sm_batch_few_col`. - Ran `MapArrowIterUDFTimeBench.setup` + `time_worker` for `(sm_batch_few_col, pure_ints) x (identity_udf, sort_udf, filter_udf)`. - Ran `MapArrowIterUDFPeakmemBench.setup` + `peakmem_worker` for `sm_batch_few_col/identity_udf`. ### Was this patch authored or co-authored using generative AI tooling? Yes. Generated-by: Claude Code (claude-opus-4-7) Closes apache#56168 from viirya/SPARK-55934-followup. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
1 parent 63f9c88 commit 70469a2

1 file changed

Lines changed: 4 additions & 1 deletion

File tree

python/benchmarks/bench_eval_type.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1457,7 +1457,10 @@ def _write_scenario(self, scenario, udf_name, buf):
14571457
batches, schema = self._build_scenario(scenario)
14581458
udf_func, ret_type, arg_offsets = self._udfs[udf_name]
14591459
if ret_type is None:
1460-
ret_type = schema.fields[0].dataType.fields[0].dataType
1460+
# mapInArrow UDFs return an Iterator[pa.RecordBatch] with the same
1461+
# schema as the input row (the inner struct, since make_batches
1462+
# wraps the row schema in a single struct column for the wire).
1463+
ret_type = schema.fields[0].dataType
14611464
MockProtocolWriter.write_worker_input(
14621465
PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
14631466
lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b),

0 commit comments

Comments
 (0)