
[SPARK-57863][PYTHON] Avoid redundant per-element output conversion in Arrow-optimized Python UDF #56940

Closed
viirya wants to merge 3 commits into apache:master from viirya:es-2023952-arrow-udf-regression

Conversation

viirya commented Jul 1, 2026

Member

What changes were proposed in this pull request?

In the Arrow-optimized (non-legacy) regular Python UDF path, the UDF output is converted from Python objects to an Arrow array. Previously, when the return type needed a converter (e.g. array<string>), a per-element Python converter (LocalDataToArrowConversion) was run over every element of every row before building the Arrow array, even when the elements already matched the declared type and PyArrow could build the array directly.

This PR adds a fast path: for return types with no value-transforming coercion, it first tries pa.array(results, type=arrow_return_type) on the raw UDF results and only falls back to the per-element converter if PyArrow rejects them (an element genuinely needs coercion). A static per-UDF predicate, _output_fast_path_safe, gates the fast path and excludes types whose converter transforms a value that pa.array would also accept (silent divergence) or where pa.array accepts input the converter is meant to reject:

  • Timestamp / TimestampNTZ: converter truncates to the session time zone.
  • Decimal: pa.array coerces int -> decimal, bypassing the intToDecimalCoercionEnabled gate as well as the converter's rescaling and Decimal('NaN') handling.
  • UDT / Variant / Geometry / Geography: converter serializes to a storage form.
  • Null.

Numeric / bool / string / binary and nested containers of those are safe.
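
As a rough illustration of the gating rule described above, the following sketch models a static, recursive safety predicate over a toy type tree. The classes `Leaf`, `Array`, `Map`, `Struct`, the `UNSAFE_LEAVES` set, and the function `output_fast_path_safe` are hypothetical stand-ins written for this example; they are not Spark's actual types or the PR's `_output_fast_path_safe` implementation.

```python
from dataclasses import dataclass
from typing import Tuple


# Hypothetical toy type model (not pyspark.sql.types).
@dataclass(frozen=True)
class Leaf:
    name: str


@dataclass(frozen=True)
class Array:
    element: object


@dataclass(frozen=True)
class Map:
    key: object
    value: object


@dataclass(frozen=True)
class Struct:
    fields: Tuple[object, ...]


# Leaf types the description lists as excluded from the fast path.
UNSAFE_LEAVES = frozenset(
    {"timestamp", "timestamp_ntz", "decimal", "udt",
     "variant", "geometry", "geography", "null"}
)


def output_fast_path_safe(dt: object) -> bool:
    """Return True only if no excluded leaf appears anywhere in the type tree."""
    if isinstance(dt, Leaf):
        return dt.name not in UNSAFE_LEAVES
    if isinstance(dt, Array):
        return output_fast_path_safe(dt.element)
    if isinstance(dt, Map):
        return output_fast_path_safe(dt.key) and output_fast_path_safe(dt.value)
    if isinstance(dt, Struct):
        return all(output_fast_path_safe(f) for f in dt.fields)
    # Unknown node kinds are treated conservatively (assumption of this sketch).
    return False


print(output_fast_path_safe(Array(Leaf("string"))))     # array<string>
print(output_fast_path_safe(Array(Leaf("timestamp"))))  # array<timestamp>
```

Because the predicate depends only on the declared return type, it can be evaluated once per UDF rather than once per batch.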

Why are the changes needed?

The per-element output converter is pure overhead when results already match the declared type. For an array<string> UDF (the reverse_array shape), the converter accounts for a large fraction of output time; a microbenchmark on select max(reverse_array(array(a, b))) over 4M rows (best-of-6) shows the Arrow path improving from 2.48s to 1.86s (~25% faster), narrowing the gap to the legacy pandas path from 1.77x to 1.31x.

Does this PR introduce any user-facing change?

No. Output values are unchanged; the fast path is only taken when it produces results identical to the existing converter path.

How was this patch tested?

New tests in test_arrow_python_udf.py covering array<string> output (already correct types), array<string> output requiring int->string coercion (fallback path), and array<timestamp> output verifying session-timezone handling (an excluded type). They run under both legacy and non-legacy conversion configs. The full test_arrow_python_udf module (301 tests) passes.

This PR also adds an array<string> scenario and a to_string_array_udf (array in / array out) to the SQL_ARROW_BATCHED_UDF microbenchmark in python/benchmarks/bench_eval_type.py. Driving the worker directly on to_string_array_udf over an array<string> column, the output conversion savings grow with the number of rows (at the suite's default small size they are masked by fixed per-batch and input-conversion cost):

| scenario | before | after | speedup |
|---|---|---|---|
| 1M-row array<string>, to_string_array_udf | ~1438 ms | ~1212 ms | ~1.2x |

Related PRs

This is the output-side half of the pandas->Arrow default regression on nested columns. The input-side half is fixed in #56943 (SPARK-57864).

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8)

viirya added 3 commits July 1, 2026 13:02
…n Arrow-optimized Python UDF

Co-authored-by: Claude Code

…_ARROW_BATCHED_UDF benchmark

### What changes were proposed in this pull request?

Extends the existing `SQL_ARROW_BATCHED_UDF` ASV microbenchmark in
`python/benchmarks/bench_eval_type.py` to cover a nested (array) type:

- a `string_array` entry in `MockDataFactory.TYPE_REGISTRY` and a
  `pure_string_arrays` type pool / scenario, producing an `array<string>` input
  column;
- a `to_string_array_udf` in `_ArrowBatchedBenchMixin._udfs` that returns
  `array<string>`. It is type-agnostic on input (like the existing UDFs) so it
  stays valid across the whole scenario x UDF cross product, but paired with the
  `pure_string_arrays` scenario it gives an array<string>-in / array<string>-out
  case that stresses the output-side Python->Arrow conversion this change
  optimizes.
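
For illustration only, a UDF body that is type-agnostic on input yet always returns a list of strings might look like the sketch below. The function name `to_string_array_sketch` and its body are hypothetical; the benchmark's actual `to_string_array_udf` may be written differently.

```python
from typing import Any, List, Optional


def to_string_array_sketch(value: Any) -> Optional[List[Optional[str]]]:
    """Hypothetical UDF body: accept any input, return an array<string>-shaped value."""
    if value is None:
        return None
    if isinstance(value, (list, tuple)):
        # Array input: stringify each element, preserving nulls.
        return [None if e is None else str(e) for e in value]
    # Scalar input: wrap the stringified value in a one-element list.
    return [str(value)]


print(to_string_array_sketch(["a", "b"]))
print(to_string_array_sketch(42))
```

With an array<string> input the elements already match the declared return type, which is the case where output conversion cost dominates.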

### Why are the changes needed?

The main change adds an output fast path for nested types; this gives the
standard PySpark benchmark suite coverage of that path so the effect is
measurable in a consistent environment. Driving the worker directly at 1M rows
shows the `array<string>` output improving from ~1412ms to ~1213ms.

### Does this PR introduce _any_ user-facing change?

No (benchmark only).

### How was this patch tested?

Ran the new scenario and UDF through `ArrowBatchedUDFTimeBench` and
`ArrowBatchedUDFPeakmemBench`, and verified the full scenario x UDF cross product
runs without error.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8)

Co-authored-by: Claude Code

…non-legacy path

test_array_string_output_requires_coercion asserts int->string coercion inside
an array<string> return, which only the non-legacy (Arrow-direct) conversion
path performs; the legacy pandas path raises. Wrap the test body in
sql_conf(...pandas.conversion.enabled=False) so it exercises the intended path
under all suite variants, matching test_arrow_udf_int_to_decimal_coercion.

Co-authored-by: Claude Code
@gaogaotiantian

Contributor

This skips the original string conversion logic so the result would be incorrect.

            def convert_string(value: Any) -> Any:
                if value is None:
                    if not nullable:
                        raise PySparkValueError(f"input for {dataType} must not be None")
                    return None
                else:
                    if isinstance(value, bool):
                        # To match the PySpark Classic which convert bool to string in
                        # the JVM side (python.EvaluatePython.makeFromJava)
                        return str(value).lower()
                    else:
                        return str(value)

viirya commented Jul 1, 2026

Member Author

Closing this PR. @gaogaotiantian is right — the output fast path silently bypasses the string/binary conversion logic, and I verified it is a breaking change.

The fast path (pa.array(raw_results, type=...), falling back to the per-element converter only on ArrowException) only ever activates when result_conv is not None. Among the return types the gate treats as safe, that is exactly StringType and BinaryType — numeric/bool/date leaves have no converter, so the fast path is skipped for them entirely. And pa.array performs permissive cross-type coercion that diverges from the converters for precisely those two types:

  • array<string> (or top-level string) returning bytes: pa.array decodes to "abc", while convert_string produces the Python repr "b'abc'". Verified end-to-end — master (non-legacy) returns ["b'abc'", "b'de'"], this PR returns ["abc", "de"].
  • binary returning str: pa.array encodes to b'abc', while convert_binary raises (it asserts bytes) — so an error is silently turned into a value.

(The bool case in your snippet is actually safe by luck: pa.array rejects bool->string with ArrowTypeError, so it falls back to the converter and still lowercases. But the concern is correct — bytes/str are the cases that slip through.)

The _output_fast_path_safe gate only excluded value-transforming types (timestamp/decimal); it missed pa.array's cross-type coercion, and the only types where the fast path runs are the ones that diverge — so the approach is unsafe wherever it takes effect.

The larger part of the pandas->Arrow nested-column regression is on the input side, handled independently in #56943 (SPARK-57864) via a different mechanism (to_pandas() + recursive listify, gated on columns needing no per-element conversion), which is not affected by this. Closing in favor of that; thanks for the catch.
