[SPARK-57864][PYTHON] Speed up Arrow-optimized Python UDF input conversion for array columns#56943

Closed
viirya wants to merge 2 commits into apache:master from viirya:es-2023952-input-standalone

Conversation

@viirya

@viirya viirya commented Jul 1, 2026

Member

What changes were proposed in this pull request?

In the Arrow-optimized (non-legacy) regular Python UDF path, input columns are converted from Arrow to Python objects. For an array column whose leaf element type needs no per-element input converter (numeric / bool / string / binary and nested arrays of those), the column was materialized with pa.Array.to_pylist(), which is markedly slower than pa.Array.to_pandas() for list types.

This PR converts such columns via to_pandas() and then recursively turns the resulting numpy ndarrays back into Python lists, so the UDF still receives the same objects (Python list, not ndarray) that to_pylist() would have produced, at any nesting depth. A per-column predicate _input_fast_listify_safe gates this: it applies only when the column-level input converter is None (no per-element coercion needed) and the type is an array whose leaf needs no converter. Columns needing a per-element converter (e.g. timestamp, struct, map, UDT) keep the existing to_pylist()-based path unchanged.
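The gating rule described above can be sketched in a few lines. This is an illustration only: `Leaf`, `Array`, `NO_CONVERTER_LEAVES`, and `fast_listify_safe` are hypothetical stand-ins, not the PR's `_input_fast_listify_safe` or Spark's real type classes, and the exact set of leaf types (for example whether decimal qualifies) is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Union

# Assumed set of leaf types that need no per-element converter
# (numeric / bool / string / binary, per the description above).
NO_CONVERTER_LEAVES = {
    "byte", "short", "int", "long", "float", "double",
    "boolean", "string", "binary",
}


@dataclass(frozen=True)
class Leaf:
    """Hypothetical non-array type, identified by name only."""
    name: str


@dataclass(frozen=True)
class Array:
    """Hypothetical array type wrapping an element type."""
    element: "SparkType"


SparkType = Union[Leaf, Array]


def leaf_of(t: SparkType) -> Leaf:
    # Walk through nested arrays until the innermost element type.
    while isinstance(t, Array):
        t = t.element
    return t


def fast_listify_safe(column_converter: Optional[Callable], t: SparkType) -> bool:
    # Rule 1: the column-level converter must be None.
    if column_converter is not None:
        return False
    # Rule 2: only array columns are eligible.
    if not isinstance(t, Array):
        return False
    # Rule 3: the leaf must need no per-element converter.
    return leaf_of(t).name in NO_CONVERTER_LEAVES
```

Under these assumptions, `Array(Array(Leaf("int")))` would take the fast path, while `Array(Leaf("timestamp"))` or any column with a non-`None` converter would stay on the `to_pylist()` path.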

This is independent of, and complementary to, the output-side fix in SPARK-57863: that one removes redundant per-element output conversion, this one speeds up input conversion. Together they eliminate the nested-column regression introduced when the Arrow serializer became the default for regular Python UDFs.

Why are the changes needed?

to_pylist() on nested/list Arrow arrays is the dominant cost of the Arrow Python UDF input path. Microbenchmark (select max(f(arr)), 4M rows, best-of-6), new Arrow path vs the legacy pandas path (spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled=true):

| input type        | before (ratio) | after (ratio) |
|-------------------|----------------|---------------|
| array<long>       | 1.44x          | 0.89x         |
| array<string>     | 2.44x          | 1.02x         |
| array<array<int>> | 1.43x          | 0.80x         |

(ratio = Arrow path time / legacy pandas path time; > 1 means the Arrow path is slower.)
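To read the ratios as a speedup of the Arrow path itself, one can divide the before ratio by the after ratio. The sketch below assumes the legacy pandas baseline time was the same in both measurements so that it cancels out; the thread does not state this, and `arrow_path_speedup` is a name introduced here for illustration.

```python
# Ratios copied from the table above: Arrow path time / legacy pandas path time.
ratios = {
    "array<long>": (1.44, 0.89),
    "array<string>": (2.44, 1.02),
    "array<array<int>>": (1.43, 0.80),
}


def arrow_path_speedup(before: float, after: float) -> float:
    # (arrow_before / legacy) / (arrow_after / legacy) = arrow_before / arrow_after,
    # assuming the legacy time is identical in both runs.
    return before / after


speedups = {name: arrow_path_speedup(b, a) for name, (b, a) in ratios.items()}

for name, s in speedups.items():
    print(f"{name}: Arrow path roughly {s:.1f}x faster after the change")
```

Under that assumption the Arrow path is roughly 1.6x, 2.4x, and 1.8x faster for the three input types.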

Does this PR introduce any user-facing change?

No. The UDF receives the same Python objects as before; only the conversion mechanism changes, gated to cases proven equivalent.

How was this patch tested?

New tests in test_arrow_python_udf.py verify array inputs (including nested arrays) reach the UDF as Python lists and that array<string> values are preserved. The full test_arrow_python_udf module passes.

This PR also adds array/nested-array input scenarios to the SQL_ARROW_BATCHED_UDF microbenchmark in python/benchmarks/bench_eval_type.py (pure_string_arrays, pure_nested_int_arrays, and a consume_udf whose output is trivial so the input conversion dominates). Driving the worker directly on consume_udf (best-of-3, 50k rows x 10 array columns):

| scenario               | before  | after   | speedup |
|------------------------|---------|---------|---------|
| pure_string_arrays     | ~498 ms | ~185 ms | ~2.7x   |
| pure_nested_int_arrays | ~931 ms | ~392 ms | ~2.4x   |

Related PRs

This is the input-side half of the pandas->Arrow default regression on nested columns. The output-side half is fixed in #56940 (SPARK-57863).

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8)

…rsion for array columns

### What changes were proposed in this pull request?

In the Arrow-optimized (non-legacy) regular Python UDF path, input columns are
converted from Arrow to Python objects. For an array column whose leaf element
type needs no per-element input converter (numeric / bool / string / binary and
nested arrays of those), the column was materialized with
`pa.Array.to_pylist()`, which is markedly slower than `pa.Array.to_pandas()` for
list types.

This PR converts such columns via `to_pandas()` and then recursively turns the
resulting numpy ndarrays back into Python lists, so the UDF still receives the
same objects (Python `list`, not `ndarray`) that `to_pylist()` would have
produced, at any nesting depth. A per-column predicate `_input_fast_listify_safe`
gates this: it applies only when the column-level input converter is `None`
(no per-element coercion needed) and the type is an array whose leaf needs no
converter. Columns needing a per-element converter (e.g. timestamp, struct, map,
UDT) keep the existing `to_pylist()`-based path unchanged.
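A minimal sketch of the recursive ndarray-to-list step, assuming numpy is installed. `listify` is a hypothetical helper, not the PR's code. The input is built by hand to mimic the shape assumed for a nested-list column after `to_pandas()` (an object-dtype ndarray whose elements are themselves ndarrays), so the example does not need pyarrow or pandas.

```python
import numpy as np


def listify(value):
    """Recursively turn ndarrays back into plain Python lists."""
    if isinstance(value, np.ndarray):
        if value.dtype == object:
            # Object arrays may hold nested ndarrays, str, bytes or None,
            # so each element is visited individually.
            return [listify(item) for item in value]
        # Primitive dtypes: tolist() yields native Python scalars.
        return value.tolist()
    # Non-array values (None for a null row, str, bytes, ...) pass through.
    return value


# One array<array<int>> value, in the assumed post-to_pandas() shape.
row = np.empty(2, dtype=object)
row[0] = np.array([1, 2], dtype=np.int32)
row[1] = np.array([3], dtype=np.int32)

as_lists = listify(row)
print(as_lists, type(as_lists[0][0]))
```

The branch on `dtype == object` matters because `ndarray.tolist()` on an object array leaves inner ndarrays untouched, which is why a recursive walk is needed at each nesting level.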

### Why are the changes needed?

`to_pylist()` on nested/list Arrow arrays is the dominant cost of the Arrow
Python UDF input path. Combined with the output-side fix (SPARK-57863), this
removes the remaining nested-column regression from the pandas->Arrow default.
Microbenchmark (`select max(f(arr))`, 4M rows, best-of-6), new PyArrow path vs
legacy pandas path:

| input type            | before (ratio) | after (ratio) |
|-----------------------|----------------|---------------|
| array<long>           | 1.44x          | 0.89x         |
| array<string>         | 2.44x          | 1.02x         |
| array<array<int>>     | 1.43x          | 0.80x         |

### Does this PR introduce _any_ user-facing change?

No. The UDF receives the same Python objects as before; only the conversion
mechanism changes, gated to cases proven equivalent.

### How was this patch tested?

New tests in `test_arrow_python_udf.py` verify array inputs (including nested
arrays) reach the UDF as Python lists and that array<string> values are
preserved. The full `test_arrow_python_udf` module passes.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8)

Co-authored-by: Claude Code
…W_BATCHED_UDF benchmark

### What changes were proposed in this pull request?

Extends the `SQL_ARROW_BATCHED_UDF` ASV microbenchmark in
`python/benchmarks/bench_eval_type.py` to cover array input columns:

- `string_array` and `nested_int_array` entries in
  `MockDataFactory.TYPE_REGISTRY`, plus `pure_string_arrays` and
  `pure_nested_int_arrays` type pools / scenarios;
- a `consume_udf` in `_ArrowBatchedBenchMixin._udfs` that reads the input value
  and returns a scalar (trivial output), so the Arrow->Python input conversion
  dominates the measurement. It is type-agnostic, so it stays valid across the
  whole scenario x UDF cross product.

### Why are the changes needed?

This gives the standard PySpark benchmark suite coverage of the array (and
nested-array) input conversion optimized by this change. Driving the worker
directly on `consume_udf`: `pure_string_arrays` improves from ~499ms to ~185ms
and `pure_nested_int_arrays` from ~924ms to ~392ms per run.

### Does this PR introduce _any_ user-facing change?

No (benchmark only).

### How was this patch tested?

Ran the new scenarios through `ArrowBatchedUDFTimeBench` /
`ArrowBatchedUDFPeakmemBench` and verified the full scenario x UDF cross product
(40 combinations) runs without error.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8)

Co-authored-by: Claude Code
@gaogaotiantian

gaogaotiantian commented Jul 1, 2026

Contributor

A few reasons I'm against this:

  1. This makes pandas a hard requirement for UDF servers, where it wasn't.
  2. The code is significantly more difficult to read and understand (for humans at least).
  3. We introduced another type layer, which could cause all kinds of coercion issues between pandas/arrow/python. It might work for simple cases, but I'm pretty sure we'll hit corner cases all the time. (CI has already failed on one case.)

To me, this is an arrow issue. to_pylist() should not be significantly slower than converting the arrow column to pandas then back to Python objects.

@viirya

viirya commented Jul 1, 2026

Member Author

> This makes pandas a hard requirement for UDF servers, where it wasn't.

Note that OSS doesn't have UDF servers. 😄

@gaogaotiantian

Contributor

> Note that OSS doesn't have UDF servers. 😄

Yeah, whatever node is running the UDF process. pandas was never a hard requirement for Spark. If users don't use pandas UDFs, they don't need pandas on their machine. to_pandas() implicitly requires pandas: it raises an exception if pandas is not installed.
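To make the dependency concern concrete, here is a sketch of the kind of guard a pandas-based fast path would need on workers that may not have pandas. The thread does not say whether the PR contained such a guard; `pandas_available` and `choose_input_path` are hypothetical names, and the returned strings are labels only.

```python
import importlib.util


def pandas_available() -> bool:
    # find_spec returns None when a top-level module cannot be located,
    # without actually importing it.
    return importlib.util.find_spec("pandas") is not None


def choose_input_path(fast_path_eligible: bool) -> str:
    """Pick a conversion path label; falls back when pandas is missing."""
    if fast_path_eligible and pandas_available():
        return "to_pandas"
    return "to_pylist"


print(choose_input_path(fast_path_eligible=True))
```

With a guard like this the result depends on the worker environment, which is part of the objection: the same UDF would run through different conversion paths on different nodes.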

@gaogaotiantian

Contributor

Also, this might require extra memory too, probably a copy for the pandas DataFrame.

@viirya

viirya commented Jul 1, 2026

Member Author

Thanks @gaogaotiantian, you're right, and I reproduced the failing case. I'm going to close this.

The concrete breakage is a numeric array with null elements. For the array<int> value [1, None, 3], to_pandas() promotes the inner array to a numpy float64 array [1., nan, 3.] (a numpy int array can't hold nulls), so the UDF receives nan instead of None and the downstream int cast fails with "Float value nan was truncated". to_pylist() returns the correct [1, None, 3]. The recursive ndarray->list step can't recover this, since the null is already lost to nan at the numpy layer. This is exactly the coercion corner case you predicted: my test_arrow_python_udf additions only used non-null arrays, so they missed it.
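The failure can be mimicked with the standard library alone. In this sketch `promote_with_nulls` is a hypothetical stand-in for the int-to-float64 promotion described above, and Python's `int()` stands in for the downstream Arrow cast, so the error raised here is Python's, not the "Float value nan was truncated" message from the real run.

```python
import math


def promote_with_nulls(values):
    """Stand-in for the promotion: ints with nulls become floats, null becomes NaN."""
    if any(v is None for v in values):
        return [math.nan if v is None else float(v) for v in values]
    return list(values)


def cast_to_int(value):
    # Nulls pass through; everything else must be convertible to int.
    return None if value is None else int(value)


original = [1, None, 3]                      # what to_pylist() would hand to the UDF
seen_by_udf = promote_with_nulls(original)   # what the pandas detour would hand over

expected = [cast_to_int(v) for v in original]

try:
    [cast_to_int(v) for v in seen_by_udf]
    cast_failed = False
except ValueError:
    # int(nan) is rejected, mirroring the failed int cast in the report.
    cast_failed = True

print(expected, seen_by_udf, cast_failed)
```

Once the null has become `nan`, nothing in the promoted list marks it as a former null, which is why a later list conversion cannot restore `None`.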

And I think your deeper point is the right framing: this is fundamentally an Arrow issue. to_pylist() on a nested/list array shouldn't be several times slower than to_pandas() + converting back, and the right fix is upstream in pyarrow rather than a pandas detour in Spark that keeps re-introducing type-coercion differences. Routing through pandas/numpy trades a perf gap for a class of correctness risks (this null case, plus the bytes/str coercion that sank the companion output-side PR #56940).

Closing this. I'll look at whether the to_pylist() slowness can be raised/fixed on the Arrow side instead.

@viirya viirya closed this Jul 1, 2026
@viirya viirya deleted the es-2023952-input-standalone branch July 1, 2026 22:38