Skip to content

Commit 73cb166

Browse files
committed
test(feature-processor): Add integration tests for Spark version compatibility
Add test_feature_processor_spark_compat.py with 3 integration tests validating dynamic Hadoop version resolution, SparkSessionFactory config generation, and image resolver behavior for the installed PySpark + Python version combination. Verified with both PySpark 3.3 (py312 image resolver correctly skipped) and PySpark 3.5 (all 3 pass). --- X-AI-Prompt: create integration test for spark 3.5 multi-version compatibility X-AI-Tool: kiro
1 parent f5cdf6c commit 73cb166

File tree

1 file changed

+76
-0
lines changed

1 file changed

+76
-0
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"). You
4+
# may not use this file except in compliance with the License. A copy of
5+
# the License is located at
6+
#
7+
# http://aws.amazon.com/apache2.0/
8+
#
9+
# or in the "license" file accompanying this file. This file is
10+
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific
12+
# language governing permissions and limitations under the License.
13+
"""Integration tests for Spark multi-version compatibility."""
14+
from __future__ import absolute_import
15+
16+
import pyspark
17+
import pytest
18+
from mock import Mock
19+
20+
from sagemaker.mlops.feature_store.feature_processor._spark_factory import (
21+
SparkSessionFactory,
22+
SPARK_TO_HADOOP_MAP,
23+
_get_hadoop_version,
24+
)
25+
from sagemaker.mlops.feature_store.feature_processor._image_resolver import (
26+
SPARK_IMAGE_SUPPORT_MATRIX,
27+
_get_spark_image_uri,
28+
)
29+
30+
31+
@pytest.mark.slow_test
def test_hadoop_version_resolves_for_installed_pyspark():
    """Verify that the installed PySpark version resolves to a known Hadoop version."""
    resolved = _get_hadoop_version()
    installed_spark = ".".join(pyspark.__version__.split(".")[:2])

    # Known Spark lines map directly to a pinned Hadoop release; any
    # unknown Spark version falls back to the latest mapping ("3.3.4").
    expected = SPARK_TO_HADOOP_MAP.get(installed_spark, "3.3.4")
    assert resolved == expected
42+
43+
44+
@pytest.mark.slow_test
def test_spark_session_factory_configs_include_dynamic_hadoop():
    """Verify SparkSessionFactory produces configs with the correct Hadoop Maven coordinates."""
    factory = SparkSessionFactory(Mock())
    spark_configs = dict(factory._get_spark_configs(is_training_job=False))
    jar_packages = spark_configs.get("spark.jars.packages", "")

    # Both hadoop-aws and hadoop-common must be pinned to the dynamically
    # resolved Hadoop version matching the installed PySpark.
    hadoop = _get_hadoop_version()
    for artifact in ("hadoop-aws", "hadoop-common"):
        assert f"org.apache.hadoop:{artifact}:{hadoop}" in jar_packages
55+
56+
57+
@pytest.mark.slow_test
def test_image_resolver_returns_uri_for_installed_pyspark():
    """Verify the image resolver returns a valid URI for the installed PySpark + Python version."""
    import sys

    spark_major_minor = ".".join(pyspark.__version__.split(".")[:2])
    py_version = f"py{sys.version_info[0]}{sys.version_info[1]}"

    # Only Spark/Python combinations listed in the support matrix have a
    # published processing image; skip (rather than fail) everything else.
    supported = SPARK_IMAGE_SUPPORT_MATRIX.get(spark_major_minor)
    if not (supported and py_version in supported):
        pytest.skip(
            f"Spark {spark_major_minor} + {py_version} not in support matrix; "
            f"skipping image resolver test"
        )

    mock_session = Mock(boto_region_name="us-west-2")
    uri = _get_spark_image_uri(mock_session)

    assert "sagemaker-spark-processing" in uri
    assert spark_major_minor in uri

0 commit comments

Comments
 (0)