diff --git a/sagemaker-core/src/sagemaker/core/telemetry/constants.py b/sagemaker-core/src/sagemaker/core/telemetry/constants.py index 5f6b6ff607..8b0bf9e5c3 100644 --- a/sagemaker-core/src/sagemaker/core/telemetry/constants.py +++ b/sagemaker-core/src/sagemaker/core/telemetry/constants.py @@ -28,6 +28,7 @@ class Feature(Enum): MODEL_TRAINER = 14 MODEL_CUSTOMIZATION = 15 MLOPS = 16 + FEATURE_STORE = 17 def __str__(self): # pylint: disable=E0307 """Return the feature name.""" diff --git a/sagemaker-core/src/sagemaker/core/telemetry/telemetry_logging.py b/sagemaker-core/src/sagemaker/core/telemetry/telemetry_logging.py index 728c8c2d5b..6388cdfe5e 100644 --- a/sagemaker-core/src/sagemaker/core/telemetry/telemetry_logging.py +++ b/sagemaker-core/src/sagemaker/core/telemetry/telemetry_logging.py @@ -56,6 +56,7 @@ str(Feature.MODEL_TRAINER): 14, str(Feature.MODEL_CUSTOMIZATION): 15, str(Feature.MLOPS): 16, + str(Feature.FEATURE_STORE): 17, } STATUS_TO_CODE = { @@ -81,6 +82,9 @@ def wrapper(*args, **kwargs): if len(args) > 0 and hasattr(args[0], "sagemaker_session"): # Get the sagemaker_session from the instance method args sagemaker_session = args[0].sagemaker_session + elif len(args) > 0 and hasattr(args[0], "_sagemaker_session"): + # Get the sagemaker_session from the instance method args (private attribute) + sagemaker_session = args[0]._sagemaker_session elif feature == Feature.REMOTE_FUNCTION: # Get the sagemaker_session from the function keyword arguments for remote function sagemaker_session = kwargs.get( diff --git a/sagemaker-core/src/sagemaker/core/utils/utils.py b/sagemaker-core/src/sagemaker/core/utils/utils.py index 909e8463a9..8a8d90be6e 100644 --- a/sagemaker-core/src/sagemaker/core/utils/utils.py +++ b/sagemaker-core/src/sagemaker/core/utils/utils.py @@ -298,6 +298,22 @@ def __new__(cls): cls._instance = super().__new__(cls) return cls._instance + def __repr__(self): + """Return clean representation for debugging.""" + return "Unassigned()" + + def 
__str__(self): + """Return empty string for clean printing.""" + return "" + + def __iter__(self): + """Return empty iterator to prevent iteration errors.""" + return iter([]) + + def __bool__(self): + """Return False for truthiness checks.""" + return False + class SingletonMeta(type): """ diff --git a/sagemaker-core/tests/unit/generated/test_utils.py b/sagemaker-core/tests/unit/generated/test_utils.py index de67ac5527..44856d17d5 100644 --- a/sagemaker-core/tests/unit/generated/test_utils.py +++ b/sagemaker-core/tests/unit/generated/test_utils.py @@ -371,3 +371,50 @@ def test_serialize_method_nested_shape(): "StringValue": "string", }, } + + +class TestUnassignedBehavior: + """Test Unassigned class methods for proper behavior. + + Bug fix: GetRecordResponse is not printable and cannot be parsed via iterator. + Error: TypeError: 'Unassigned' object is not iterable + """ + + def test_unassigned_repr(self): + """Test that Unassigned has clean repr.""" + u = Unassigned() + assert repr(u) == "Unassigned()" + + def test_unassigned_str(self): + """Test that Unassigned converts to empty string.""" + u = Unassigned() + assert str(u) == "" + + def test_unassigned_bool(self): + """Test that Unassigned is falsy.""" + u = Unassigned() + assert not u + assert bool(u) is False + + def test_unassigned_iter(self): + """Test that Unassigned is iterable and returns empty list.""" + u = Unassigned() + result = list(u) + assert result == [] + + def test_unassigned_singleton(self): + """Test that Unassigned is a singleton.""" + u1 = Unassigned() + u2 = Unassigned() + assert u1 is u2 + + def test_unassigned_in_conditional(self): + """Test that Unassigned works correctly in conditionals.""" + u = Unassigned() + + # Should evaluate to False + if u: + pytest.fail("Unassigned should be falsy") + + # Should work with not + assert not u diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/athena_query.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/athena_query.py index 
123b3c4305..2163badf3b 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/athena_query.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/athena_query.py @@ -14,6 +14,7 @@ ) from sagemaker.core.helper.session_helper import Session +from sagemaker.core.telemetry import Feature, _telemetry_emitter @dataclass class AthenaQuery: @@ -37,6 +38,7 @@ class AthenaQuery: _result_bucket: str = field(default=None, init=False) _result_file_prefix: str = field(default=None, init=False) + @_telemetry_emitter(Feature.FEATURE_STORE, "AthenaQuery.run") def run( self, query_string: str, output_location: str, kms_key: str = None, workgroup: str = None ) -> str: @@ -82,6 +84,7 @@ def get_query_execution(self) -> Dict[str, Any]: """ return get_query_execution(self.sagemaker_session, self._current_query_execution_id) + @_telemetry_emitter(Feature.FEATURE_STORE, "AthenaQuery.as_dataframe") def as_dataframe(self, **kwargs) -> DataFrame: """Download the result of the current query and load it into a DataFrame. 
diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/dataset_builder.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/dataset_builder.py index 72e9535320..39a8bc9f5c 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/dataset_builder.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/dataset_builder.py @@ -9,6 +9,7 @@ import pandas as pd from sagemaker.core.helper.session_helper import Session +from sagemaker.core.telemetry import Feature, _telemetry_emitter from sagemaker.mlops.feature_store import FeatureGroup from sagemaker.mlops.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum from sagemaker.mlops.feature_store.feature_utils import ( @@ -143,7 +144,7 @@ def construct_feature_group_to_be_merged( raise RuntimeError(f"No metastore configured for FeatureGroup {fg.feature_group_name}.") catalog_config = fg.offline_store_config.data_catalog_config - disable_glue = catalog_config.disable_glue_table_creation or False + disable_glue = getattr(catalog_config, "disable_glue_table_creation", False) or False features = [fd.feature_name for fd in fg.feature_definitions] record_id = fg.record_identifier_feature_name @@ -422,12 +423,18 @@ def with_event_time_range( self._event_time_ending_timestamp = ending_timestamp return self + @_telemetry_emitter(Feature.FEATURE_STORE, "DatasetBuilder.to_csv_file") def to_csv_file(self) -> tuple[str, str]: """Get query string and result in .csv format file. Returns: - The S3 path of the .csv file. - The query string executed. + tuple: A tuple containing: + - str: The S3 path of the .csv file + - str: The query string executed + + Note: + This method returns a tuple (csv_path, query_string). 
+ To get just the CSV path: csv_path, _ = builder.to_csv_file() """ if isinstance(self._base, pd.DataFrame): return self._to_csv_from_dataframe() @@ -435,12 +442,18 @@ def to_csv_file(self) -> tuple[str, str]: return self._to_csv_from_feature_group() raise ValueError("Base must be either a FeatureGroup or a DataFrame.") + @_telemetry_emitter(Feature.FEATURE_STORE, "DatasetBuilder.to_dataframe") def to_dataframe(self) -> tuple[pd.DataFrame, str]: """Get query string and result in pandas.DataFrame. Returns: - The pandas.DataFrame object. - The query string executed. + tuple: A tuple containing: + - pd.DataFrame: The pandas DataFrame object + - str: The query string executed + + Note: + This method returns a tuple (dataframe, query_string). + To get just the DataFrame: df, _ = builder.to_dataframe() """ csv_file, query_string = self.to_csv_file() df = download_csv_from_s3(csv_file, self._sagemaker_session, self._kms_key_id) @@ -471,8 +484,8 @@ def _to_csv_from_dataframe(self) -> tuple[str, str]: table_name=temp_table_name, record_identifier_feature_name=self._record_identifier_feature_name, event_time_identifier_feature=FeatureDefinition( - self._event_time_identifier_feature_name, - self._event_time_identifier_feature_type, + feature_name=self._event_time_identifier_feature_name, + feature_type=self._event_time_identifier_feature_type, ), table_type=TableType.DATA_FRAME, ) diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_utils.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_utils.py index 0b7c747515..3e3e7813df 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_utils.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_utils.py @@ -4,15 +4,19 @@ import logging import os import time +from pathlib import Path +import re from typing import Any, Dict, Sequence, Union import boto3 +import pandas import pandas as pd -from pandas import DataFrame, Series - +from pandas import DataFrame, Series, read_csv 
+from sagemaker.core.utils.utils import Unassigned from sagemaker.mlops.feature_store import FeatureGroup as CoreFeatureGroup, FeatureGroup from sagemaker.core.helper.session_helper import Session from sagemaker.core.s3.client import S3Uploader, S3Downloader +from sagemaker.core.telemetry import Feature, _telemetry_emitter from sagemaker.mlops.feature_store.feature_definition import ( FeatureDefinition, FractionalFeatureDefinition, @@ -366,7 +370,7 @@ def create_athena_query(feature_group_name: str, session: Session): raise RuntimeError("No metastore is configured with this feature group.") catalog_config = fg.offline_store_config.data_catalog_config - disable_glue = catalog_config.disable_glue_table_creation or False + disable_glue = getattr(catalog_config, "disable_glue_table_creation", False) or False return AthenaQuery( catalog=catalog_config.catalog if disable_glue else "AwsDataCatalog", @@ -417,6 +421,7 @@ def as_hive_ddl( return ddl +@_telemetry_emitter(Feature.FEATURE_STORE, "ingest_dataframe") def ingest_dataframe( feature_group_name: str, data_frame: DataFrame, @@ -448,7 +453,16 @@ def ingest_dataframe( raise ValueError("max_workers must be greater than 0.") fg = CoreFeatureGroup.get(feature_group_name=feature_group_name) - feature_definitions = {fd.feature_name: fd.feature_type for fd in fg.feature_definitions} + feature_definitions = {} + for fd in fg.feature_definitions: + collection_type = getattr(fd, "collection_type", None) + # Handle Unassigned, empty string, or None as None + if isinstance(collection_type, Unassigned) or collection_type == "" or collection_type is None: + collection_type = None + feature_definitions[fd.feature_name] = { + "FeatureType": fd.feature_type, + "CollectionType": collection_type, + } manager = IngestionManagerPandas( feature_group_name=feature_group_name, @@ -459,3 +473,262 @@ def ingest_dataframe( manager.run(data_frame=data_frame, wait=wait, timeout=timeout) return manager +@_telemetry_emitter(Feature.FEATURE_STORE, 
"get_feature_group_as_dataframe") +def get_feature_group_as_dataframe( + feature_group_name: str, + athena_bucket: str, + query: str = """SELECT * FROM "sagemaker_featurestore"."#{table}" + WHERE is_deleted=False """, + role: str = None, + region: str = None, + session=None, + event_time_feature_name: str = None, + latest_ingestion: bool = True, + verbose: bool = True, + **kwargs, +) -> DataFrame: + """:class:`sagemaker.feature_store.feature_group.FeatureGroup` as :class:`pandas.DataFrame` + + Examples: + >>> from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe + >>> + >>> region = "eu-west-1" + >>> fg_data = get_feature_group_as_dataframe(feature_group_name="feature_group", + >>> athena_bucket="s3://bucket/athena_queries", + >>> region=region, + >>> event_time_feature_name="EventTimeId" + >>> ) + >>> + >>> type(fg_data) + + + Description: + Method to run an athena query over a + :class:`sagemaker.feature_store.feature_group.FeatureGroup` in a Feature Store + to retrieve its data. It needs the :class:`sagemaker.session.Session` linked to a role + or the region and/or role used to work with Feature Stores (it uses the module + `sagemaker.feature_store.feature_utils.get_session_from_role` + to get the session). + + Args: + region (str): region of the target Feature Store + feature_group_name (str): feature store name + query (str): query to run. By default, it will take the latest ingest with data that + wasn't deleted. If latest_ingestion is False it will take all the data + in the feature group that wasn't deleted. It needs to use the keyword + "#{table}" to refer to the FeatureGroup name. e.g.: + 'SELECT * FROM "sagemaker_featurestore"."#{table}"' + It must not end by ';'. + athena_bucket (str): Amazon S3 bucket for running the query + role (str): role to be assumed to extract data from feature store. If not specified + the default sagemaker execution role will be used. 
+ session (Session): :class:`sagemaker.session.Session` + of SageMaker used to work with the feature store. Optional, with + role and region parameters it will infer the session. + event_time_feature_name (str): eventTimeId feature. Mandatory only if the + latest ingestion is True. + latest_ingestion (bool): if True it will get the data only from the latest ingestion. + If False it will take whatever is specified in the query, or + if none is specified, it will get all the data that wasn't deleted. + verbose (bool): if True show messages, if False is silent. + **kwargs (object): keyword arguments used for the method pandas.read_csv to be able to + have a better tuning on data. For more info read: + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html + Returns: + :class:`pandas.DataFrame`: dataset with the data retrieved from feature group + """ + + logger.setLevel(logging.WARNING) + if verbose: + logger.setLevel(logging.INFO) + + if latest_ingestion: + if event_time_feature_name is not None: + query += str( + f"AND {event_time_feature_name}=(SELECT " + f"MAX({event_time_feature_name}) FROM " + '"sagemaker_featurestore"."#{table}")' + ) + else: + exc = Exception( + "Argument event_time_feature_name must be specified " + "when using latest_ingestion=True." 
+ ) + logger.exception(exc) + raise exc + + query += ";" + + if session is not None: + sagemaker_session = session + elif region is not None: + sagemaker_session = get_session_from_role(region=region, assume_role=role) + else: + exc = Exception("Argument Session or role and region must be specified.") + logger.exception(exc) + raise exc + + msg = f"Feature Group used: {feature_group_name}" + logger.info(msg) + + fg = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session) + + sample_query = fg.athena_query() + query_string = re.sub(r"#\{(table)\}", sample_query.table_name, query) + + msg = f"Running query:\n\t{sample_query} \n\n\t-> Save on bucket {athena_bucket}\n" + logger.info(msg) + + sample_query.run(query_string=query_string, output_location=athena_bucket) + + sample_query.wait() + + # run Athena query. The output is loaded to a Pandas dataframe. + dataset = sample_query.as_dataframe(**kwargs) + + msg = f"Data shape retrieve from {feature_group_name}: {dataset.shape}" + logger.info(msg) + + return dataset + + +def prepare_fg_from_dataframe_or_file( + dataframe_or_path: Union[str, Path, pandas.DataFrame], + feature_group_name: str, + role: str = None, + region: str = None, + session=None, + record_id: str = "record_id", + event_id: str = "data_as_of_date", + verbose: bool = False, + **kwargs, +) -> FeatureGroup: + """Prepares a dataframe to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup` + + Description: + Function to prepare a :class:`pandas.DataFrame` read from a path to a csv file or pass it + directly to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup`. + The path to the file needs proper dtypes, feature names and mandatory features (record_id, + event_id). + It needs the :class:`sagemaker.session.Session` linked to a role + or the region and/or role used to work with Feature Stores (it uses the module + `sagemaker.feature_store.feature_utils.get_session_from_role` + to get the session). 
+ If record_id or event_id are not specified it will create them + by default with the names 'record_id' and 'data_as_of_date'. + + Args: + feature_group_name (str): feature group name + dataframe_or_path (str, Path, pandas.DataFrame) : pandas.DataFrame or path to the data + verbose (bool) : True for displaying messages, False for silent method. + record_id (str, 'record_id'): (Optional) Feature identifier of the rows. If specified each + value of that feature has to be unique. If not specified or + record_id='record_id', then it will create a new feature from + the index of the pandas.DataFrame. + event_id (str) : (Optional) Feature with the time of the creation of data rows. + If not specified it will create one with the current time + called `data_as_of_date` + role (str) : role used to get the session. + region (str) : region used to get the session. + session (Session): session of SageMaker used to work with the feature store + **kwargs (object): keyword arguments used for the method pandas.read_csv to be able to + have a better tuning on data. For more info read: + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html + + Returns: + :class:`sagemaker.feature_store.feature_group.FeatureGroup`: + FG prepared with all the methods and definitions properly defined + """ + + logger.setLevel(logging.WARNING) + if verbose: + logger.setLevel(logging.INFO) + + if isinstance(dataframe_or_path, DataFrame): + data = dataframe_or_path + elif isinstance(dataframe_or_path, str): + kwargs.pop("filepath_or_buffer", None) + data = read_csv(filepath_or_buffer=dataframe_or_path, **kwargs) + else: + exc = Exception( + str( + f"Invalid type {type(dataframe_or_path)} for " + "argument dataframe_or_path. 
\nParameter must be" + " of type pandas.DataFrame or string" + ) + ) + logger.exception(exc) + raise exc + + # Formatting cols + data = _format_column_names(data=data) + data = _cast_object_to_string(data_frame=data) + + if record_id == "record_id" and record_id not in data.columns: + data[record_id] = data.index + + lg_uniq = len(data[record_id].unique()) + lg_id = len(data[record_id]) + + if lg_id != lg_uniq: + exc = Exception( + str( + f"Record identifier {record_id} have {abs(lg_id - lg_uniq)} " + "duplicated rows. \nRecord identifier must be unique" + " in each row." + ) + ) + logger.exception(exc) + raise exc + + if event_id not in data.columns: + current_time_sec = int(time.time() + 0.5) + data[event_id] = Series([current_time_sec] * lg_id, dtype="float64") + + if session is not None: + sagemaker_session = session + elif role is not None and region is not None: + sagemaker_session = get_session_from_role(region=region) + else: + exc = Exception("Argument Session or role and region must be specified.") + logger.exception(exc) + raise exc + + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session) + + feature_group.load_feature_definitions(data_frame=data) + + return feature_group + + +def _format_column_names(data: pandas.DataFrame) -> pandas.DataFrame: + """Formats the column names for :class:`sagemaker.feature_store.feature_group.FeatureGroup` + + Description: + Module to format correctly the name of the columns of a DataFrame + to later generate the features names of a Feature Group + + Args: + data (:class:`pandas.DataFrame`): dataframe used + + Returns: + :class:`pandas.DataFrame` + """ + data.rename(columns=lambda x: x.replace(" ", "_").replace(".", "").lower()[:62], inplace=True) + return data + +def _cast_object_to_string(data_frame: pandas.DataFrame) -> pandas.DataFrame: + """Cast properly pandas object types to strings + + Description: + Method to convert 'object' and 'O' column dtypes of a pandas.DataFrame to + a 
valid string type recognized by Feature Groups. + + Args: + data_frame: dataframe used + Returns: + pandas.DataFrame + """ + for label in data_frame.select_dtypes(["object", "O"]).columns.tolist(): + data_frame[label] = data_frame[label].astype("str").astype("string") + return data_frame \ No newline at end of file diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/ingestion_manager_pandas.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/ingestion_manager_pandas.py index 4d7b4e5375..b2b7ae5085 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/ingestion_manager_pandas.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/ingestion_manager_pandas.py @@ -15,6 +15,8 @@ from sagemaker.core.resources import FeatureGroup as CoreFeatureGroup from sagemaker.core.shapes import FeatureValue +from sagemaker.core.utils.utils import Unassigned +from sagemaker.core.telemetry import Feature, _telemetry_emitter logger = logging.getLogger(__name__) @@ -66,6 +68,7 @@ def failed_rows(self) -> List[int]: """ return self._failed_indices + @_telemetry_emitter(Feature.FEATURE_STORE, "IngestionManagerPandas.run") def run( self, data_frame: DataFrame, @@ -82,7 +85,17 @@ def run( wait (bool): whether to wait for the ingestion to finish or not. timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised if timeout is reached. + + Raises: + ValueError: If wait=False with max_workers=1 and max_processes=1. """ + # Validate async ingestion requirements + if not wait and self.max_workers == 1 and self.max_processes == 1: + raise ValueError( + "Async ingestion (wait=False) requires max_processes > 1 or max_workers > 1. " + "Single-threaded ingestion only supports synchronous mode (wait=True)." 
+ ) + if self.max_workers == 1 and self.max_processes == 1: self._run_single_process_single_thread(data_frame=data_frame, target_stores=target_stores) else: @@ -293,7 +306,10 @@ def _is_feature_collection_type( """Check if the feature is a collection type.""" feature_def = feature_definitions.get(feature_name) if feature_def: - return feature_def.get("CollectionType") is not None + collection_type = feature_def.get("CollectionType") + if isinstance(collection_type, Unassigned) or collection_type is None or collection_type == "": + return False + return True return False @staticmethod diff --git a/sagemaker-mlops/tests/integ/test_feature_store.py b/sagemaker-mlops/tests/integ/test_feature_store.py new file mode 100644 index 0000000000..0dadd579dd --- /dev/null +++ b/sagemaker-mlops/tests/integ/test_feature_store.py @@ -0,0 +1,408 @@ +"""Integration tests for sagemaker.mlops.feature_store.""" +import time +import pytest +import pandas as pd +import boto3 + +from sagemaker.core.helper.session_helper import Session, get_execution_role +from sagemaker.mlops.feature_store import ( + FeatureGroup, + OfflineStoreConfig, + OnlineStoreConfig, + S3StorageConfig, +) +from sagemaker.mlops.feature_store.feature_utils import ( + load_feature_definitions_from_dataframe, + ingest_dataframe, + create_athena_query, +) +from sagemaker.mlops.feature_store.dataset_builder import DatasetBuilder +from sagemaker.core.utils import unique_name_from_base +from sagemaker.core.resources import FeatureGroup as CoreFeatureGroup + + +@pytest.fixture(scope="module") +def sagemaker_session(): + return Session() + + +@pytest.fixture(scope="module") +def role(): + return get_execution_role() + + +@pytest.fixture(scope="module") +def region(): + return boto3.Session().region_name + + +@pytest.fixture(scope="module") +def bucket(sagemaker_session): + return sagemaker_session.default_bucket() + + +@pytest.fixture +def feature_group_name(): + return unique_name_from_base("integ-test-fg") + + 
+@pytest.fixture +def sample_dataframe(): + """Create sample DataFrame for testing.""" + current_time = int(time.time()) + return pd.DataFrame({ + "record_id": [f"id-{i}" for i in range(10)], + "feature_1": [i * 1.5 for i in range(10)], + "feature_2": [i * 2 for i in range(10)], + "event_time": [float(current_time + i) for i in range(10)], + }) + + +def cleanup_feature_group(feature_group_name): + """Helper to cleanup feature group.""" + try: + fg = FeatureGroup.get(feature_group_name=feature_group_name) + fg.delete() + time.sleep(2) + except Exception: + pass + + +# Test 1: Create FeatureGroup with both online and offline stores +def test_create_feature_group_with_both_stores( + feature_group_name, sample_dataframe, bucket, role, region +): + """Test creating a FeatureGroup with both online and offline stores.""" + try: + feature_definitions = load_feature_definitions_from_dataframe(sample_dataframe) + + fg = FeatureGroup.create( + feature_group_name=feature_group_name, + record_identifier_feature_name="record_id", + event_time_feature_name="event_time", + feature_definitions=feature_definitions, + role_arn=role, + online_store_config=OnlineStoreConfig(enable_online_store=True), + offline_store_config=OfflineStoreConfig( + s3_storage_config=S3StorageConfig(s3_uri=f"s3://{bucket}/feature-store"), + ), + ) + + assert fg.feature_group_name == feature_group_name + assert fg.online_store_config is not None + assert fg.offline_store_config is not None + + time.sleep(5) + + retrieved_fg = FeatureGroup.get(feature_group_name=feature_group_name) + assert retrieved_fg.feature_group_name == feature_group_name + + finally: + cleanup_feature_group(feature_group_name) + + +# Test 2: Ingest DataFrame and retrieve from online store +def test_ingest_and_retrieve_from_online_store( + feature_group_name, sample_dataframe, bucket, role +): + """Test ingesting data and retrieving from online store.""" + try: + feature_definitions = 
load_feature_definitions_from_dataframe(sample_dataframe) + + fg = FeatureGroup.create( + feature_group_name=feature_group_name, + record_identifier_feature_name="record_id", + event_time_feature_name="event_time", + feature_definitions=feature_definitions, + role_arn=role, + online_store_config=OnlineStoreConfig(enable_online_store=True), + ) + + # Wait for FeatureGroup to become active + fg.wait_for_status("Created") + + ingest_dataframe( + feature_group_name=feature_group_name, + data_frame=sample_dataframe, + max_workers=1, + max_processes=1, + ) + + time.sleep(15) + + record = fg.get_record(record_identifier_value_as_string="id-0") + assert record is not None + assert len(record.record) > 0 + + finally: + cleanup_feature_group(feature_group_name) + + +# Test 3: Delete FeatureGroup +def test_delete_feature_group(feature_group_name, sample_dataframe, bucket, role): + """Test deleting a FeatureGroup.""" + feature_definitions = load_feature_definitions_from_dataframe(sample_dataframe) + + fg = FeatureGroup.create( + feature_group_name=feature_group_name, + record_identifier_feature_name="record_id", + event_time_feature_name="event_time", + feature_definitions=feature_definitions, + role_arn=role, + online_store_config=OnlineStoreConfig(enable_online_store=True), + ) + + fg.wait_for_status("Created") + + fg.delete() + time.sleep(2) + + with pytest.raises(Exception): + FeatureGroup.get(feature_group_name=feature_group_name) + + +# Test 7: Ingest to both OnlineStore and OfflineStore +def test_ingest_to_both_stores(feature_group_name, sample_dataframe, bucket, role): + """Test ingesting data to both online and offline stores.""" + try: + feature_definitions = load_feature_definitions_from_dataframe(sample_dataframe) + + fg = FeatureGroup.create( + feature_group_name=feature_group_name, + record_identifier_feature_name="record_id", + event_time_feature_name="event_time", + feature_definitions=feature_definitions, + role_arn=role, + 
online_store_config=OnlineStoreConfig(enable_online_store=True), + offline_store_config=OfflineStoreConfig( + s3_storage_config=S3StorageConfig(s3_uri=f"s3://{bucket}/feature-store"), + ), + ) + + # Wait for FeatureGroup to become active + fg.wait_for_status("Created") + + ingest_dataframe( + feature_group_name=feature_group_name, + data_frame=sample_dataframe, + max_workers=1, + max_processes=1, + ) + + time.sleep(15) + + record = fg.get_record(record_identifier_value_as_string="id-0") + assert record is not None + + finally: + cleanup_feature_group(feature_group_name) + + +# Test 8: Query offline store with Athena and return DataFrame +def test_query_offline_store_with_athena( + feature_group_name, sample_dataframe, bucket, role, sagemaker_session +): + """Test querying offline store with Athena.""" + try: + feature_definitions = load_feature_definitions_from_dataframe(sample_dataframe) + + fg = FeatureGroup.create( + feature_group_name=feature_group_name, + record_identifier_feature_name="record_id", + event_time_feature_name="event_time", + feature_definitions=feature_definitions, + role_arn=role, + offline_store_config=OfflineStoreConfig( + s3_storage_config=S3StorageConfig(s3_uri=f"s3://{bucket}/feature-store"), + ), + ) + + fg.wait_for_status("Created") + + ingest_dataframe( + feature_group_name=feature_group_name, + data_frame=sample_dataframe, + max_workers=1, + max_processes=1, + ) + + time.sleep(300) + + # Note: Offline store sync can take 15+ minutes, test may return empty results + athena_query = create_athena_query(feature_group_name, sagemaker_session) + query_string = f'SELECT * FROM "{athena_query.database}"."{athena_query.table_name}" LIMIT 10' + output_location = f"s3://{bucket}/athena-results/" + + query_id = athena_query.run(query_string, output_location) + assert query_id is not None + + athena_query.wait() + df = athena_query.as_dataframe() + + assert df is not None + + finally: + cleanup_feature_group(feature_group_name) + + +# Test 9: Query 
with WHERE conditions and aggregations +def test_query_with_conditions_and_aggregations( + feature_group_name, sample_dataframe, bucket, role, sagemaker_session +): + """Test Athena queries with WHERE and aggregations.""" + try: + feature_definitions = load_feature_definitions_from_dataframe(sample_dataframe) + + fg = FeatureGroup.create( + feature_group_name=feature_group_name, + record_identifier_feature_name="record_id", + event_time_feature_name="event_time", + feature_definitions=feature_definitions, + role_arn=role, + offline_store_config=OfflineStoreConfig( + s3_storage_config=S3StorageConfig(s3_uri=f"s3://{bucket}/feature-store"), + ), + ) + + fg.wait_for_status("Created") + + ingest_dataframe( + feature_group_name=feature_group_name, + data_frame=sample_dataframe, + max_workers=1, + max_processes=1, + ) + + time.sleep(300) + + athena_query = create_athena_query(feature_group_name, sagemaker_session) + query_string = f""" + SELECT COUNT(*) as count, AVG(feature_1) as avg_feature + FROM "{athena_query.database}"."{athena_query.table_name}" + WHERE feature_2 > 5 + """ + output_location = f"s3://{bucket}/athena-results/" + + athena_query.run(query_string, output_location) + athena_query.wait() + df = athena_query.as_dataframe() + + assert df is not None + + finally: + cleanup_feature_group(feature_group_name) + + + +# Test 11: Create dataset from single FeatureGroup +def test_create_dataset_from_single_feature_group( + feature_group_name, sample_dataframe, bucket, role, sagemaker_session +): + """Test creating a dataset from a single FeatureGroup.""" + try: + feature_definitions = load_feature_definitions_from_dataframe(sample_dataframe) + + fg = FeatureGroup.create( + feature_group_name=feature_group_name, + record_identifier_feature_name="record_id", + event_time_feature_name="event_time", + feature_definitions=feature_definitions, + role_arn=role, + offline_store_config=OfflineStoreConfig( + 
s3_storage_config=S3StorageConfig(s3_uri=f"s3://{bucket}/feature-store"), + ), + ) + + fg.wait_for_status("Created") + + ingest_dataframe( + feature_group_name=feature_group_name, + data_frame=sample_dataframe, + max_workers=1, + max_processes=1, + ) + + time.sleep(300) + + output_path = f"s3://{bucket}/dataset-output/" + builder = DatasetBuilder.create( + base=fg, + output_path=output_path, + session=sagemaker_session, + ) + + df, query = builder.to_dataframe() + + assert df is not None + assert query is not None + assert "SELECT" in query + + finally: + cleanup_feature_group(feature_group_name) + + +# Test 15: Export dataset with deleted/duplicated records handling +def test_export_dataset_with_record_handling( + feature_group_name, sample_dataframe, bucket, role, sagemaker_session +): + """Test exporting dataset with options for deleted and duplicated records.""" + try: + feature_definitions = load_feature_definitions_from_dataframe(sample_dataframe) + + fg = FeatureGroup.create( + feature_group_name=feature_group_name, + record_identifier_feature_name="record_id", + event_time_feature_name="event_time", + feature_definitions=feature_definitions, + role_arn=role, + offline_store_config=OfflineStoreConfig( + s3_storage_config=S3StorageConfig(s3_uri=f"s3://{bucket}/feature-store"), + ), + ) + + fg.wait_for_status("Created") + + ingest_dataframe( + feature_group_name=feature_group_name, + data_frame=sample_dataframe, + max_workers=1, + max_processes=1, + ) + + updated_df = sample_dataframe.copy() + updated_df["feature_1"] = updated_df["feature_1"] * 2 + updated_df["event_time"] = updated_df["event_time"] + 100 + + ingest_dataframe( + feature_group_name=feature_group_name, + data_frame=updated_df, + max_workers=1, + max_processes=1, + ) + + time.sleep(300) + + output_path = f"s3://{bucket}/dataset-output/" + + builder = DatasetBuilder.create( + base=fg, + output_path=output_path, + session=sagemaker_session, + ) + builder.include_duplicated_records() + + 
df_with_dups, _ = builder.to_dataframe() + assert df_with_dups is not None + + builder2 = DatasetBuilder.create( + base=fg, + output_path=output_path, + session=sagemaker_session, + ) + builder2.with_number_of_recent_records_by_record_identifier(1) + + df_recent, _ = builder2.to_dataframe() + assert df_recent is not None + + finally: + cleanup_feature_group(feature_group_name) diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_athena_query.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_athena_query.py index 2fed784208..5085ef9613 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_athena_query.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_athena_query.py @@ -15,6 +15,7 @@ def mock_session(self): session = Mock() session.boto_session.client.return_value = Mock() session.boto_region_name = "us-west-2" + session.sagemaker_config = {} return session @pytest.fixture diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_dataset_builder.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_dataset_builder.py index 254fb0e196..4cd2e47e08 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_dataset_builder.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_dataset_builder.py @@ -332,7 +332,9 @@ def test_create_with_dataframe_requires_identifiers(self, mock_session): class TestDatasetBuilderValidation: @pytest.fixture def mock_session(self): - return Mock() + session = Mock() + session.sagemaker_config = {} + return session def test_to_csv_raises_for_invalid_base(self, mock_session): builder = DatasetBuilder( diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_feature_utils.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_feature_utils.py index a9d5408bf6..91098247a5 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_feature_utils.py +++ 
b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_feature_utils.py @@ -200,3 +200,519 @@ def test_assumes_role_when_provided(self, mock_session_class, mock_boto3): mock_sts.assume_role.assert_called_once() assert mock_boto3.Session.call_count == 2 # Initial + after assume + + +class TestGetFeatureGroupAsDataframe: + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + @patch("sagemaker.mlops.feature_store.feature_utils.get_session_from_role") + def test_with_session_provided(self, mock_get_session, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_athena_query = MagicMock() + mock_athena_query.table_name = "my_table" + mock_athena_query.as_dataframe.return_value = pd.DataFrame({"id": [1, 2]}) + mock_fg.athena_query.return_value = mock_athena_query + mock_fg_class.return_value = mock_fg + + from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe + + result = get_feature_group_as_dataframe( + feature_group_name="test-fg", + athena_bucket="s3://bucket/path", + session=mock_session, + latest_ingestion=False, + verbose=False, + ) + + mock_fg_class.assert_called_once_with(name="test-fg", sagemaker_session=mock_session) + mock_get_session.assert_not_called() + mock_athena_query.run.assert_called_once() + mock_athena_query.wait.assert_called_once() + assert len(result) == 2 + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + @patch("sagemaker.mlops.feature_store.feature_utils.get_session_from_role") + def test_with_region_provided(self, mock_get_session, mock_fg_class): + mock_session = MagicMock() + mock_get_session.return_value = mock_session + mock_fg = MagicMock() + mock_athena_query = MagicMock() + mock_athena_query.table_name = "my_table" + mock_athena_query.as_dataframe.return_value = pd.DataFrame({"id": [1]}) + mock_fg.athena_query.return_value = mock_athena_query + mock_fg_class.return_value = mock_fg + + from sagemaker.mlops.feature_store.feature_utils 
import get_feature_group_as_dataframe + + result = get_feature_group_as_dataframe( + feature_group_name="test-fg", + athena_bucket="s3://bucket/path", + region="us-east-1", + latest_ingestion=False, + ) + + mock_get_session.assert_called_once_with(region="us-east-1", assume_role=None) + assert len(result) == 1 + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + @patch("sagemaker.mlops.feature_store.feature_utils.get_session_from_role") + def test_with_region_and_role(self, mock_get_session, mock_fg_class): + mock_session = MagicMock() + mock_get_session.return_value = mock_session + mock_fg = MagicMock() + mock_athena_query = MagicMock() + mock_athena_query.table_name = "my_table" + mock_athena_query.as_dataframe.return_value = pd.DataFrame({"id": [1]}) + mock_fg.athena_query.return_value = mock_athena_query + mock_fg_class.return_value = mock_fg + + from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe + + result = get_feature_group_as_dataframe( + feature_group_name="test-fg", + athena_bucket="s3://bucket/path", + region="us-east-1", + role="arn:aws:iam::123:role/MyRole", + latest_ingestion=False, + ) + + mock_get_session.assert_called_once_with(region="us-east-1", assume_role="arn:aws:iam::123:role/MyRole") + + def test_raises_when_no_session_or_region(self): + from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe + + with pytest.raises(Exception, match="Session or role and region must be specified"): + get_feature_group_as_dataframe( + feature_group_name="test-fg", + athena_bucket="s3://bucket/path", + latest_ingestion=False, + ) + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_raises_when_latest_ingestion_without_event_time(self, mock_fg_class): + mock_session = MagicMock() + + from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe + + with pytest.raises(Exception, match="event_time_feature_name must be 
specified"): + get_feature_group_as_dataframe( + feature_group_name="test-fg", + athena_bucket="s3://bucket/path", + session=mock_session, + latest_ingestion=True, + event_time_feature_name=None, + ) + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_with_latest_ingestion_and_event_time(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_athena_query = MagicMock() + mock_athena_query.table_name = "my_table" + mock_athena_query.as_dataframe.return_value = pd.DataFrame({"id": [1, 2], "event_time": [123, 123]}) + mock_fg.athena_query.return_value = mock_athena_query + mock_fg_class.return_value = mock_fg + + from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe + + result = get_feature_group_as_dataframe( + feature_group_name="test-fg", + athena_bucket="s3://bucket/path", + session=mock_session, + latest_ingestion=True, + event_time_feature_name="event_time", + verbose=False, + ) + + call_args = mock_athena_query.run.call_args + query_string = call_args[1]["query_string"] + assert "event_time=(SELECT MAX(event_time)" in query_string + assert len(result) == 2 + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_custom_query_with_table_placeholder(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_athena_query = MagicMock() + mock_athena_query.table_name = "actual_table_name" + mock_athena_query.as_dataframe.return_value = pd.DataFrame({"id": [1]}) + mock_fg.athena_query.return_value = mock_athena_query + mock_fg_class.return_value = mock_fg + + from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe + + result = get_feature_group_as_dataframe( + feature_group_name="test-fg", + athena_bucket="s3://bucket/path", + session=mock_session, + query='SELECT * FROM "sagemaker_featurestore"."#{table}" WHERE id > 0 ', + latest_ingestion=False, + ) + + call_args = mock_athena_query.run.call_args + 
query_string = call_args[1]["query_string"] + assert "actual_table_name" in query_string + assert "#{table}" not in query_string + assert query_string.endswith(";") + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_verbose_logging(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_athena_query = MagicMock() + mock_athena_query.table_name = "my_table" + mock_athena_query.as_dataframe.return_value = pd.DataFrame({"id": [1]}) + mock_fg.athena_query.return_value = mock_athena_query + mock_fg_class.return_value = mock_fg + + from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe + import logging + + with patch("sagemaker.mlops.feature_store.feature_utils.logger") as mock_logger: + get_feature_group_as_dataframe( + feature_group_name="test-fg", + athena_bucket="s3://bucket/path", + session=mock_session, + latest_ingestion=False, + verbose=True, + ) + + mock_logger.setLevel.assert_called_with(logging.INFO) + assert mock_logger.info.call_count >= 1 + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_silent_mode(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_athena_query = MagicMock() + mock_athena_query.table_name = "my_table" + mock_athena_query.as_dataframe.return_value = pd.DataFrame({"id": [1]}) + mock_fg.athena_query.return_value = mock_athena_query + mock_fg_class.return_value = mock_fg + + from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe + import logging + + with patch("sagemaker.mlops.feature_store.feature_utils.logger") as mock_logger: + get_feature_group_as_dataframe( + feature_group_name="test-fg", + athena_bucket="s3://bucket/path", + session=mock_session, + latest_ingestion=False, + verbose=False, + ) + + mock_logger.setLevel.assert_called_with(logging.WARNING) + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def 
test_passes_kwargs_to_as_dataframe(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_athena_query = MagicMock() + mock_athena_query.table_name = "my_table" + mock_athena_query.as_dataframe.return_value = pd.DataFrame({"id": [1]}) + mock_fg.athena_query.return_value = mock_athena_query + mock_fg_class.return_value = mock_fg + + from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe + + get_feature_group_as_dataframe( + feature_group_name="test-fg", + athena_bucket="s3://bucket/path", + session=mock_session, + latest_ingestion=False, + dtype={"id": "int32"}, + na_values=["NA"], + ) + + mock_athena_query.as_dataframe.assert_called_once_with(dtype={"id": "int32"}, na_values=["NA"]) + + +class TestPrepareFgFromDataframeOrFile: + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_with_dataframe_and_session(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + + df = pd.DataFrame({ + "id": [1, 2, 3], + "value": [1.1, 2.2, 3.3], + }) + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + result = prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + session=mock_session, + verbose=False, + ) + + mock_fg_class.assert_called_once() + assert result == mock_fg + assert "record_id" in df.columns + assert "data_as_of_date" in df.columns + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + @patch("sagemaker.mlops.feature_store.feature_utils.read_csv") + def test_with_file_path(self, mock_read_csv, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + + df = pd.DataFrame({"id": [1, 2], "value": [1.1, 2.2]}) + mock_read_csv.return_value = df + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + result = 
prepare_fg_from_dataframe_or_file( + dataframe_or_path="/path/to/file.csv", + feature_group_name="test-fg", + session=mock_session, + ) + + mock_read_csv.assert_called_once() + assert result == mock_fg + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + @patch("sagemaker.mlops.feature_store.feature_utils.get_session_from_role") + def test_with_region_and_role(self, mock_get_session, mock_fg_class): + mock_session = MagicMock() + mock_get_session.return_value = mock_session + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + + df = pd.DataFrame({"id": [1, 2]}) + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + region="us-east-1", + role="arn:aws:iam::123:role/MyRole", + ) + + mock_get_session.assert_called_once_with(region="us-east-1") + + def test_raises_on_invalid_type(self): + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + with pytest.raises(Exception, match="Invalid type"): + prepare_fg_from_dataframe_or_file( + dataframe_or_path=123, + feature_group_name="test-fg", + session=MagicMock(), + ) + + def test_raises_when_no_session_or_region(self): + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + df = pd.DataFrame({"id": [1, 2]}) + + with pytest.raises(Exception, match="Session or role and region must be specified"): + prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + ) + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_creates_record_id_from_index(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + + df = pd.DataFrame({"value": [1.1, 2.2, 3.3]}) + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + 
prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + session=mock_session, + ) + + assert "record_id" in df.columns + assert list(df["record_id"]) == [0, 1, 2] + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_uses_existing_record_id(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + + df = pd.DataFrame({"my_id": [10, 20, 30], "value": [1.1, 2.2, 3.3]}) + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + session=mock_session, + record_id="my_id", + ) + + assert "my_id" in df.columns + assert "record_id" not in df.columns + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_raises_on_duplicate_record_ids(self, mock_fg_class): + mock_session = MagicMock() + + df = pd.DataFrame({"my_id": [1, 1, 2], "value": [1.1, 2.2, 3.3]}) + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + with pytest.raises(Exception, match="duplicated rows"): + prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + session=mock_session, + record_id="my_id", + ) + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + @patch("sagemaker.mlops.feature_store.feature_utils.time") + def test_creates_event_id_with_timestamp(self, mock_time, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + mock_time.time.return_value = 1234567890.5 + + df = pd.DataFrame({"id": [1, 2]}) + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + session=mock_session, + ) + + assert "data_as_of_date" in df.columns + assert 
all(df["data_as_of_date"] == 1234567891.0) + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_uses_existing_event_id(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + + df = pd.DataFrame({"id": [1, 2], "timestamp": [100, 200]}) + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + session=mock_session, + event_id="timestamp", + ) + + assert "timestamp" in df.columns + assert "data_as_of_date" not in df.columns + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_formats_column_names(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + + df = pd.DataFrame({"My Column": [1, 2], "Value.Test": [1.1, 2.2]}) + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + session=mock_session, + ) + + assert "my_column" in df.columns + assert "valuetest" in df.columns + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_verbose_logging(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + + df = pd.DataFrame({"id": [1, 2]}) + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + import logging + + with patch("sagemaker.mlops.feature_store.feature_utils.logger") as mock_logger: + prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + session=mock_session, + verbose=True, + ) + + mock_logger.setLevel.assert_called_with(logging.INFO) + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_silent_mode(self, mock_fg_class): + mock_session = 
MagicMock() + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + + df = pd.DataFrame({"id": [1, 2]}) + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + import logging + + with patch("sagemaker.mlops.feature_store.feature_utils.logger") as mock_logger: + prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + session=mock_session, + verbose=False, + ) + + mock_logger.setLevel.assert_called_with(logging.WARNING) + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + @patch("sagemaker.mlops.feature_store.feature_utils.read_csv") + def test_passes_kwargs_to_read_csv(self, mock_read_csv, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + + df = pd.DataFrame({"id": [1, 2]}) + mock_read_csv.return_value = df + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + prepare_fg_from_dataframe_or_file( + dataframe_or_path="/path/to/file.csv", + feature_group_name="test-fg", + session=mock_session, + sep=";", + encoding="utf-8", + ) + + mock_read_csv.assert_called_once() + call_kwargs = mock_read_csv.call_args[1] + assert call_kwargs["sep"] == ";" + assert call_kwargs["encoding"] == "utf-8" + + @patch("sagemaker.mlops.feature_store.feature_utils.FeatureGroup") + def test_calls_load_feature_definitions(self, mock_fg_class): + mock_session = MagicMock() + mock_fg = MagicMock() + mock_fg_class.return_value = mock_fg + + df = pd.DataFrame({"id": [1, 2]}) + + from sagemaker.mlops.feature_store.feature_utils import prepare_fg_from_dataframe_or_file + + prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + feature_group_name="test-fg", + session=mock_session, + ) + + mock_fg.load_feature_definitions.assert_called_once() diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_ingestion_manager_pandas.py 
b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_ingestion_manager_pandas.py index 2ecf495967..46cdef158e 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_ingestion_manager_pandas.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_ingestion_manager_pandas.py @@ -254,3 +254,70 @@ def test_ingest_row_skips_none_values(self, feature_definitions): # Only id should be in record, name is None assert len(record) == 1 assert record[0].feature_name == "id" + + +class TestAsyncIngestionValidation: + """Test async ingestion validation with max_processes=1. + + Bug fix: Error message unclear when trying to use async ingestion with 1 process. + """ + + def test_async_with_single_process_single_worker_raises_clear_error(self): + """Test that wait=False with max_processes=1 and max_workers=1 raises clear error.""" + manager = IngestionManagerPandas( + feature_group_name="test-fg", + feature_definitions={"id": {"FeatureType": "String", "CollectionType": None}}, + max_workers=1, + max_processes=1, + ) + + df = pd.DataFrame({"id": ["1", "2", "3"]}) + + with pytest.raises(ValueError) as exc_info: + manager.run(data_frame=df, wait=False) + + error_message = str(exc_info.value) + assert "Async ingestion (wait=False)" in error_message + assert "max_processes > 1 or max_workers > 1" in error_message + assert "wait=True" in error_message + + @patch("sagemaker.mlops.feature_store.ingestion_manager_pandas.CoreFeatureGroup") + def test_sync_with_single_process_single_worker_works(self, mock_fg_class): + """Test that wait=True with max_processes=1 and max_workers=1 works.""" + mock_fg = Mock() + mock_fg_class.return_value = mock_fg + + manager = IngestionManagerPandas( + feature_group_name="test-fg", + feature_definitions={"id": {"FeatureType": "String", "CollectionType": None}}, + max_workers=1, + max_processes=1, + ) + + df = pd.DataFrame({"id": ["1", "2", "3"]}) + + # Should not raise validation error + manager.run(data_frame=df, 
wait=True) + + @pytest.mark.parametrize("max_workers,max_processes", [ + (2, 1), # Multiple workers, single process + (1, 2), # Single worker, multiple processes + (2, 2), # Multiple workers and processes + ]) + @patch.object(IngestionManagerPandas, '_run_multi_process') + def test_async_with_parallelism_no_validation_error(self, mock_run, max_workers, max_processes): + """Test that wait=False works with any parallelism configuration where max_workers > 1 OR max_processes > 1.""" + manager = IngestionManagerPandas( + feature_group_name="test-fg", + feature_definitions={"id": {"FeatureType": "String", "CollectionType": None}}, + max_workers=max_workers, + max_processes=max_processes, + ) + + df = pd.DataFrame({"id": ["1", "2", "3"]}) + + # Should not raise validation error + manager.run(data_frame=df, wait=False) + + # Verify it called the multi-process method (positive assertion) + mock_run.assert_called_once()