Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions python/pyspark/pandas/data_type_ops/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,28 @@ def restore(self, col: pd.Series) -> pd.Series:

def prepare(self, col: pd.Series) -> pd.Series:
    """Prepare column when from_pandas.

    Missing values are replaced with ``None``. On pandas 3 and later,
    object-dtype columns additionally have every ``np.ndarray`` element
    converted, recursively, into a plain Python list.

    Parameters
    ----------
    col : pd.Series
        The pandas column being converted.

    Returns
    -------
    pd.Series
        A new Series; ``col`` is not modified in place.
    """
    import re

    # Only the major version matters here. The previous implementation
    # imported LooseVersion from ``distutils``, which was removed from the
    # standard library in Python 3.12, so the import itself failed at call
    # time on current interpreters. Leading digits are parsed defensively so
    # that unusual version strings fall back to the legacy path.
    major_match = re.match(r"\d+", pd.__version__)
    pandas_major = int(major_match.group()) if major_match else 0

    if pandas_major < 3 or col.dtype != np.dtype("object"):
        return col.replace({np.nan: None})

    # In pandas 3, list-valued columns store elements as np.ndarray objects.
    # np.ndarray is not hashable, so col.replace({np.nan: None}) raises
    # "ValueError: The truth value of an array is ambiguous" when the Series
    # has object dtype and contains ndarray elements.
    # Convert any np.ndarray elements to Python lists so that PyArrow
    # correctly infers ArrayType for the Spark schema. The helper recurses so
    # that 2D/nested ndarrays become plain Python lists at every level.
    def _ndarray_to_list(x: Any) -> Any:
        if isinstance(x, np.ndarray):
            return [_ndarray_to_list(item) for item in x]
        return x

    # Null out missing scalars first (pd.notna is element-wise on the Series),
    # then unwrap the ndarray elements.
    return col.where(pd.notna(col), None).apply(_ndarray_to_list)

def isnull(self, index_ops: IndexOpsLike) -> IndexOpsLike:
Expand Down
165 changes: 116 additions & 49 deletions python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import decimal
import datetime

import numpy as np
import pandas as pd
from distutils.version import LooseVersion

from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase
Expand Down Expand Up @@ -94,13 +96,11 @@ def complex_pdf(self):

@property
def complex_psdf(self):
pssers = {
"this_array": self.psser,
"that_array": ps.Series([[2, 3, 4]]),
"this_struct": ps.Index([("x", 1)]).to_series().reset_index(drop=True),
"that_struct": ps.Index([("a", 2)]).to_series().reset_index(drop=True),
}
return ps.concat(pssers, axis=1)
s1 = self.psser.rename("this_array")
s2 = ps.Series([[2, 3, 4]], name="that_array")
s3 = ps.Index([("x", 1)]).to_series(name="this_struct").reset_index(drop=True)
s4 = ps.Index([("a", 2)]).to_series(name="that_struct").reset_index(drop=True)
return ps.concat([s1, s2, s3, s4], axis=1)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we changing the test here?


def test_add(self):
pdf, psdf = self.array_pdf, self.array_psdf
Expand Down Expand Up @@ -247,6 +247,31 @@ def test_from_to_pandas(self):
self.assert_eq(pser, psser._to_pandas(), check_exact=False)
self.assert_eq(ps.from_pandas(pser), psser)

def test_from_pandas_with_np_array_elements(self):
# SPARK-55242: pyspark.pandas should handle list-valued columns whose elements
# are stored as np.ndarray by pandas 3 (e.g. [[e] for e in ...]).
# Previously this raised "ValueError: The truth value of an array is ambiguous"
# inside DataTypeOps.prepare() when it called col.replace({np.nan: None}).
import warnings
from pyspark.pandas.utils import PandasAPIOnSparkAdviceWarning

pdf = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6, 7, 8, 9],
"b": [[e] for e in [4, 5, 6, 3, 2, 1, 0, 0, 0]],
},
index=np.random.rand(9),
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", PandasAPIOnSparkAdviceWarning)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have this?

# from_pandas must not raise; the resulting DataFrame must match the original.
psdf = ps.from_pandas(pdf)
# Sort both by "a" to ensure deterministic order for comparison.
pdf = pdf.sort_values(by="a").reset_index(drop=True)
psdf = psdf.sort_values(by="a").reset_index(drop=True)
self.assert_eq(pdf["a"], psdf["a"])
self.assert_eq(pdf["b"], psdf["b"])

def test_isnull(self):
pdf, psdf = self.array_pdf, self.array_psdf
for col in self.array_df_cols:
Expand All @@ -266,91 +291,133 @@ def test_invert(self):
self.assertRaises(TypeError, lambda: ~self.psser)

def test_eq(self):
pdf, psdf = self.complex_pdf, self.complex_pdf
self.assert_eq(
pdf["this_array"] == pdf["that_array"], psdf["this_array"] == psdf["that_array"]
)
pdf, psdf = self.complex_pdf, self.complex_psdf
if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"):
pdf_this_array = pdf["this_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
pdf_that_array = pdf["that_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
else:
pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"]

self.assert_eq(pdf_this_array == pdf_that_array, psdf["this_array"] == psdf["that_array"])
self.assert_eq(
pdf["this_struct"] == pdf["that_struct"], psdf["this_struct"] == psdf["that_struct"]
)
self.assert_eq(
pdf["this_array"] == pdf["this_array"], psdf["this_array"] == psdf["this_array"]
)

self.assert_eq(pdf_this_array == pdf_this_array, psdf["this_array"] == psdf["this_array"])
self.assert_eq(
pdf["this_struct"] == pdf["this_struct"], psdf["this_struct"] == psdf["this_struct"]
)

def test_ne(self):
pdf, psdf = self.complex_pdf, self.complex_pdf
self.assert_eq(
pdf["this_array"] != pdf["that_array"], psdf["this_array"] != psdf["that_array"]
)
pdf, psdf = self.complex_pdf, self.complex_psdf
if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"):
pdf_this_array = pdf["this_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
pdf_that_array = pdf["that_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
else:
pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"]

self.assert_eq(pdf_this_array != pdf_that_array, psdf["this_array"] != psdf["that_array"])
self.assert_eq(
pdf["this_struct"] != pdf["that_struct"], psdf["this_struct"] != psdf["that_struct"]
)
self.assert_eq(
pdf["this_array"] != pdf["this_array"], psdf["this_array"] != psdf["this_array"]
)

self.assert_eq(pdf_this_array != pdf_this_array, psdf["this_array"] != psdf["this_array"])
self.assert_eq(
pdf["this_struct"] != pdf["this_struct"], psdf["this_struct"] != psdf["this_struct"]
)

def test_lt(self):
pdf, psdf = self.complex_pdf, self.complex_pdf
self.assert_eq(
pdf["this_array"] < pdf["that_array"], psdf["this_array"] < psdf["that_array"]
)
pdf, psdf = self.complex_pdf, self.complex_psdf
if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"):
pdf_this_array = pdf["this_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
pdf_that_array = pdf["that_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
else:
pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"]

self.assert_eq(pdf_this_array < pdf_that_array, psdf["this_array"] < psdf["that_array"])
self.assert_eq(
pdf["this_struct"] < pdf["that_struct"], psdf["this_struct"] < psdf["that_struct"]
)
self.assert_eq(
pdf["this_array"] < pdf["this_array"], psdf["this_array"] < psdf["this_array"]
)

self.assert_eq(pdf_this_array < pdf_this_array, psdf["this_array"] < psdf["this_array"])
self.assert_eq(
pdf["this_struct"] < pdf["this_struct"], psdf["this_struct"] < psdf["this_struct"]
)

def test_le(self):
pdf, psdf = self.complex_pdf, self.complex_pdf
self.assert_eq(
pdf["this_array"] <= pdf["that_array"], psdf["this_array"] <= psdf["that_array"]
)
pdf, psdf = self.complex_pdf, self.complex_psdf
if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"):
pdf_this_array = pdf["this_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
pdf_that_array = pdf["that_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
else:
pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"]

self.assert_eq(pdf_this_array <= pdf_that_array, psdf["this_array"] <= psdf["that_array"])
self.assert_eq(
pdf["this_struct"] <= pdf["that_struct"], psdf["this_struct"] <= psdf["that_struct"]
)
self.assert_eq(
pdf["this_array"] <= pdf["this_array"], psdf["this_array"] <= psdf["this_array"]
)

self.assert_eq(pdf_this_array <= pdf_this_array, psdf["this_array"] <= psdf["this_array"])
self.assert_eq(
pdf["this_struct"] <= pdf["this_struct"], psdf["this_struct"] <= psdf["this_struct"]
)

def test_gt(self):
pdf, psdf = self.complex_pdf, self.complex_pdf
self.assert_eq(
pdf["this_array"] > pdf["that_array"], psdf["this_array"] > psdf["that_array"]
)
pdf, psdf = self.complex_pdf, self.complex_psdf
if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"):
pdf_this_array = pdf["this_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
pdf_that_array = pdf["that_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
else:
pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"]

self.assert_eq(pdf_this_array > pdf_that_array, psdf["this_array"] > psdf["that_array"])
self.assert_eq(
pdf["this_struct"] > pdf["that_struct"], psdf["this_struct"] > psdf["that_struct"]
)
self.assert_eq(
pdf["this_array"] > pdf["this_array"], psdf["this_array"] > psdf["this_array"]
)

self.assert_eq(pdf_this_array > pdf_this_array, psdf["this_array"] > psdf["this_array"])
self.assert_eq(
pdf["this_struct"] > pdf["this_struct"], psdf["this_struct"] > psdf["this_struct"]
)

def test_ge(self):
pdf, psdf = self.complex_pdf, self.complex_pdf
self.assert_eq(
pdf["this_array"] >= pdf["that_array"], psdf["this_array"] >= psdf["that_array"]
)
pdf, psdf = self.complex_pdf, self.complex_psdf
if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"):
pdf_this_array = pdf["this_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
pdf_that_array = pdf["that_array"].apply(
lambda x: list(x) if isinstance(x, np.ndarray) else x
)
else:
pdf_this_array, pdf_that_array = pdf["this_array"], pdf["that_array"]

self.assert_eq(pdf_this_array >= pdf_that_array, psdf["this_array"] >= psdf["that_array"])
self.assert_eq(
pdf["this_struct"] >= pdf["that_struct"], psdf["this_struct"] >= psdf["that_struct"]
)
self.assert_eq(
pdf["this_array"] >= pdf["this_array"], psdf["this_array"] >= psdf["this_array"]
)

self.assert_eq(pdf_this_array >= pdf_this_array, psdf["this_array"] >= psdf["this_array"])
self.assert_eq(
pdf["this_struct"] >= pdf["this_struct"], psdf["this_struct"] >= psdf["this_struct"]
)
Expand Down