diff --git a/python/pyspark/pandas/data_type_ops/base.py b/python/pyspark/pandas/data_type_ops/base.py index 72ce6cf7d9301..f910c0f2c99d7 100644 --- a/python/pyspark/pandas/data_type_ops/base.py +++ b/python/pyspark/pandas/data_type_ops/base.py @@ -548,6 +548,28 @@ def restore(self, col: pd.Series) -> pd.Series: def prepare(self, col: pd.Series) -> pd.Series: """Prepare column when from_pandas.""" + + from distutils.version import LooseVersion + + if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"): + # In pandas 3, list-valued columns store elements as np.ndarray objects. + # np.ndarray is not hashable, so col.replace({np.nan: None}) raises + # "ValueError: The truth value of an array is ambiguous" when the Series + # has object dtype and contains ndarray elements. + # Convert any np.ndarray elements to Python lists first so that: + # 1. replace({np.nan: None}) can safely run on the scalar/null values, and + # 2. PyArrow correctly infers ArrayType for the Spark schema. + # We recurse into nested structures so that 2D/nested ndarrays are fully + # converted to plain Python lists at every level. + def _ndarray_to_list(x: Any) -> Any: + if isinstance(x, np.ndarray): + return [_ndarray_to_list(item) for item in x] + return x + + if col.dtype == np.dtype("object"): + col = col.where(pd.notna(col), None) + return col.apply(_ndarray_to_list) + return col.replace({np.nan: None}) def isnull(self, index_ops: IndexOpsLike) -> IndexOpsLike: diff --git a/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py index 1b5d18f0e8b89..456934e00f967 100644 --- a/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py +++ b/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py @@ -18,7 +18,9 @@ import decimal import datetime +import numpy as np import pandas as pd +from distutils.version import LooseVersion from pyspark import pandas as ps from pyspark.testing.pandasutils import PandasOnSparkTestCase @@ -94,13 +96,11 @@ def complex_pdf(self): @property def complex_psdf(self): - pssers = { - "this_array": self.psser, - "that_array": ps.Series([[2, 3, 4]]), - "this_struct": ps.Index([("x", 1)]).to_series().reset_index(drop=True), - "that_struct": ps.Index([("a", 2)]).to_series().reset_index(drop=True), - } - return ps.concat(pssers, axis=1) + s1 = self.psser.rename("this_array") + s2 = ps.Series([[2, 3, 4]], name="that_array") + s3 = ps.Index([("x", 1)]).to_series(name="this_struct").reset_index(drop=True) + s4 = ps.Index([("a", 2)]).to_series(name="that_struct").reset_index(drop=True) + return ps.concat([s1, s2, s3, s4], axis=1) def test_add(self): pdf, psdf = self.array_pdf, self.array_psdf @@ -247,6 +247,31 @@ def test_from_to_pandas(self): self.assert_eq(pser, psser._to_pandas(), check_exact=False) self.assert_eq(ps.from_pandas(pser), psser) + def test_from_pandas_with_np_array_elements(self): + # SPARK-55242: pyspark.pandas should handle list-valued columns whose elements + # are stored as np.ndarray by pandas 3 (e.g. [[e] for e in ...]). + # Previously this raised "ValueError: The truth value of an array is ambiguous" + # inside DataTypeOps.prepare() when it called col.replace({np.nan: None}). + import warnings + from pyspark.pandas.utils import PandasAPIOnSparkAdviceWarning + + pdf = pd.DataFrame( + { + "a": [1, 2, 3, 4, 5, 6, 7, 8, 9], + "b": [[e] for e in [4, 5, 6, 3, 2, 1, 0, 0, 0]], + }, + index=np.random.rand(9), + ) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", PandasAPIOnSparkAdviceWarning) + # from_pandas must not raise; the resulting DataFrame must match the original. + psdf = ps.from_pandas(pdf) + # Sort both by "a" to ensure deterministic order for comparison. + pdf = pdf.sort_values(by="a").reset_index(drop=True) + psdf = psdf.sort_values(by="a").reset_index(drop=True) + self.assert_eq(pdf["a"], psdf["a"]) + self.assert_eq(pdf["b"], psdf["b"]) + def test_isnull(self): pdf, psdf = self.array_pdf, self.array_psdf for col in self.array_df_cols: @@ -266,91 +291,133 @@ def test_invert(self): self.assertRaises(TypeError, lambda: ~self.psser) def test_eq(self): - pdf, psdf = self.complex_pdf, self.complex_pdf - self.assert_eq( - pdf["this_array"] == pdf["that_array"], psdf["this_array"] == psdf["that_array"] - ) + pdf, psdf = self.complex_pdf, self.complex_psdf + if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"): + pdf_this_array = pdf["this_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + pdf_that_array = pdf["that_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + else: + pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"] + + self.assert_eq(pdf_this_array == pdf_that_array, psdf["this_array"] == psdf["that_array"]) self.assert_eq( pdf["this_struct"] == pdf["that_struct"], psdf["this_struct"] == psdf["that_struct"] ) - self.assert_eq( - pdf["this_array"] == pdf["this_array"], psdf["this_array"] == psdf["this_array"] - ) + + self.assert_eq(pdf_this_array == pdf_this_array, psdf["this_array"] == psdf["this_array"]) self.assert_eq( pdf["this_struct"] == pdf["this_struct"], psdf["this_struct"] == psdf["this_struct"] ) def test_ne(self): - pdf, psdf = self.complex_pdf, self.complex_pdf - self.assert_eq( - pdf["this_array"] != pdf["that_array"], psdf["this_array"] != psdf["that_array"] - ) + pdf, psdf = self.complex_pdf, self.complex_psdf + if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"): + pdf_this_array = pdf["this_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + pdf_that_array = pdf["that_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + else: + pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"] + + self.assert_eq(pdf_this_array != pdf_that_array, psdf["this_array"] != psdf["that_array"]) self.assert_eq( pdf["this_struct"] != pdf["that_struct"], psdf["this_struct"] != psdf["that_struct"] ) - self.assert_eq( - pdf["this_array"] != pdf["this_array"], psdf["this_array"] != psdf["this_array"] - ) + + self.assert_eq(pdf_this_array != pdf_this_array, psdf["this_array"] != psdf["this_array"]) self.assert_eq( pdf["this_struct"] != pdf["this_struct"], psdf["this_struct"] != psdf["this_struct"] ) def test_lt(self): - pdf, psdf = self.complex_pdf, self.complex_pdf - self.assert_eq( - pdf["this_array"] < pdf["that_array"], psdf["this_array"] < psdf["that_array"] - ) + pdf, psdf = self.complex_pdf, self.complex_psdf + if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"): + pdf_this_array = pdf["this_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + pdf_that_array = pdf["that_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + else: + pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"] + + self.assert_eq(pdf_this_array < pdf_that_array, psdf["this_array"] < psdf["that_array"]) self.assert_eq( pdf["this_struct"] < pdf["that_struct"], psdf["this_struct"] < psdf["that_struct"] ) - self.assert_eq( - pdf["this_array"] < pdf["this_array"], psdf["this_array"] < psdf["this_array"] - ) + + self.assert_eq(pdf_this_array < pdf_this_array, psdf["this_array"] < psdf["this_array"]) self.assert_eq( pdf["this_struct"] < pdf["this_struct"], psdf["this_struct"] < psdf["this_struct"] ) def test_le(self): - pdf, psdf = self.complex_pdf, self.complex_pdf - self.assert_eq( - pdf["this_array"] <= pdf["that_array"], psdf["this_array"] <= psdf["that_array"] - ) + pdf, psdf = self.complex_pdf, self.complex_psdf + if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"): + pdf_this_array = pdf["this_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + pdf_that_array = pdf["that_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + else: + pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"] + + self.assert_eq(pdf_this_array <= pdf_that_array, psdf["this_array"] <= psdf["that_array"]) self.assert_eq( pdf["this_struct"] <= pdf["that_struct"], psdf["this_struct"] <= psdf["that_struct"] ) - self.assert_eq( - pdf["this_array"] <= pdf["this_array"], psdf["this_array"] <= psdf["this_array"] - ) + + self.assert_eq(pdf_this_array <= pdf_this_array, psdf["this_array"] <= psdf["this_array"]) self.assert_eq( pdf["this_struct"] <= pdf["this_struct"], psdf["this_struct"] <= psdf["this_struct"] ) def test_gt(self): - pdf, psdf = self.complex_pdf, self.complex_pdf - self.assert_eq( - pdf["this_array"] > pdf["that_array"], psdf["this_array"] > psdf["that_array"] - ) + pdf, psdf = self.complex_pdf, self.complex_psdf + if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"): + pdf_this_array = pdf["this_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + pdf_that_array = pdf["that_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + else: + pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"] + + self.assert_eq(pdf_this_array > pdf_that_array, psdf["this_array"] > psdf["that_array"]) self.assert_eq( pdf["this_struct"] > pdf["that_struct"], psdf["this_struct"] > psdf["that_struct"] ) - self.assert_eq( - pdf["this_array"] > pdf["this_array"], psdf["this_array"] > psdf["this_array"] - ) + + self.assert_eq(pdf_this_array > pdf_this_array, psdf["this_array"] > psdf["this_array"]) self.assert_eq( pdf["this_struct"] > pdf["this_struct"], psdf["this_struct"] > psdf["this_struct"] ) def test_ge(self): - pdf, psdf = self.complex_pdf, self.complex_pdf - self.assert_eq( - pdf["this_array"] >= pdf["that_array"], psdf["this_array"] >= psdf["that_array"] - ) + pdf, psdf = self.complex_pdf, self.complex_psdf + if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"): + pdf_this_array = pdf["this_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + pdf_that_array = pdf["that_array"].apply( + lambda x: list(x) if isinstance(x, np.ndarray) else x + ) + else: + pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"] + + self.assert_eq(pdf_this_array >= pdf_that_array, psdf["this_array"] >= psdf["that_array"]) self.assert_eq( pdf["this_struct"] >= pdf["that_struct"], psdf["this_struct"] >= psdf["that_struct"] ) - self.assert_eq( - pdf["this_array"] >= pdf["this_array"], psdf["this_array"] >= psdf["this_array"] - ) + + self.assert_eq(pdf_this_array >= pdf_this_array, psdf["this_array"] >= psdf["this_array"]) self.assert_eq( pdf["this_struct"] >= pdf["this_struct"], psdf["this_struct"] >= psdf["this_struct"] )