Skip to content

Commit 98e10b9

Browse files
BassemHalim, Aditi2424, adishaa, bhhalim, mollyheamazon
authored
Feature processor v3 (#5565)
* Feature store v3 (#5490) * feat: Add Feature Store Support to V3 * Add feature store tests --------- Co-authored-by: adishaa <adishaa@amazon.com> * feat: feature_processor v3 * integ tests * fix * chore(docs): Add API docs * fix: Fix flaky integ tests * fix diff * chore: rename parameter + cleanup comments * Feature store v3 (#5490) * feat: Add Feature Store Support to V3 * Add feature store tests --------- Co-authored-by: adishaa <adishaa@amazon.com> * add pyspark to test deps * add test deps * fix unit test deps * pin setuptools<82 for feature-processor and unit tests * Set JAVA_HOME for integ tests which requires java * fix spark session bug * fix(feature-processor): Fix Spark session config and Ivy cache race condition Isolate Ivy cache per Spark session via spark.jars.ivy to prevent concurrent pytest-xdist workers from corrupting shared /root/.ivy2/cache during Maven dependency resolution in CI. * revert previous change + create different ivy cache per test to fix concurrent writes in CI * revert changes to sagemaker-core * refactor(feature-processor): Migrate to FeatureGroup resource API - Replace sagemaker_session.describe_feature_group() calls with FeatureGroup.get() - Update _input_loader.py to use FeatureGroup resource attributes instead of dictionary access - Update feature_scheduler.py to use FeatureGroup.get() and access creation_time as attribute - Update _feature_group_lineage_entity_handler.py to return FeatureGroup resource instead of Dict - Remove unused imports (Dict, Any, FEATURE_GROUP, CREATION_TIME constants) - Replace dictionary key access with typed resource properties (offline_store_config, data_catalog_config, event_time_feature_name, etc.) 
- Update unit tests to reflect new FeatureGroup resource API usage - Improves type safety and reduces reliance on dictionary-based API responses * add `build` to test_requirements * add upper bounds for test dependencies * move feature-processor config to sagemaker-mlops optional deps --------- Co-authored-by: Aditi Sharma <165942273+Aditi2424@users.noreply.github.com> Co-authored-by: adishaa <adishaa@amazon.com> Co-authored-by: Bassem Halim <bhhalim@amazon.com> Co-authored-by: Molly He <mollyhe@amazon.com>
1 parent 71c8d70 commit 98e10b9

72 files changed

Lines changed: 15517 additions & 2 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/api/sagemaker_mlops.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,12 @@ Local Development
2828
:members:
2929
:undoc-members:
3030
:show-inheritance:
31+
32+
33+
Feature Store
34+
-------------
35+
36+
.. automodule:: sagemaker.mlops.feature_store
37+
:members:
38+
:undoc-members:
39+
:show-inheritance:

requirements/extras/test_requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ scipy
1111
omegaconf
1212
graphene
1313
typing_extensions>=4.9.0
14-
tensorflow>=2.16.2,<=2.19.0
14+
tensorflow>=2.16.2,<=2.19.0
15+
build

sagemaker-mlops/pyproject.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,21 @@ dependencies = [
3131
]
3232

3333
[project.optional-dependencies]
34+
feature-processor = [
35+
"pyspark==3.3.2",
36+
"sagemaker-feature-store-pyspark-3.3",
37+
"setuptools<82",
38+
]
39+
3440
test = [
3541
"pytest",
3642
"pytest-cov",
3743
"mock",
44+
"setuptools<82",
45+
"pyspark==3.3.2",
46+
"sagemaker-feature-store-pyspark-3.3",
47+
"pandas<3.0",
48+
"numpy<3.0",
3849
]
3950
dev = [
4051
"pytest",
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"). You
4+
# may not use this file except in compliance with the License. A copy of
5+
# the License is located at
6+
#
7+
# http://aws.amazon.com/apache2.0/
8+
#
9+
# or in the "license" file accompanying this file. This file is
10+
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific
12+
# language governing permissions and limitations under the License.
13+
"""Exported classes for the sagemaker.mlops.feature_store.feature_processor module."""
14+
from __future__ import absolute_import
15+
16+
from sagemaker.mlops.feature_store.feature_processor._data_source import ( # noqa: F401
17+
CSVDataSource,
18+
FeatureGroupDataSource,
19+
ParquetDataSource,
20+
BaseDataSource,
21+
PySparkDataSource,
22+
)
23+
from sagemaker.mlops.feature_store.feature_processor._exceptions import ( # noqa: F401
24+
IngestionError,
25+
)
26+
from sagemaker.mlops.feature_store.feature_processor.feature_processor import ( # noqa: F401
27+
feature_processor,
28+
)
29+
from sagemaker.mlops.feature_store.feature_processor.feature_scheduler import ( # noqa: F401
30+
to_pipeline,
31+
schedule,
32+
describe,
33+
put_trigger,
34+
delete_trigger,
35+
enable_trigger,
36+
disable_trigger,
37+
delete_schedule,
38+
list_pipelines,
39+
execute,
40+
TransformationCode,
41+
FeatureProcessorPipelineEvents,
42+
)
43+
from sagemaker.mlops.feature_store.feature_processor._enums import ( # noqa: F401
44+
FeatureProcessorPipelineExecutionStatus,
45+
)
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"). You
4+
# may not use this file except in compliance with the License. A copy of
5+
# the License is located at
6+
#
7+
# http://aws.amazon.com/apache2.0/
8+
#
9+
# or in the "license" file accompanying this file. This file is
10+
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific
12+
# language governing permissions and limitations under the License.
13+
"""Contains classes for preparing and uploading configs for a scheduled feature processor."""
14+
from __future__ import absolute_import
15+
from typing import Callable, Dict, Optional, Tuple, List, Union
16+
17+
import attr
18+
19+
from sagemaker.core.helper.session_helper import Session
20+
from sagemaker.mlops.feature_store.feature_processor._constants import (
21+
SPARK_JAR_FILES_PATH,
22+
SPARK_PY_FILES_PATH,
23+
SPARK_FILES_PATH,
24+
S3_DATA_DISTRIBUTION_TYPE,
25+
)
26+
from sagemaker.core.inputs import TrainingInput
27+
from sagemaker.core.shapes import Channel, DataSource, S3DataSource
28+
from sagemaker.core.remote_function.core.stored_function import StoredFunction
29+
from sagemaker.core.remote_function.job import (
30+
_prepare_and_upload_workspace,
31+
_prepare_and_upload_runtime_scripts,
32+
_JobSettings,
33+
RUNTIME_SCRIPTS_CHANNEL_NAME,
34+
REMOTE_FUNCTION_WORKSPACE,
35+
SPARK_CONF_CHANNEL_NAME,
36+
_prepare_and_upload_spark_dependent_files,
37+
)
38+
from sagemaker.core.remote_function.runtime_environment.runtime_environment_manager import (
39+
RuntimeEnvironmentManager,
40+
)
41+
from sagemaker.core.remote_function.spark_config import SparkConfig
42+
from sagemaker.core.remote_function.custom_file_filter import CustomFileFilter
43+
from sagemaker.core.s3 import s3_path_join
44+
45+
46+
@attr.s
class ConfigUploader:
    """Prepares and uploads customer provided configs to S3."""

    # Settings captured from the remote-function decorator: spark config,
    # S3 KMS key, dependency list, workspace/pre-execution options, etc.
    remote_decorator_config: _JobSettings = attr.ib()
    # Snapshots the local runtime dependencies prior to upload.
    runtime_env_manager: RuntimeEnvironmentManager = attr.ib()

    def prepare_step_input_channel_for_spark_mode(
        self, func: Callable, s3_base_uri: str, sagemaker_session: Session
    ) -> Tuple[List[Channel], Dict]:
        """Prepares input channels for SageMaker Pipeline Step.

        Uploads the serialized callable, the bootstrap runtime scripts, the
        user workspace snapshot, and any Spark dependent files to S3, then
        wraps each uploaded artifact in a training ``Channel``.

        Args:
            func: The feature processor callable to serialize and upload.
            s3_base_uri: Base S3 URI under which all artifacts are uploaded.
            sagemaker_session: Session used for the S3 uploads.

        Returns:
            Tuple of (List[Channel], spark_dependency_paths dict)
        """
        self._prepare_and_upload_callable(func, s3_base_uri, sagemaker_session)
        bootstrap_scripts_s3uri = self._prepare_and_upload_runtime_scripts(
            self.remote_decorator_config.spark_config,
            s3_base_uri,
            self.remote_decorator_config.s3_kms_key,
            sagemaker_session,
        )
        dependencies_list_path = self.runtime_env_manager.snapshot(
            self.remote_decorator_config.dependencies
        )
        user_workspace_s3uri = self._prepare_and_upload_workspace(
            dependencies_list_path,
            self.remote_decorator_config.include_local_workdir,
            self.remote_decorator_config.pre_execution_commands,
            self.remote_decorator_config.pre_execution_script,
            s3_base_uri,
            self.remote_decorator_config.s3_kms_key,
            sagemaker_session,
            self.remote_decorator_config.custom_file_filter,
        )

        (
            submit_jars_s3_paths,
            submit_py_files_s3_paths,
            submit_files_s3_path,
            config_file_s3_uri,
        ) = self._prepare_and_upload_spark_dependent_files(
            self.remote_decorator_config.spark_config,
            s3_base_uri,
            self.remote_decorator_config.s3_kms_key,
            sagemaker_session,
        )

        # The runtime-scripts channel is always present; the workspace and
        # spark-config channels are attached only when the corresponding
        # artifact was actually uploaded.
        channels = [
            Channel(
                channel_name=RUNTIME_SCRIPTS_CHANNEL_NAME,
                data_source=DataSource(
                    s3_data_source=S3DataSource(
                        s3_uri=bootstrap_scripts_s3uri,
                        s3_data_type="S3Prefix",
                        s3_data_distribution_type=S3_DATA_DISTRIBUTION_TYPE,
                    )
                ),
                input_mode="File",
            )
        ]

        if user_workspace_s3uri:
            channels.append(
                Channel(
                    channel_name=REMOTE_FUNCTION_WORKSPACE,
                    data_source=DataSource(
                        s3_data_source=S3DataSource(
                            s3_uri=s3_path_join(s3_base_uri, REMOTE_FUNCTION_WORKSPACE),
                            s3_data_type="S3Prefix",
                            s3_data_distribution_type=S3_DATA_DISTRIBUTION_TYPE,
                        )
                    ),
                    input_mode="File",
                )
            )

        if config_file_s3_uri:
            channels.append(
                Channel(
                    channel_name=SPARK_CONF_CHANNEL_NAME,
                    data_source=DataSource(
                        s3_data_source=S3DataSource(
                            s3_uri=config_file_s3_uri,
                            s3_data_type="S3Prefix",
                            s3_data_distribution_type=S3_DATA_DISTRIBUTION_TYPE,
                        )
                    ),
                    input_mode="File",
                )
            )

        return channels, {
            SPARK_JAR_FILES_PATH: submit_jars_s3_paths,
            SPARK_PY_FILES_PATH: submit_py_files_s3_paths,
            SPARK_FILES_PATH: submit_files_s3_path,
        }

    def _prepare_and_upload_callable(
        self, func: Callable, s3_base_uri: str, sagemaker_session: Session
    ) -> None:
        """Prepares and uploads callable to S3"""
        stored_function = StoredFunction(
            sagemaker_session=sagemaker_session,
            s3_base_uri=s3_base_uri,
            s3_kms_key=self.remote_decorator_config.s3_kms_key,
        )
        stored_function.save(func)

    def _prepare_and_upload_workspace(
        self,
        local_dependencies_path: str,
        include_local_workdir: bool,
        pre_execution_commands: List[str],
        pre_execution_script_local_path: str,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
        custom_file_filter: Optional[Union[Callable[[str, List], List], CustomFileFilter]] = None,
    ) -> str:
        """Upload the training step dependencies to S3 if present"""
        # Thin delegation to the shared remote-function helper so scheduled
        # feature processors package workspaces identically to @remote jobs.
        return _prepare_and_upload_workspace(
            local_dependencies_path=local_dependencies_path,
            include_local_workdir=include_local_workdir,
            pre_execution_commands=pre_execution_commands,
            pre_execution_script_local_path=pre_execution_script_local_path,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
            custom_file_filter=custom_file_filter,
        )

    def _prepare_and_upload_runtime_scripts(
        self,
        spark_config: SparkConfig,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
    ) -> str:
        """Copy runtime scripts to a folder and upload to S3"""
        return _prepare_and_upload_runtime_scripts(
            spark_config=spark_config,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
        )

    def _prepare_and_upload_spark_dependent_files(
        self,
        spark_config: SparkConfig,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
    ) -> Tuple:
        """Upload the spark dependencies to S3 if present.

        Returns a 4-tuple of (jars, py-files, files, config-file) S3 paths,
        all ``None`` when no Spark config was supplied.
        """
        if not spark_config:
            return None, None, None, None

        return _prepare_and_upload_spark_dependent_files(
            spark_config=spark_config,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
        )
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"). You
4+
# may not use this file except in compliance with the License. A copy of
5+
# the License is located at
6+
#
7+
# http://aws.amazon.com/apache2.0/
8+
#
9+
# or in the "license" file accompanying this file. This file is
10+
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific
12+
# language governing permissions and limitations under the License.
13+
"""Module containing constants for feature_processor and feature_scheduler module."""
14+
from __future__ import absolute_import
15+
16+
from sagemaker.core.workflow.parameters import Parameter, ParameterTypeEnum
17+
18+
# Defaults for scheduled feature-processor pipelines.
DEFAULT_INSTANCE_TYPE = "ml.m5.xlarge"
DEFAULT_SCHEDULE_STATE = "ENABLED"
DEFAULT_TRIGGER_STATE = "ENABLED"
UNDERSCORE = "_"

# Service error codes matched by name when handling botocore ClientErrors.
RESOURCE_NOT_FOUND_EXCEPTION = "ResourceNotFoundException"
RESOURCE_NOT_FOUND = "ResourceNotFound"
VALIDATION_EXCEPTION = "ValidationException"

EXECUTION_TIME_PIPELINE_PARAMETER = "scheduled_time"
# EventBridge Scheduler context attribute; the service substitutes the
# schedule's invocation time for this placeholder at trigger time.
EVENT_BRIDGE_INVOCATION_TIME = "<aws.scheduler.scheduled-time>"
SCHEDULED_TIME_PIPELINE_PARAMETER = Parameter(
    name=EXECUTION_TIME_PIPELINE_PARAMETER, parameter_type=ParameterTypeEnum.STRING
)
EXECUTION_TIME_PIPELINE_PARAMETER_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # 2023-01-01T07:00:00Z
NO_FLEXIBLE_TIME_WINDOW = dict(Mode="OFF")

PIPELINE_NAME_MAXIMUM_LENGTH = 80
PIPELINE_CONTEXT_TYPE = "FeatureEngineeringPipeline"

# Keys of the spark-dependency paths dict produced by ConfigUploader.
SPARK_JAR_FILES_PATH = "submit_jars_s3_paths"
SPARK_PY_FILES_PATH = "submit_py_files_s3_paths"
SPARK_FILES_PATH = "submit_files_s3_path"

FEATURE_PROCESSOR_TAG_KEY = "sm-fs-fe:created-from"
FEATURE_PROCESSOR_TAG_VALUE = "fp-to-pipeline"

# ARN patterns; capture groups are (partition, region, account, resource-name).
FEATURE_GROUP_ARN_REGEX_PATTERN = r"arn:(.*?):sagemaker:(.*?):(.*?):feature-group/(.*?)$"
PIPELINE_ARN_REGEX_PATTERN = r"arn:(.*?):sagemaker:(.*?):(.*?):pipeline/(.*?)$"
EVENTBRIDGE_RULE_ARN_REGEX_PATTERN = r"arn:(.*?):events:(.*?):(.*?):rule/(.*?)$"

# NOTE(review): points at a private beta bucket with a pinned dev wheel —
# confirm this is still needed and accessible before release.
SAGEMAKER_WHL_FILE_S3_PATH = "s3://ada-private-beta/sagemaker-2.151.1.dev0-py2.py3-none-any.whl"

S3_DATA_DISTRIBUTION_TYPE = "FullyReplicated"

# Tags that link a pipeline back to its feature-engineering lineage contexts;
# reserved, so user-supplied tags may not use these keys.
PIPELINE_CONTEXT_NAME_TAG_KEY = "sm-fs-fe:feature-engineering-pipeline-context-name"
PIPELINE_VERSION_CONTEXT_NAME_TAG_KEY = "sm-fs-fe:feature-engineering-pipeline-version-context-name"
TO_PIPELINE_RESERVED_TAG_KEYS = [
    FEATURE_PROCESSOR_TAG_KEY,
    PIPELINE_CONTEXT_NAME_TAG_KEY,
    PIPELINE_VERSION_CONTEXT_NAME_TAG_KEY,
]

# EventBridge event pattern template for pipeline-execution status triggers;
# callers fill in the status and pipeline-ARN lists.
BASE_EVENT_PATTERN = {
    "source": ["aws.sagemaker"],
    "detail": {"currentPipelineExecutionStatus": [], "pipelineArn": []},
}

0 commit comments

Comments
 (0)