Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sagemaker-core/src/sagemaker/core/telemetry/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Feature(Enum):
MODEL_TRAINER = 14
MODEL_CUSTOMIZATION = 15
MLOPS = 16
FEATURE_STORE = 17

def __str__(self): # pylint: disable=E0307
"""Return the feature name."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
str(Feature.MODEL_TRAINER): 14,
str(Feature.MODEL_CUSTOMIZATION): 15,
str(Feature.MLOPS): 16,
str(Feature.FEATURE_STORE): 17,
}

STATUS_TO_CODE = {
Expand All @@ -81,6 +82,9 @@ def wrapper(*args, **kwargs):
if len(args) > 0 and hasattr(args[0], "sagemaker_session"):
# Get the sagemaker_session from the instance method args
sagemaker_session = args[0].sagemaker_session
elif len(args) > 0 and hasattr(args[0], "_sagemaker_session"):
# Get the sagemaker_session from the instance method args (private attribute)
sagemaker_session = args[0]._sagemaker_session
elif feature == Feature.REMOTE_FUNCTION:
# Get the sagemaker_session from the function keyword arguments for remote function
sagemaker_session = kwargs.get(
Expand Down
16 changes: 16 additions & 0 deletions sagemaker-core/src/sagemaker/core/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,22 @@ def __new__(cls):
cls._instance = super().__new__(cls)
return cls._instance

def __repr__(self):
"""Return clean representation for debugging."""
return "Unassigned()"

def __str__(self):
"""Return empty string for clean printing."""
return ""

def __iter__(self):
"""Return empty iterator to prevent iteration errors."""
return iter([])

def __bool__(self):
"""Return False for truthiness checks."""
return False


class SingletonMeta(type):
"""
Expand Down
47 changes: 47 additions & 0 deletions sagemaker-core/tests/unit/generated/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,50 @@ def test_serialize_method_nested_shape():
"StringValue": "string",
},
}


class TestUnassignedBehavior:
"""Test Unassigned class methods for proper behavior.

Bug fix: GetRecordResponse is not printable and cannot be parsed via iterator.
Error: TypeError: 'Unassigned' object is not iterable
"""

def test_unassigned_repr(self):
"""Test that Unassigned has clean repr."""
u = Unassigned()
assert repr(u) == "Unassigned()"

def test_unassigned_str(self):
"""Test that Unassigned converts to empty string."""
u = Unassigned()
assert str(u) == ""

def test_unassigned_bool(self):
"""Test that Unassigned is falsy."""
u = Unassigned()
assert not u
assert bool(u) is False

def test_unassigned_iter(self):
"""Test that Unassigned is iterable and returns empty list."""
u = Unassigned()
result = list(u)
assert result == []

def test_unassigned_singleton(self):
"""Test that Unassigned is a singleton."""
u1 = Unassigned()
u2 = Unassigned()
assert u1 is u2

def test_unassigned_in_conditional(self):
"""Test that Unassigned works correctly in conditionals."""
u = Unassigned()

# Should evaluate to False
if u:
pytest.fail("Unassigned should be falsy")

# Should work with not
assert not u
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)

from sagemaker.core.helper.session_helper import Session
from sagemaker.core.telemetry import Feature, _telemetry_emitter

@dataclass
class AthenaQuery:
Expand All @@ -37,6 +38,7 @@ class AthenaQuery:
_result_bucket: str = field(default=None, init=False)
_result_file_prefix: str = field(default=None, init=False)

@_telemetry_emitter(Feature.FEATURE_STORE, "AthenaQuery.run")
def run(
self, query_string: str, output_location: str, kms_key: str = None, workgroup: str = None
) -> str:
Expand Down Expand Up @@ -82,6 +84,7 @@ def get_query_execution(self) -> Dict[str, Any]:
"""
return get_query_execution(self.sagemaker_session, self._current_query_execution_id)

@_telemetry_emitter(Feature.FEATURE_STORE, "AthenaQuery.as_dataframe")
def as_dataframe(self, **kwargs) -> DataFrame:
"""Download the result of the current query and load it into a DataFrame.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pandas as pd

from sagemaker.core.helper.session_helper import Session
from sagemaker.core.telemetry import Feature, _telemetry_emitter
from sagemaker.mlops.feature_store import FeatureGroup
from sagemaker.mlops.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum
from sagemaker.mlops.feature_store.feature_utils import (
Expand Down Expand Up @@ -143,7 +144,7 @@ def construct_feature_group_to_be_merged(
raise RuntimeError(f"No metastore configured for FeatureGroup {fg.feature_group_name}.")

catalog_config = fg.offline_store_config.data_catalog_config
disable_glue = catalog_config.disable_glue_table_creation or False
disable_glue = getattr(catalog_config, "disable_glue_table_creation", False) or False

features = [fd.feature_name for fd in fg.feature_definitions]
record_id = fg.record_identifier_feature_name
Expand Down Expand Up @@ -422,25 +423,37 @@ def with_event_time_range(
self._event_time_ending_timestamp = ending_timestamp
return self

@_telemetry_emitter(Feature.FEATURE_STORE, "DatasetBuilder.to_csv_file")
def to_csv_file(self) -> tuple[str, str]:
"""Get query string and result in .csv format file.

Returns:
The S3 path of the .csv file.
The query string executed.
tuple: A tuple containing:
- str: The S3 path of the .csv file
- str: The query string executed

Note:
This method returns a tuple (csv_path, query_string).
To get just the CSV path: csv_path, _ = builder.to_csv_file()
"""
if isinstance(self._base, pd.DataFrame):
return self._to_csv_from_dataframe()
if isinstance(self._base, FeatureGroup):
return self._to_csv_from_feature_group()
raise ValueError("Base must be either a FeatureGroup or a DataFrame.")

@_telemetry_emitter(Feature.FEATURE_STORE, "DatasetBuilder.to_dataframe")
def to_dataframe(self) -> tuple[pd.DataFrame, str]:
"""Get query string and result in pandas.DataFrame.

Returns:
The pandas.DataFrame object.
The query string executed.
tuple: A tuple containing:
- pd.DataFrame: The pandas DataFrame object
- str: The query string executed

Note:
This method returns a tuple (dataframe, query_string).
To get just the DataFrame: df, _ = builder.to_dataframe()
"""
csv_file, query_string = self.to_csv_file()
df = download_csv_from_s3(csv_file, self._sagemaker_session, self._kms_key_id)
Expand Down Expand Up @@ -471,8 +484,8 @@ def _to_csv_from_dataframe(self) -> tuple[str, str]:
table_name=temp_table_name,
record_identifier_feature_name=self._record_identifier_feature_name,
event_time_identifier_feature=FeatureDefinition(
self._event_time_identifier_feature_name,
self._event_time_identifier_feature_type,
feature_name=self._event_time_identifier_feature_name,
feature_type=self._event_time_identifier_feature_type,
),
table_type=TableType.DATA_FRAME,
)
Expand Down
Loading
Loading