Skip to content

Commit 8cae80d

Browse files
committed
Changes:
- Add feature_store_functions_report.md documenting all 63 functions across feature_store module (excluding feature_processor) - Add comprehensive unit tests for get_feature_group_as_dataframe: * Session handling (provided, from region, from role) * Error cases (missing session/region, missing event_time) * Latest ingestion logic with event time * Query string manipulation and table placeholder * Verbose and silent logging modes * Kwargs passing to as_dataframe - Add comprehensive unit tests for prepare_fg_from_dataframe_or_file: * DataFrame and file path input handling * Session/region/role configuration * Record ID creation and validation * Event ID creation with timestamp * Duplicate record detection * Column name formatting * CSV kwargs passing * Feature definition loading
1 parent 41c2541 commit 8cae80d

2 files changed

Lines changed: 780 additions & 2 deletions

File tree

sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_utils.py

Lines changed: 264 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
import logging
55
import os
66
import time
7-
from typing import Any, Dict, Sequence, Union
7+
from pathlib import Path
8+
from typing import Any, Dict, Sequence, Union, re
89

910
import boto3
11+
import pandas
1012
import pandas as pd
11-
from pandas import DataFrame, Series
13+
from pandas import DataFrame, Series, read_csv
1214

1315
from sagemaker.mlops.feature_store import FeatureGroup as CoreFeatureGroup, FeatureGroup
1416
from sagemaker.core.helper.session_helper import Session
@@ -459,3 +461,263 @@ def ingest_dataframe(
459461
manager.run(data_frame=data_frame, wait=wait, timeout=timeout)
460462
return manager
461463

464+
def get_feature_group_as_dataframe(
    feature_group_name: str,
    athena_bucket: str,
    query: str = """SELECT * FROM "sagemaker_featurestore"."#{table}"
                 WHERE is_deleted=False """,
    role: str = None,
    region: str = None,
    session=None,
    event_time_feature_name: str = None,
    latest_ingestion: bool = True,
    verbose: bool = True,
    **kwargs,
) -> DataFrame:
    """:class:`sagemaker.feature_store.feature_group.FeatureGroup` as :class:`pandas.DataFrame`

    Examples:
        >>> from sagemaker.mlops.feature_store.feature_utils import get_feature_group_as_dataframe
        >>>
        >>> region = "eu-west-1"
        >>> fg_data = get_feature_group_as_dataframe(feature_group_name="feature_group",
        >>>                                          athena_bucket="s3://bucket/athena_queries",
        >>>                                          region=region,
        >>>                                          event_time_feature_name="EventTimeId"
        >>>                                          )
        >>>
        >>> type(fg_data)
        <class 'pandas.core.frame.DataFrame'>

    Description:
        Method to run an athena query over a
        :class:`sagemaker.feature_store.feature_group.FeatureGroup` in a Feature Store
        to retrieve its data. It needs the :class:`sagemaker.session.Session` linked to a role
        or the region and/or role used to work with Feature Stores (it uses the module
        `sagemaker.feature_store.feature_utils.get_session_from_role`
        to get the session).

    Args:
        region (str): region of the target Feature Store
        feature_group_name (str): feature store name
        query (str): query to run. By default, it will take the latest ingest with data that
                     wasn't deleted. If latest_ingestion is False it will take all the data
                     in the feature group that wasn't deleted. It needs to use the keyword
                     "#{table}" to refer to the FeatureGroup name. e.g.:
                     'SELECT * FROM "sagemaker_featurestore"."#{table}"'
                     It must not end by ';'.
        athena_bucket (str): Amazon S3 bucket for running the query
        role (str): role to be assumed to extract data from feature store. If not specified
                    the default sagemaker execution role will be used.
        session (str): :class:`sagemaker.session.Session`
                       of SageMaker used to work with the feature store. Optional, with
                       role and region parameters it will infer the session.
        event_time_feature_name (str): eventTimeId feature. Mandatory only if the
                                       latest ingestion is True.
        latest_ingestion (bool): if True it will get the data only from the latest ingestion.
                                 If False it will take whatever is specified in the query, or
                                 if not specified, it will get all the data that wasn't deleted.
        verbose (bool): if True show messages, if False is silent.
        **kwargs (object): key arguments used for the method pandas.read_csv to be able to
                           have a better tuning on data. For more info read:
                           https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
    Returns:
        :class:`pandas.DataFrame`: dataset with the data retrieved from feature group

    Raises:
        Exception: if latest_ingestion is True but event_time_feature_name is missing,
                   or if neither session nor region is provided.
    """
    # Local import: the module-level ``from typing import ..., re`` is broken
    # (``typing.re`` was removed in Python 3.12); import the real module here.
    import re

    logger.setLevel(logging.WARNING)
    if verbose:
        logger.setLevel(logging.INFO)

    if latest_ingestion:
        if event_time_feature_name is not None:
            # Restrict the query to the rows carrying the max event time,
            # i.e. the latest ingestion. Relies on the default query ending
            # with a trailing space before "AND" is appended.
            query += str(
                f"AND {event_time_feature_name}=(SELECT "
                f"MAX({event_time_feature_name}) FROM "
                '"sagemaker_featurestore"."#{table}")'
            )
        else:
            exc = Exception(
                "Argument event_time_feature_name must be specified "
                "when using latest_ingestion=True."
            )
            logger.exception(exc)
            raise exc

    query += ";"

    # Resolve the SageMaker session: an explicit session wins, otherwise
    # build one from region (and optionally an assumed role).
    if session is not None:
        sagemaker_session = session
    elif region is not None:
        sagemaker_session = get_session_from_role(region=region, assume_role=role)
    else:
        exc = Exception("Argument Session or role and region must be specified.")
        logger.exception(exc)
        raise exc

    msg = f"Feature Group used: {feature_group_name}"
    logger.info(msg)

    fg = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)

    sample_query = fg.athena_query()
    # Substitute the "#{table}" placeholder with the real Athena table name.
    query_string = re.sub(r"#\{(table)\}", sample_query.table_name, query)

    # Bug fix: log the resolved query string, not the AthenaQuery object.
    msg = f"Running query:\n\t{query_string} \n\n\t-> Save on bucket {athena_bucket}\n"
    logger.info(msg)

    sample_query.run(query_string=query_string, output_location=athena_bucket)

    sample_query.wait()

    # run Athena query. The output is loaded to a Pandas dataframe.
    dataset = sample_query.as_dataframe(**kwargs)

    msg = f"Data shape retrieve from {feature_group_name}: {dataset.shape}"
    logger.info(msg)

    return dataset
580+
581+
582+
def prepare_fg_from_dataframe_or_file(
    dataframe_or_path: Union[str, Path, pandas.DataFrame],
    feature_group_name: str,
    role: str = None,
    region: str = None,
    session=None,
    record_id: str = "record_id",
    event_id: str = "data_as_of_date",
    verbose: bool = False,
    **kwargs,
) -> FeatureGroup:
    """Prepares a dataframe to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup`

    Description:
        Function to prepare a :class:`pandas.DataFrame` read from a path to a csv file or pass it
        directly to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup`.
        The path to the file needs proper dtypes, feature names and mandatory features (record_id,
        event_id).
        It needs the :class:`sagemaker.session.Session` linked to a role
        or the region and/or role used to work with Feature Stores (it uses the module
        `sagemaker.feature_store.feature_utils.get_session_from_role`
        to get the session).
        If record_id or event_id are not specified it will create ones
        by default with the names 'record_id' and 'data_as_of_date'.

    Args:
        feature_group_name (str): feature group name
        dataframe_or_path (str, Path, pandas.DataFrame) : pandas.DataFrame or path to the data
        verbose (bool) : True for displaying messages, False for silent method.
        record_id (str, 'record_id'): (Optional) Feature identifier of the rows. If specified each
                                      value of that feature has to be unique. If not specified or
                                      record_id='record_id', then it will create a new feature from
                                      the index of the pandas.DataFrame.
        event_id (str) : (Optional) Feature with the time of the creation of data rows.
                         If not specified it will create one with the current time
                         called `data_as_of_date`
        role (str) : role used to get the session.
        region (str) : region used to get the session.
        session (str): session of SageMaker used to work with the feature store
        **kwargs (object): key arguments used for the method pandas.read_csv to be able to
                           have a better tuning on data. For more info read:
                           https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html

    Returns:
        :class:`sagemaker.feature_store.feature_group.FeatureGroup`:
            FG prepared with all the methods and definitions properly defined

    Raises:
        Exception: if dataframe_or_path has an unsupported type, if the record_id
                   column holds duplicated values, or if neither session nor region
                   is provided.
    """

    logger.setLevel(logging.WARNING)
    if verbose:
        logger.setLevel(logging.INFO)

    if isinstance(dataframe_or_path, DataFrame):
        data = dataframe_or_path
    # Bug fix: also accept pathlib.Path, as the type hint advertises.
    elif isinstance(dataframe_or_path, (str, Path)):
        # Drop a conflicting kwarg so the positional path always wins.
        kwargs.pop("filepath_or_buffer", None)
        data = read_csv(filepath_or_buffer=dataframe_or_path, **kwargs)
    else:
        exc = Exception(
            str(
                f"Invalid type {type(dataframe_or_path)} for "
                "argument dataframe_or_path. \nParameter must be"
                " of type pandas.DataFrame, str or pathlib.Path"
            )
        )
        logger.exception(exc)
        raise exc

    # Formatting cols
    data = _format_column_names(data=data)
    data = _cast_object_to_string(data_frame=data)

    # Default record id: derive it from the DataFrame index.
    if record_id == "record_id" and record_id not in data.columns:
        data[record_id] = data.index

    lg_uniq = len(data[record_id].unique())
    lg_id = len(data[record_id])

    if lg_id != lg_uniq:
        exc = Exception(
            str(
                f"Record identifier {record_id} have {abs(lg_id - lg_uniq)} "
                "duplicated rows. \nRecord identifier must be unique"
                " in each row."
            )
        )
        logger.exception(exc)
        raise exc

    # Default event time: stamp every row with the current epoch seconds.
    # (uses the module-level ``import time``; no local re-import needed)
    if event_id not in data.columns:
        current_time_sec = int(round(time.time()))
        data[event_id] = Series([current_time_sec] * lg_id, dtype="float64")

    # Resolve the SageMaker session, consistent with
    # get_feature_group_as_dataframe: explicit session first, otherwise
    # build one from region, forwarding the (optional) role to assume.
    if session is not None:
        sagemaker_session = session
    elif region is not None:
        # Bug fix: the role was previously dropped instead of forwarded.
        sagemaker_session = get_session_from_role(region=region, assume_role=role)
    else:
        exc = Exception("Argument Session or role and region must be specified.")
        logger.exception(exc)
        raise exc

    feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)

    feature_group.load_feature_definitions(data_frame=data)

    return feature_group
691+
692+
693+
def _format_column_names(data: pandas.DataFrame) -> pandas.DataFrame:
694+
"""Formats the column names for :class:`sagemaker.feature_store.feature_group.FeatureGroup`
695+
696+
Description:
697+
Module to format correctly the name of the columns of a DataFrame
698+
to later generate the features names of a Feature Group
699+
700+
Args:
701+
data (:class:`pandas.DataFrame`): dataframe used
702+
703+
Returns:
704+
:class:`pandas.DataFrame`
705+
"""
706+
data.rename(columns=lambda x: x.replace(" ", "_").replace(".", "").lower()[:62], inplace=True)
707+
return data
708+
709+
def _cast_object_to_string(data_frame: pandas.DataFrame) -> pandas.DataFrame:
710+
"""Cast properly pandas object types to strings
711+
712+
Description:
713+
Method to convert 'object' and 'O' column dtypes of a pandas.DataFrame to
714+
a valid string type recognized by Feature Groups.
715+
716+
Args:
717+
data_frame: dataframe used
718+
Returns:
719+
pandas.DataFrame
720+
"""
721+
for label in data_frame.select_dtypes(["object", "O"]).columns.tolist():
722+
data_frame[label] = data_frame[label].astype("str").astype("string")
723+
return data_frame

0 commit comments

Comments
 (0)