Skip to content

Commit f5cdf6c

Browse files
committed
feat(feature-processor): Wire image resolver into pipeline and unlock dependencies
Replace _get_default_spark_image() call in feature_scheduler.py with _get_spark_image_uri() from the new image resolver module. Remove pyspark==3.3.2 version pin and switch to unified sagemaker-feature-store-pyspark connector package in pyproject.toml. --- X-AI-Prompt: wire image resolver into feature_scheduler.py and update pyproject.toml dependencies X-AI-Tool: kiro
1 parent dffc2fb commit f5cdf6c

File tree

3 files changed

+24
-9
lines changed

3 files changed

+24
-9
lines changed

sagemaker-mlops/pyproject.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ dependencies = [
3232

3333
[project.optional-dependencies]
3434
feature-processor = [
35-
"pyspark==3.3.2",
36-
"sagemaker-feature-store-pyspark-3.3",
35+
"sagemaker-feature-store-pyspark",
3736
"setuptools<82",
3837
]
3938

@@ -42,8 +41,7 @@ test = [
4241
"pytest-cov",
4342
"mock",
4443
"setuptools<82",
45-
"pyspark==3.3.2",
46-
"sagemaker-feature-store-pyspark-3.3",
44+
"sagemaker-feature-store-pyspark",
4745
"pandas<3.0",
4846
"numpy<3.0",
4947
]

sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
FeatureProcessorLineageHandler,
9999
TransformationCode,
100100
)
101+
from sagemaker.mlops.feature_store.feature_processor._image_resolver import _get_spark_image_uri
101102

102103
from sagemaker.core.remote_function.job import (
103104
_JobSettings,
@@ -1039,7 +1040,7 @@ def _get_remote_decorator_config_from_input(
10391040
# TODO: This needs to be removed when new mode is introduced.
10401041
if remote_decorator_config.spark_config is None:
10411042
remote_decorator_config.spark_config = SparkConfig()
1042-
remote_decorator_config.image_uri = _JobSettings._get_default_spark_image(sagemaker_session)
1043+
remote_decorator_config.image_uri = _get_spark_image_uri(sagemaker_session)
10431044

10441045
return remote_decorator_config
10451046

sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ def config_uploader():
162162
return uploader
163163

164164

165+
@patch(
166+
"sagemaker.core.remote_function.job._JobSettings._get_default_spark_image",
167+
new=Mock(return_value="some_image_uri"),
168+
)
165169
@patch(
166170
"sagemaker.mlops.feature_store.feature_processor.feature_scheduler._validate_fg_lineage_resources",
167171
return_value=None,
@@ -171,7 +175,7 @@ def config_uploader():
171175
return_value=mock_pipeline(),
172176
)
173177
@patch(
174-
"sagemaker.core.remote_function.job._JobSettings._get_default_spark_image",
178+
"sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri",
175179
return_value="some_image_uri",
176180
)
177181
@patch("sagemaker.mlops.feature_store.feature_processor._config_uploader.TrainingInput")
@@ -470,9 +474,13 @@ def test_to_pipeline_not_wrapped_by_remote(get_execution_role, session):
470474
)
471475

472476

473-
@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session())
474477
@patch(
475478
"sagemaker.core.remote_function.job._JobSettings._get_default_spark_image",
479+
new=Mock(return_value="some_image_uri"),
480+
)
481+
@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session())
482+
@patch(
483+
"sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri",
476484
return_value="some_image_uri",
477485
)
478486
@patch("sagemaker.core.remote_function.job.get_execution_role", return_value=EXECUTION_ROLE_ARN)
@@ -522,9 +530,13 @@ def test_to_pipeline_wrong_mode(get_execution_role, mock_spark_image, session):
522530
)
523531

524532

525-
@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session())
526533
@patch(
527534
"sagemaker.core.remote_function.job._JobSettings._get_default_spark_image",
535+
new=Mock(return_value="some_image_uri"),
536+
)
537+
@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session())
538+
@patch(
539+
"sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri",
528540
return_value="some_image_uri",
529541
)
530542
@patch("sagemaker.core.remote_function.job.get_execution_role", return_value=EXECUTION_ROLE_ARN)
@@ -577,9 +589,13 @@ def test_to_pipeline_pipeline_name_length_limit_exceeds(
577589
)
578590

579591

580-
@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session())
581592
@patch(
582593
"sagemaker.core.remote_function.job._JobSettings._get_default_spark_image",
594+
new=Mock(return_value="some_image_uri"),
595+
)
596+
@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session())
597+
@patch(
598+
"sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri",
583599
return_value="some_image_uri",
584600
)
585601
@patch("sagemaker.core.remote_function.job.get_execution_role", return_value=EXECUTION_ROLE_ARN)

0 commit comments

Comments
 (0)