[SPARK-57864][PYTHON] Speed up Arrow-optimized Python UDF input conversion for array columns#56943
[SPARK-57864][PYTHON] Speed up Arrow-optimized Python UDF input conversion for array columns#56943viirya wants to merge 2 commits into
Conversation
…rsion for array columns ### What changes were proposed in this pull request? In the Arrow-optimized (non-legacy) regular Python UDF path, input columns are converted from Arrow to Python objects. For an array column whose leaf element type needs no per-element input converter (numeric / bool / string / binary and nested arrays of those), the column was materialized with `pa.Array.to_pylist()`, which is markedly slower than `pa.Array.to_pandas()` for list types. This PR converts such columns via `to_pandas()` and then recursively turns the resulting numpy ndarrays back into Python lists, so the UDF still receives the same objects (Python `list`, not `ndarray`) that `to_pylist()` would have produced, at any nesting depth. A per-column predicate `_input_fast_listify_safe` gates this: it applies only when the column-level input converter is `None` (no per-element coercion needed) and the type is an array whose leaf needs no converter. Columns needing a per-element converter (e.g. timestamp, struct, map, UDT) keep the existing `to_pylist()`-based path unchanged. ### Why are the changes needed? `to_pylist()` on nested/list Arrow arrays is the dominant cost of the Arrow Python UDF input path. Combined with the output-side fix (SPARK-57863), this removes the remaining nested-column regression from the pandas->Arrow default. Microbenchmark (`select max(f(arr))`, 4M rows, best-of-6), new PyArrow path vs legacy pandas path: | input type | before (ratio) | after (ratio) | |-----------------------|----------------|---------------| | array<long> | 1.44x | 0.89x | | array<string> | 2.44x | 1.02x | | array<array<int>> | 1.43x | 0.80x | ### Does this PR introduce _any_ user-facing change? No. The UDF receives the same Python objects as before; only the conversion mechanism changes, gated to cases proven equivalent. ### How was this patch tested? New tests in `test_arrow_python_udf.py` verify array inputs (including nested arrays) reach the UDF as Python lists and that array<string> values are preserved. The full `test_arrow_python_udf` module passes. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.8) Co-authored-by: Claude Code
…W_BATCHED_UDF benchmark ### What changes were proposed in this pull request? Extends the `SQL_ARROW_BATCHED_UDF` ASV microbenchmark in `python/benchmarks/bench_eval_type.py` to cover array input columns: - `string_array` and `nested_int_array` entries in `MockDataFactory.TYPE_REGISTRY`, plus `pure_string_arrays` and `pure_nested_int_arrays` type pools / scenarios; - a `consume_udf` in `_ArrowBatchedBenchMixin._udfs` that reads the input value and returns a scalar (trivial output), so the Arrow->Python input conversion dominates the measurement. It is type-agnostic, so it stays valid across the whole scenario x UDF cross product. ### Why are the changes needed? This gives the standard PySpark benchmark suite coverage of the array (and nested-array) input conversion optimized by this change. Driving the worker directly on `consume_udf`: `pure_string_arrays` improves from ~499ms to ~185ms and `pure_nested_int_arrays` from ~924ms to ~392ms per run. ### Does this PR introduce _any_ user-facing change? No (benchmark only). ### How was this patch tested? Ran the new scenarios through `ArrowBatchedUDFTimeBench` / `ArrowBatchedUDFPeakmemBench` and verified the full scenario x UDF cross product (40 combinations) runs without error. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.8) Co-authored-by: Claude Code
|
A few reasons I'm against this:
To me, this is an arrow issue. |
Note that OSS doesn't have UDF servers. 😄 |
Yeah, whatever node that's running the UDF process. |
|
Also this might requires extra memory too - probably a copy for pandas dataframe. |
|
Thanks @gaogaotiantian — you're right, and I reproduced the failing case. I'm going to close this. The concrete breakage is a numeric array with null elements. For And I think your deeper point is the right framing: this is fundamentally an Arrow issue. Closing this. I'll look at whether the |
What changes were proposed in this pull request?
In the Arrow-optimized (non-legacy) regular Python UDF path, input columns are converted from Arrow to Python objects. For an array column whose leaf element type needs no per-element input converter (numeric / bool / string / binary and nested arrays of those), the column was materialized with
pa.Array.to_pylist(), which is markedly slower thanpa.Array.to_pandas()for list types.This PR converts such columns via
to_pandas()and then recursively turns the resulting numpy ndarrays back into Python lists, so the UDF still receives the same objects (Pythonlist, notndarray) thatto_pylist()would have produced, at any nesting depth. A per-column predicate_input_fast_listify_safegates this: it applies only when the column-level input converter isNone(no per-element coercion needed) and the type is an array whose leaf needs no converter. Columns needing a per-element converter (e.g. timestamp, struct, map, UDT) keep the existingto_pylist()-based path unchanged.This is independent of, and complementary to, the output-side fix in SPARK-57863: that one removes redundant per-element output conversion, this one speeds up input conversion. Together they eliminate the nested-column regression introduced when the Arrow serializer became the default for regular Python UDFs.
Why are the changes needed?
to_pylist()on nested/list Arrow arrays is the dominant cost of the Arrow Python UDF input path. Microbenchmark (select max(f(arr)), 4M rows, best-of-6), new Arrow path vs the legacy pandas path (spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled=true):array<long>array<string>array<array<int>>(ratio = Arrow path time / legacy pandas path time; > 1 means the Arrow path is slower.)
Does this PR introduce any user-facing change?
No. The UDF receives the same Python objects as before; only the conversion mechanism changes, gated to cases proven equivalent.
How was this patch tested?
New tests in
test_arrow_python_udf.pyverify array inputs (including nested arrays) reach the UDF as Python lists and that array values are preserved. The fulltest_arrow_python_udfmodule passes.This PR also adds array/nested-array input scenarios to the
SQL_ARROW_BATCHED_UDFmicrobenchmark inpython/benchmarks/bench_eval_type.py(pure_string_arrays,pure_nested_int_arrays, and aconsume_udfwhose output is trivial so the input conversion dominates). Driving the worker directly onconsume_udf(best-of-3, 50k rows x 10 array columns):pure_string_arrayspure_nested_int_arraysRelated PRs
This is the input-side half of the pandas->Arrow default regression on nested columns. The output-side half is fixed in #56940 (SPARK-57863).
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8)