Skip to content

Commit 6aa90c3

Browse files
committed
Add Spark 4.0 support via deequ:2.0.14-spark-4.0
- Add "4.0" entry to SPARK_TO_DEEQU_COORD_MAPPING in configs.py - Widen pyspark optional dep bound to <5.0.0 in pyproject.toml - Replace scala.collection.JavaConversions (removed in Scala 2.13) with JavaConverters in scala_utils.py and profiles.py - Replace scala.collection.Seq.empty() (inaccessible via Py4J in Scala 2.13) with to_scala_seq(jvm, jvm.java.util.ArrayList()) in analyzers.py and checks.py - Add Spark 4.0.0 to CI matrix with Java 17; use include: style to pair each Spark version with its required Java version - Fix CI for Spark 4.0: - use Python 3.9 and version-marker pyspark dep - use pip install instead of poetry add - install pandas>=2.0.0 required by PySpark 4.0 - Fix empty Seq compatibility across Scala 2.12 and 2.13 Fixes #258
1 parent f17a9f9 commit 6aa90c3

7 files changed

Lines changed: 49 additions & 16 deletions

File tree

.github/workflows/base.yml

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,45 @@ jobs:
1212
strategy:
1313
fail-fast: false
1414
matrix:
15-
PYSPARK_VERSION: ["3.1.3", "3.2", "3.3", "3.5"]
15+
include:
16+
- PYSPARK_VERSION: "3.1.3"
17+
JAVA_VERSION: "11"
18+
PYTHON_VERSION: "3.8"
19+
- PYSPARK_VERSION: "3.2"
20+
JAVA_VERSION: "11"
21+
PYTHON_VERSION: "3.8"
22+
- PYSPARK_VERSION: "3.3"
23+
JAVA_VERSION: "11"
24+
PYTHON_VERSION: "3.8"
25+
- PYSPARK_VERSION: "3.5"
26+
JAVA_VERSION: "11"
27+
PYTHON_VERSION: "3.8"
28+
- PYSPARK_VERSION: "4.0.0"
29+
JAVA_VERSION: "17"
30+
PYTHON_VERSION: "3.9"
31+
PANDAS_VERSION: ">=2.0.0"
1632

1733
steps:
1834
- uses: actions/checkout@v3
1935

2036
- uses: actions/setup-python@v2
21-
name: Install Python 3.8
37+
name: Install Python ${{matrix.PYTHON_VERSION}}
2238
with:
23-
python-version: 3.8
39+
python-version: ${{matrix.PYTHON_VERSION}}
2440

2541
- uses: actions/setup-java@v1
26-
name: Setup Java 11
27-
if: startsWith(matrix.PYSPARK_VERSION, '3')
42+
name: Setup Java ${{matrix.JAVA_VERSION}}
2843
with:
29-
java-version: "11"
44+
java-version: ${{matrix.JAVA_VERSION}}
3045

3146
- name: Running tests with pyspark==${{matrix.PYSPARK_VERSION}}
3247
env:
3348
SPARK_VERSION: ${{matrix.PYSPARK_VERSION}}
49+
PANDAS_VERSION: ${{matrix.PANDAS_VERSION}}
3450
run: |
3551
pip install --upgrade pip
3652
pip install poetry==1.7.1
3753
poetry install
38-
poetry add pyspark==$SPARK_VERSION
54+
poetry run pip install pyspark==$SPARK_VERSION
55+
if [ -n "$PANDAS_VERSION" ]; then poetry run pip install "pandas$PANDAS_VERSION"; fi
3956
poetry run python -m pytest -s tests

pydeequ/analyzers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from pydeequ.pandas_utils import ensure_pyspark_df
1010
from pydeequ.repository import MetricsRepository, ResultKey
1111
from enum import Enum
12-
from pydeequ.scala_utils import to_scala_seq
12+
from pydeequ.scala_utils import empty_scala_seq, to_scala_seq
1313
from pydeequ.configs import SPARK_VERSION
1414

1515
class _AnalyzerObject:
@@ -311,7 +311,7 @@ def _analyzer_jvm(self):
311311
self.instance,
312312
self.predicate,
313313
self._jvm.scala.Option.apply(self.where),
314-
self._jvm.scala.collection.Seq.empty(),
314+
empty_scala_seq(self._jvm),
315315
self._jvm.scala.Option.apply(None)
316316
)
317317

pydeequ/checks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from pyspark.sql import SparkSession
66

77
from pydeequ.check_functions import is_one
8-
from pydeequ.scala_utils import ScalaFunction1, to_scala_seq
8+
from pydeequ.scala_utils import ScalaFunction1, empty_scala_seq, to_scala_seq
99
from pydeequ.configs import SPARK_VERSION
1010

1111
# TODO implement custom assertions
@@ -563,7 +563,7 @@ def satisfies(self, columnCondition, constraintName, assertion=None, hint=None):
563563
constraintName,
564564
assertion_func,
565565
hint,
566-
self._jvm.scala.collection.Seq.empty(),
566+
empty_scala_seq(self._jvm),
567567
self._jvm.scala.Option.apply(None)
568568
)
569569
return self

pydeequ/configs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66

77
SPARK_TO_DEEQU_COORD_MAPPING = {
8+
"4.0": "com.amazon.deequ:deequ:2.0.14-spark-4.0",
89
"3.5": "com.amazon.deequ:deequ:2.0.8-spark-3.5",
910
"3.3": "com.amazon.deequ:deequ:2.0.8-spark-3.3",
1011
"3.2": "com.amazon.deequ:deequ:2.0.8-spark-3.2",

pydeequ/profiles.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ def _columnProfilesFromColumnRunBuilderRun(self, run):
254254
:return: a setter for columnProfilerRunner result
255255
"""
256256
self._run_result = run
257-
profile_map = self._jvm.scala.collection.JavaConversions.mapAsJavaMap(run.profiles()) # TODO from ScalaUtils
257+
profile_map = self._jvm.scala.collection.JavaConverters.mapAsJavaMapConverter(run.profiles()).asJava() # TODO from ScalaUtils
258258
self._profiles = {column: self._columnProfileBuilder(column, profile_map[column]) for column in profile_map}
259259
return self
260260

pydeequ/scala_utils.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,19 @@ def to_scala_seq(jvm, iterable):
7777
Returns:
7878
Scala sequence
7979
"""
80-
return jvm.scala.collection.JavaConversions.iterableAsScalaIterable(iterable).toSeq()
80+
return jvm.scala.collection.JavaConverters.iterableAsScalaIterableConverter(iterable).asScala().toSeq()
81+
82+
83+
def empty_scala_seq(jvm):
84+
"""
85+
Returns an empty Scala immutable List (Nil), usable as Seq[_].
86+
Uses JavaConverters.toList() to produce an immutable.List rather than
87+
a Stream, which is required for Py4J constructor/method lookup to succeed
88+
across both Scala 2.12 (Spark 3.x) and Scala 2.13 (Spark 4+).
89+
"""
90+
return jvm.scala.collection.JavaConverters.iterableAsScalaIterableConverter(
91+
jvm.java.util.ArrayList()
92+
).asScala().toList()
8193

8294

8395
def to_scala_map(spark_session, d):
@@ -93,11 +105,11 @@ def to_scala_map(spark_session, d):
93105

94106

95107
def scala_map_to_dict(jvm, scala_map):
96-
return dict(jvm.scala.collection.JavaConversions.mapAsJavaMap(scala_map))
108+
return dict(jvm.scala.collection.JavaConverters.mapAsJavaMapConverter(scala_map).asJava())
97109

98110

99111
def scala_map_to_java_map(jvm, scala_map):
100-
return jvm.scala.collection.JavaConversions.mapAsJavaMap(scala_map)
112+
return jvm.scala.collection.JavaConverters.mapAsJavaMapConverter(scala_map).asJava()
101113

102114

103115
def java_list_to_python_list(java_list: str, datatype):

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ classifiers = [
3131
python = ">=3.8,<4"
3232
numpy = ">=1.14.1"
3333
pandas = ">=0.23.0"
34-
pyspark = { version = ">=2.4.7,<3.4.0", optional = true }
34+
pyspark = [
35+
{ version = ">=2.4.7,<4.0.0", optional = true, python = ">=3.8,<3.9" },
36+
{ version = ">=2.4.7,<5.0.0", optional = true, python = ">=3.9" },
37+
]
3538

3639
[tool.poetry.dev-dependencies]
3740
pytest = "^6.2.4"

0 commit comments

Comments
 (0)