Skip to content

Commit 3cd4686

Browse files
committed
refactor(feature-processor): Migrate to FeatureGroup resource API
- Replace sagemaker_session.describe_feature_group() calls with FeatureGroup.get() - Update _input_loader.py to use FeatureGroup resource attributes instead of dictionary access - Update feature_scheduler.py to use FeatureGroup.get() and access creation_time as attribute - Update _feature_group_lineage_entity_handler.py to return FeatureGroup resource instead of Dict - Remove unused imports (Dict, Any, FEATURE_GROUP, CREATION_TIME constants) - Replace dictionary key access with typed resource properties (offline_store_config, data_catalog_config, event_time_feature_name, etc.) - Update unit tests to reflect new FeatureGroup resource API usage - Improves type safety and reduces reliance on dictionary-based API responses
1 parent a1b1bc3 commit 3cd4686

File tree

7 files changed

+119
-60
lines changed

7 files changed

+119
-60
lines changed

sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_input_loader.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
InputOffsetParser,
3535
)
3636
from sagemaker.mlops.feature_store.feature_processor._env import EnvironmentHelper
37+
from sagemaker.core.resources import FeatureGroup
3738

3839
T = TypeVar("T")
3940

@@ -96,26 +97,30 @@ def load_from_feature_group(
9697
sagemaker_session: Session = self.sagemaker_session or Session()
9798

9899
feature_group_name = feature_group_data_source.name
99-
feature_group = sagemaker_session.describe_feature_group(
100-
self._parse_name_from_arn(feature_group_name)
100+
feature_group = FeatureGroup.get(
101+
feature_group_name=self._parse_name_from_arn(feature_group_name),
102+
session=sagemaker_session.boto_session,
101103
)
102104
logger.debug(
103105
"Called describe_feature_group with %s and received: %s",
104106
feature_group_name,
105107
feature_group,
106108
)
107109

108-
if "OfflineStoreConfig" not in feature_group:
110+
if not feature_group.offline_store_config:
109111
raise ValueError(
110112
f"Input Feature Groups must have an enabled Offline Store."
111113
f" Feature Group: {feature_group_name} does not have an Offline Store enabled."
112114
)
113115

114-
offline_store_uri = feature_group["OfflineStoreConfig"]["S3StorageConfig"][
115-
"ResolvedOutputS3Uri"
116-
]
116+
offline_store_config = feature_group.offline_store_config
117+
offline_store_uri = offline_store_config.s3_storage_config.resolved_output_s3_uri
117118

118-
table_format = feature_group["OfflineStoreConfig"].get("TableFormat", None)
119+
table_format = (
120+
offline_store_config.table_format
121+
if offline_store_config.table_format
122+
else None
123+
)
119124

120125
if table_format not in self._supported_table_format:
121126
raise ValueError(
@@ -127,15 +132,15 @@ def load_from_feature_group(
127132
end_offset = feature_group_data_source.input_end_offset
128133

129134
if table_format == "Iceberg":
130-
data_catalog_config = feature_group["OfflineStoreConfig"]["DataCatalogConfig"]
135+
data_catalog_config = offline_store_config.data_catalog_config
131136
return self.load_from_iceberg_table(
132137
IcebergTableDataSource(
133138
offline_store_uri,
134-
data_catalog_config["Catalog"],
135-
data_catalog_config["Database"],
136-
data_catalog_config["TableName"],
139+
data_catalog_config.catalog,
140+
data_catalog_config.database,
141+
data_catalog_config.table_name,
137142
),
138-
feature_group["EventTimeFeatureName"],
143+
feature_group.event_time_feature_name,
139144
start_offset,
140145
end_offset,
141146
)

sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
)
7474

7575
from sagemaker.core.s3 import s3_path_join
76+
from sagemaker.core.resources import FeatureGroup
7677

7778
from sagemaker.core.helper.session_helper import Session, get_execution_role
7879
from sagemaker.mlops.feature_store.feature_processor._event_bridge_scheduler_helper import (
@@ -770,8 +771,8 @@ def _validate_fg_lineage_resources(feature_group_name: str, sagemaker_session: S
770771
groups.
771772
"""
772773

773-
feature_group = sagemaker_session.describe_feature_group(feature_group_name=feature_group_name)
774-
feature_group_creation_time = feature_group["CreationTime"].strftime("%s")
774+
feature_group = FeatureGroup.get(feature_group_name=feature_group_name, session=sagemaker_session.boto_session)
775+
feature_group_creation_time = feature_group.creation_time.strftime("%s")
775776
feature_group_context = _get_feature_group_lineage_context_name(
776777
feature_group_name=feature_group_name,
777778
feature_group_creation_time=feature_group_creation_time,

sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/lineage/_feature_group_lineage_entity_handler.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from __future__ import absolute_import
1515

1616
import re
17-
from typing import Dict, Any
1817
import logging
1918

2019
from sagemaker.core.helper.session_helper import Session
@@ -24,10 +23,9 @@
2423
)
2524
from sagemaker.mlops.feature_store.feature_processor.lineage.constants import (
2625
SAGEMAKER,
27-
FEATURE_GROUP,
28-
CREATION_TIME,
2926
)
3027
from sagemaker.core.lineage.context import Context
28+
from sagemaker.core.resources import FeatureGroup
3129

3230
# pylint: disable=C0301
3331
from sagemaker.mlops.feature_store.feature_processor.lineage._feature_processor_lineage_name_helper import (
@@ -62,8 +60,8 @@ def retrieve_feature_group_context_arns(
6260
),
6361
sagemaker_session=sagemaker_session,
6462
)
65-
feature_group_name = feature_group[FEATURE_GROUP]
66-
feature_group_creation_time = feature_group[CREATION_TIME].strftime("%s")
63+
feature_group_name = feature_group.feature_group_name
64+
feature_group_creation_time = feature_group.creation_time.strftime("%s")
6765
feature_group_pipeline_context = (
6866
FeatureGroupLineageEntityHandler._load_feature_group_pipeline_context(
6967
feature_group_name=feature_group_name,
@@ -87,7 +85,7 @@ def retrieve_feature_group_context_arns(
8785
@staticmethod
8886
def _describe_feature_group(
8987
feature_group_name: str, sagemaker_session: Session
90-
) -> Dict[str, Any]:
88+
) -> FeatureGroup:
9189
"""Retrieve the Feature Group.
9290
9391
Arguments:
@@ -97,9 +95,9 @@ def _describe_feature_group(
9795
function creates one using the default AWS configuration chain.
9896
9997
Returns:
100-
Dict[str, Any]: The Feature Group details.
98+
FeatureGroup: The Feature Group resource.
10199
"""
102-
feature_group = sagemaker_session.describe_feature_group(feature_group_name)
100+
feature_group = FeatureGroup.get(feature_group_name=feature_group_name, session=sagemaker_session.boto_session)
103101
logger.debug(
104102
"Called describe_feature_group with %s and received: %s",
105103
feature_group_name,

sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/lineage/test_constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
CREATION_TIME = "123123123"
4747
LAST_UPDATE_TIME = "234234234"
4848
SAGEMAKER_SESSION_MOCK = Mock(Session)
49+
SAGEMAKER_SESSION_MOCK.boto_session = Mock()
4950
CONTEXT_MOCK_01 = Mock(Context)
5051
CONTEXT_MOCK_02 = Mock(Context)
5152

sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/lineage/test_feature_group_lineage_entity_handler.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111
# ANY KIND, either express or implied. See the License for the specific
1212
# language governing permissions and limitations under the License.
1313
from __future__ import absolute_import
14-
from mock import patch, call
14+
from mock import patch, call, Mock
1515

1616
from sagemaker.mlops.feature_store.feature_processor.lineage._feature_group_lineage_entity_handler import (
1717
FeatureGroupLineageEntityHandler,
1818
)
1919
from sagemaker.core.lineage.context import Context
20+
from sagemaker.core.resources import FeatureGroup
2021

2122
from test_constants import (
2223
SAGEMAKER_SESSION_MOCK,
@@ -26,11 +27,15 @@
2627
FEATURE_GROUP_NAME,
2728
)
2829

30+
FEATURE_GROUP_MOCK = Mock()
31+
FEATURE_GROUP_MOCK.feature_group_name = FEATURE_GROUP["FeatureGroupName"]
32+
FEATURE_GROUP_MOCK.creation_time = FEATURE_GROUP["CreationTime"]
33+
2934

3035
def test_retrieve_feature_group_context_arns():
3136
with patch.object(
32-
SAGEMAKER_SESSION_MOCK, "describe_feature_group", return_value=FEATURE_GROUP
33-
) as fg_describe_method:
37+
FeatureGroup, "get", return_value=FEATURE_GROUP_MOCK
38+
) as fg_get_method:
3439
with patch.object(
3540
Context, "load", side_effect=[CONTEXT_MOCK_01, CONTEXT_MOCK_02]
3641
) as context_load:
@@ -44,16 +49,17 @@ def test_retrieve_feature_group_context_arns():
4449
assert result.name == FEATURE_GROUP_NAME
4550
assert result.pipeline_context_arn == "context-arn-fep"
4651
assert result.pipeline_version_context_arn == "context-arn-fep-ver"
47-
fg_describe_method.assert_called_once_with(FEATURE_GROUP_NAME)
52+
fg_get_method.assert_called_once_with(feature_group_name=FEATURE_GROUP_NAME, session=SAGEMAKER_SESSION_MOCK.boto_session)
53+
creation_time_str = FEATURE_GROUP_MOCK.creation_time.strftime("%s")
4854
context_load.assert_has_calls(
4955
[
5056
call(
51-
context_name=f'{FEATURE_GROUP_NAME}-{FEATURE_GROUP["CreationTime"].strftime("%s")}'
57+
context_name=f"{FEATURE_GROUP_NAME}-{creation_time_str}"
5258
f"-feature-group-pipeline",
5359
sagemaker_session=SAGEMAKER_SESSION_MOCK,
5460
),
5561
call(
56-
context_name=f'{FEATURE_GROUP_NAME}-{FEATURE_GROUP["CreationTime"].strftime("%s")}'
62+
context_name=f"{FEATURE_GROUP_NAME}-{creation_time_str}"
5763
f"-feature-group-pipeline-version",
5864
sagemaker_session=SAGEMAKER_SESSION_MOCK,
5965
),

sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
)
3030
from sagemaker.core.lineage.context import Context
3131
from sagemaker.core.remote_function.spark_config import SparkConfig
32+
from sagemaker.core.resources import FeatureGroup
3233

3334
from sagemaker.core.helper.session_helper import Session
3435
from sagemaker.mlops.feature_store.feature_processor._enums import FeatureProcessorMode
@@ -89,10 +90,13 @@
8990
PIPELINE_VERSION_CONTEXT_NAME_TAG_KEY = "sm-fs-fe:feature-engineering-pipeline-version-context-name"
9091
NOW = datetime.now()
9192
SAGEMAKER_SESSION_MOCK = Mock(Session)
93+
SAGEMAKER_SESSION_MOCK.boto_session = Mock()
9294
CONTEXT_MOCK_01 = Mock(Context)
9395
CONTEXT_MOCK_02 = Mock(Context)
9496
CONTEXT_MOCK_03 = Mock(Context)
9597
FEATURE_GROUP = tdh.DESCRIBE_FEATURE_GROUP_RESPONSE.copy()
98+
FEATURE_GROUP_MOCK = Mock()
99+
FEATURE_GROUP_MOCK.creation_time = FEATURE_GROUP["CreationTime"]
96100
PIPELINE = tdh.PIPELINE.copy()
97101
TAGS = [dict(Key="key_1", Value="value_1"), dict(Key="key_2", Value="value_2")]
98102

@@ -783,8 +787,8 @@ def test_execute(validation):
783787

784788
def test_validate_fg_lineage_resources_happy_case():
785789
with patch.object(
786-
SAGEMAKER_SESSION_MOCK, "describe_feature_group", return_value=FEATURE_GROUP
787-
) as fg_describe_method:
790+
FeatureGroup, "get", return_value=FEATURE_GROUP_MOCK
791+
) as fg_get_method:
788792
with patch.object(
789793
Context, "load", side_effect=[CONTEXT_MOCK_01, CONTEXT_MOCK_02, CONTEXT_MOCK_03]
790794
) as context_load:
@@ -795,16 +799,17 @@ def test_validate_fg_lineage_resources_happy_case():
795799
feature_group_name="some_fg",
796800
sagemaker_session=SAGEMAKER_SESSION_MOCK,
797801
)
798-
fg_describe_method.assert_called_once_with(feature_group_name="some_fg")
802+
fg_get_method.assert_called_once_with(feature_group_name="some_fg", session=SAGEMAKER_SESSION_MOCK.boto_session)
803+
creation_time_str = FEATURE_GROUP_MOCK.creation_time.strftime("%s")
799804
context_load.assert_has_calls(
800805
[
801806
call(
802-
context_name=f'{"some_fg"}-{FEATURE_GROUP["CreationTime"].strftime("%s")}'
807+
context_name=f'{"some_fg"}-{creation_time_str}'
803808
f"-feature-group-pipeline",
804809
sagemaker_session=SAGEMAKER_SESSION_MOCK,
805810
),
806811
call(
807-
context_name=f'{"some_fg"}-{FEATURE_GROUP["CreationTime"].strftime("%s")}'
812+
context_name=f'{"some_fg"}-{creation_time_str}'
808813
f"-feature-group-pipeline-version",
809814
sagemaker_session=SAGEMAKER_SESSION_MOCK,
810815
),
@@ -814,7 +819,7 @@ def test_validate_fg_lineage_resources_happy_case():
814819

815820

816821
def test_validete_fg_lineage_resources_rnf():
817-
with patch.object(SAGEMAKER_SESSION_MOCK, "describe_feature_group", return_value=FEATURE_GROUP):
822+
with patch.object(FeatureGroup, "get", return_value=FEATURE_GROUP_MOCK):
818823
with patch.object(
819824
Context,
820825
"load",
@@ -824,7 +829,7 @@ def test_validete_fg_lineage_resources_rnf():
824829
),
825830
):
826831
feature_group_name = "some_fg"
827-
feature_group_creation_time = FEATURE_GROUP["CreationTime"].strftime("%s")
832+
feature_group_creation_time = FEATURE_GROUP_MOCK.creation_time.strftime("%s")
828833
context_name = f"{feature_group_name}-{feature_group_creation_time}"
829834
with pytest.raises(
830835
ValueError,

0 commit comments

Comments
 (0)