From f7d2ba5784ed36bcfcd2f18fc41ea09956f801bc Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 1 Jul 2026 13:02:58 -0700 Subject: [PATCH 1/3] [SPARK-57863][PYTHON] Avoid redundant per-element output conversion in Arrow-optimized Python UDF ### What changes were proposed in this pull request? In the Arrow-optimized (non-legacy) regular Python UDF path, the UDF output is converted from Python objects to an Arrow array. Previously, when the return type needed a converter (e.g. `array<string>`), a per-element Python converter (`LocalDataToArrowConversion`) was run over every element of every row before building the Arrow array, even when the elements already matched the declared type and PyArrow could build the array directly. This PR adds a fast path: for return types with no value-transforming coercion, it first tries `pa.array(results, type=arrow_return_type)` on the raw UDF results and only falls back to the per-element converter if PyArrow rejects them (an element genuinely needs coercion). A static per-UDF predicate, `_output_fast_path_safe`, gates the fast path and excludes types whose converter transforms a value that `pa.array` would also accept (silent divergence) or where `pa.array` accepts input the converter is meant to reject: - Timestamp / TimestampNTZ: converter truncates to the session time zone. - Decimal: `pa.array` coerces int -> decimal, bypassing the `intToDecimalCoercionEnabled` gate and rescaling / `Decimal('NaN')` handling. - UDT / Variant / Geometry / Geography: converter serializes to a storage form. - Null: trivial; excluded to stay on the safe/simple side. Numeric / bool / string / binary and nested containers of those are safe. ### Why are the changes needed? The per-element output converter is pure overhead when results already match the declared type. 
For an `array<string>` UDF (the `reverse_array` shape), the converter accounts for a large fraction of output time; a microbenchmark on `select max(reverse_array(array(a, b)))` over 4M rows (best-of-6) shows the Arrow path improving from 2.48s to 1.86s (~25% faster), narrowing the gap to the legacy pandas path from 1.77x to 1.31x. ### Does this PR introduce _any_ user-facing change? No. Output values are unchanged; the fast path is only taken when it produces results identical to the existing converter path. ### How was this patch tested? New tests in `test_arrow_python_udf.py` covering `array<string>` output (already correct types), `array<string>` output requiring int->string coercion (fallback path), and `array<timestamp>` output verifying session-timezone handling (an excluded type). They run under both legacy and non-legacy conversion configs. The full `test_arrow_python_udf` module (301 tests) passes. Co-authored-by: Claude Code --- .../sql/tests/arrow/test_arrow_python_udf.py | 58 +++++++++++ python/pyspark/worker.py | 96 +++++++++++++++++-- 2 files changed, 145 insertions(+), 9 deletions(-) diff --git a/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py b/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py index f4f1219ee7cd1..cb9d5ca68e990 100644 --- a/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py +++ b/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py @@ -32,6 +32,7 @@ StringType, StructField, StructType, + TimestampType, VarcharType, ) from pyspark.testing.sqlutils import ReusedSQLTestCase @@ -270,6 +271,63 @@ def f(v: float): rounded = df.select(f("v").alias("d")).first().d self.assertEqual(rounded, Decimal("1.233999999999999986")) + def test_array_string_output_fast_path(self): + # Regression test for the Arrow Python UDF output fast path: an + # array UDF whose elements are already strings must produce the + # same result whether or not the per-element output converter is used. 
+ df = self.spark.range(0, 5) + + @udf(returnType=ArrayType(StringType())) + def reverse_each(i): + words = ["alpha", "beta", "gamma"] + return [w[::-1] for w in words[: (int(i) % 3) + 1]] + + result = [r.res for r in df.select(reverse_each("id").alias("res")).collect()] + self.assertEqual( + result, + [ + ["ahpla"], + ["ahpla", "ateb"], + ["ahpla", "ateb", "ammag"], + ["ahpla"], + ["ahpla", "ateb"], + ], + ) + + def test_array_string_output_requires_coercion(self): + # When the array UDF returns non-string elements, the output must + # still be coerced to string (fast path must fall back to the converter). + df = self.spark.range(0, 3) + + @udf(returnType=ArrayType(StringType())) + def ints_as_strings(i): + return [int(i), int(i) + 1] + + result = [r.res for r in df.select(ints_as_strings("id").alias("res")).collect()] + self.assertEqual(result, [["0", "1"], ["1", "2"], ["2", "3"]]) + + def test_array_timestamp_output_timezone(self): + # array is excluded from the fast path because the converter + # applies session-timezone truncation that raw pa.array would skip. Verify + # the timestamps come back correctly (i.e. the converter path was used). 
+ import datetime + + with self.sql_conf({"spark.sql.session.timeZone": "America/Los_Angeles"}): + df = self.spark.range(0, 2) + + @udf(returnType=ArrayType(TimestampType())) + def make_ts(i): + return [datetime.datetime(2020, 1, 1, 12, 0, 0)] + + result = [r.res for r in df.select(make_ts("id").alias("res")).collect()] + self.assertEqual( + result, + [ + [datetime.datetime(2020, 1, 1, 12, 0, 0)], + [datetime.datetime(2020, 1, 1, 12, 0, 0)], + ], + ) + def test_err_return_type(self): with self.assertRaises(PySparkNotImplementedError) as pe: udf(lambda x: x, VarcharType(10), useArrow=True) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 1b19409ec562e..e9e80201c2b46 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -90,11 +90,19 @@ ArrayType, BinaryType, DataType, + DecimalType, + GeographyType, + GeometryType, MapType, + NullType, Row, StringType, StructField, StructType, + TimestampNTZType, + TimestampType, + UserDefinedType, + VariantType, _create_row, _parse_datatype_json_string, ) @@ -3031,6 +3039,49 @@ def cogrouped_func( ): import pyarrow as pa + def _output_fast_path_safe(dt: DataType) -> bool: + # True when building an Arrow array directly from raw UDF results + # (pa.array(results, type=...)) yields values identical to running the + # per-element LocalDataToArrowConversion converter first. This holds for + # numeric/bool/string/binary (and nested containers of them), where the + # converter is a no-op for already-correct-type values and pa.array + # raises for anything that would need coercion (so we fall back safely). + # + # It does NOT hold, and the type must be excluded, when the converter can + # transform a value that pa.array would ALSO accept (silently producing a + # different result) or when pa.array accepts an input the converter is + # meant to reject: + # - Timestamp/TimestampNTZ: converter truncates to session tz. 
+ # - Decimal: pa.array coerces int->decimal, but the converter only does + # so when intToDecimalCoercionEnabled; skipping it would bypass that + # gate (and rescaling / Decimal('NaN')->None handling). + # - UDT/Variant/Geo: converter serializes to a storage form. + # - Null: trivial; keep on the safe/simple side. + if isinstance(dt, ArrayType): + return _output_fast_path_safe(dt.elementType) + elif isinstance(dt, MapType): + return _output_fast_path_safe(dt.keyType) and _output_fast_path_safe( + dt.valueType + ) + elif isinstance(dt, StructType): + return all(_output_fast_path_safe(f.dataType) for f in dt.fields) + elif isinstance( + dt, + ( + TimestampType, + TimestampNTZType, + DecimalType, + UserDefinedType, + VariantType, + GeographyType, + GeometryType, + NullType, + ), + ): + return False + else: + return True + # --- UDF preparation --- udf_infos = [] for udf_func, udf_args_offsets, udf_kwargs_offsets, udf_return_type in udfs: @@ -3053,6 +3104,7 @@ def cogrouped_func( none_on_identity=True, int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled, ), + _output_fast_path_safe(udf_return_type), ) ) col_names = [f"_{i}" for i in range(len(udfs))] @@ -3088,7 +3140,14 @@ def func(split_index: int, data: Iterator[pa.RecordBatch]) -> Iterator[pa.Record # --- Process: evaluate each UDF row-by-row --- output_arrays = [] - for udf_func, offsets, zero_arg, arrow_return_type, result_conv in udf_infos: + for ( + udf_func, + offsets, + zero_arg, + arrow_return_type, + result_conv, + fast_ok, + ) in udf_infos: rows = ( [() for _ in range(num_rows)] if zero_arg @@ -3098,15 +3157,34 @@ def func(split_index: int, data: Iterator[pa.RecordBatch]) -> Iterator[pa.Record verify_result_row_count(len(results), num_rows) # --- Output: Python -> Arrow --- - converted = ( - [result_conv(r) for r in results] if result_conv is not None else results - ) - try: - arr = pa.array(converted, type=arrow_return_type) - except pa.lib.ArrowInvalid: - arr = 
pa.array(converted).cast( - target_type=arrow_return_type, safe=runner_conf.safecheck + # Fast path: when the return type has no value-transforming + # coercion (see _output_fast_path_safe), try building the Arrow + # array directly from the raw UDF results and let PyArrow (C++) + # do the work, skipping the per-element Python converter, which + # is pure overhead when results already match the declared type. + # Only fall back to the converter if PyArrow rejects the raw + # results (an element genuinely needs coercion, e.g. int->string). + # NOTE: types like timestamp are excluded because raw pa.array + # succeeds but yields DIFFERENT values than the converter (tz + # truncation), so a try/except gate alone is unsafe for them. + arr = None + if result_conv is not None and fast_ok: + try: + arr = pa.array(results, type=arrow_return_type) + except pa.lib.ArrowException: + arr = None + if arr is None: + converted = ( + [result_conv(r) for r in results] + if result_conv is not None + else results ) + try: + arr = pa.array(converted, type=arrow_return_type) + except pa.lib.ArrowInvalid: + arr = pa.array(converted).cast( + target_type=arrow_return_type, safe=runner_conf.safecheck + ) output_arrays.append(arr) yield pa.RecordBatch.from_arrays(output_arrays, col_names) From 88b3300cb70aa40f18bf08694eaf211fdd47de67 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 1 Jul 2026 13:12:03 -0700 Subject: [PATCH 2/3] [SPARK-57863][PYTHON][FOLLOWUP] Add array scenario/UDF to SQL_ARROW_BATCHED_UDF benchmark ### What changes were proposed in this pull request? Extends the existing `SQL_ARROW_BATCHED_UDF` ASV microbenchmark in `python/benchmarks/bench_eval_type.py` to cover a nested (array) type: - a `string_array` entry in `MockDataFactory.TYPE_REGISTRY` and a `pure_string_arrays` type pool / scenario, producing an `array<string>` input column; - a `to_string_array_udf` in `_ArrowBatchedBenchMixin._udfs` that returns `array<string>`. 
It is type-agnostic on input (like the existing UDFs) so it stays valid across the whole scenario x UDF cross product, but paired with the `pure_string_arrays` scenario it gives an array-in / array-out case that stresses the output-side Python->Arrow conversion this change optimizes. ### Why are the changes needed? The main change adds an output fast path for nested types; this gives the standard PySpark benchmark suite coverage of that path so the effect is measurable in a consistent environment. Driving the worker directly at 1M rows shows the `array` output improving from ~1412ms to ~1213ms. ### Does this PR introduce _any_ user-facing change? No (benchmark only). ### How was this patch tested? Ran the new scenario and UDF through `ArrowBatchedUDFTimeBench` and `ArrowBatchedUDFPeakmemBench`, and verified the full scenario x UDF cross product runs without error. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.8) Co-authored-by: Claude Code --- python/benchmarks/bench_eval_type.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py index f220b804be0b2..d2b8a8606b30c 100644 --- a/python/benchmarks/bench_eval_type.py +++ b/python/benchmarks/bench_eval_type.py @@ -39,6 +39,7 @@ from pyspark.cloudpickle import dumps as cloudpickle_dumps from pyspark.serializers import write_int, write_long, SpecialLengths from pyspark.sql.types import ( + ArrayType, BinaryType, BooleanType, DoubleType, @@ -251,6 +252,12 @@ class MockDataFactory: "string": (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), "binary": (lambda r: pa.array([f"b{j}".encode() for j in range(r)]), BinaryType()), "boolean": (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), + "string_array": ( + lambda r: pa.array( + [[f"s{j}", f"t{j}"] for j in range(r)], type=pa.list_(pa.string()) + ), + ArrayType(StringType()), + ), } 
MIXED_TYPES = [ @@ -266,6 +273,7 @@ class MockDataFactory: "pure_ints": [TYPE_REGISTRY["int"]], "pure_floats": [TYPE_REGISTRY["double"]], "pure_strings": [TYPE_REGISTRY["string"]], + "pure_string_arrays": [TYPE_REGISTRY["string_array"]], "pure_ts": [ ( lambda r: pa.array( @@ -480,6 +488,7 @@ class _ArrowBatchedBenchMixin: "pure_ints": ("pure_ints", 50_000, 10, 5_000), "pure_floats": ("pure_floats", 50_000, 10, 5_000), "pure_strings": ("pure_strings", 50_000, 10, 5_000), + "pure_string_arrays": ("pure_string_arrays", 50_000, 10, 5_000), "mixed_types": ("mixed", 50_000, 10, 5_000), } @@ -502,6 +511,16 @@ def _build_scenario(cls, name): "identity_udf": (lambda x: x, None, [0]), "stringify_udf": (lambda x: str(x), StringType(), [0]), "nullcheck_udf": (lambda x: x is not None, BooleanType(), [0]), + # array out. Exercises the output-side Python->Arrow conversion + # for a nested type. Type-agnostic on input (like the other UDFs here, so + # it runs across every scenario in the cross product); pair with the + # ``pure_string_arrays`` scenario for the array-in / array-out + # case that stresses the nested output conversion the most. + "to_string_array_udf": ( + lambda x: [str(e)[::-1] for e in x] if isinstance(x, (list, tuple)) else [str(x)], + ArrayType(StringType()), + [0], + ), } params = [list(_scenario_configs), list(_udfs)] param_names = ["scenario", "udf"] From 2d27499154bc6795889fccfd1cb81319b7fb5c89 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 1 Jul 2026 13:33:14 -0700 Subject: [PATCH 3/3] [SPARK-57863][PYTHON][FOLLOWUP] Scope array coercion test to non-legacy path test_array_string_output_requires_coercion asserts int->string coercion inside an array return, which only the non-legacy (Arrow-direct) conversion path performs; the legacy pandas path raises. Wrap the test body in sql_conf(...pandas.conversion.enabled=False) so it exercises the intended path under all suite variants, matching test_arrow_udf_int_to_decimal_coercion. 
Co-authored-by: Claude Code --- .../sql/tests/arrow/test_arrow_python_udf.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py b/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py index cb9d5ca68e990..c58f536a0bf37 100644 --- a/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py +++ b/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py @@ -297,14 +297,19 @@ def reverse_each(i): def test_array_string_output_requires_coercion(self): # When the array UDF returns non-string elements, the output must # still be coerced to string (fast path must fall back to the converter). - df = self.spark.range(0, 3) + # Scoped to the non-legacy conversion path, which this fast path targets; + # the legacy pandas path does not coerce non-string array elements. + with self.sql_conf( + {"spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled": False} + ): + df = self.spark.range(0, 3) - @udf(returnType=ArrayType(StringType())) - def ints_as_strings(i): - return [int(i), int(i) + 1] + @udf(returnType=ArrayType(StringType())) + def ints_as_strings(i): + return [int(i), int(i) + 1] - result = [r.res for r in df.select(ints_as_strings("id").alias("res")).collect()] - self.assertEqual(result, [["0", "1"], ["1", "2"], ["2", "3"]]) + result = [r.res for r in df.select(ints_as_strings("id").alias("res")).collect()] + self.assertEqual(result, [["0", "1"], ["1", "2"], ["2", "3"]]) def test_array_timestamp_output_timezone(self): # array is excluded from the fast path because the converter