Skip to content

Commit 80deb5b

Browse files
committed
handle fallback
Signed-off-by: Vanshika Vanshika <vvanshik@redhat.com> rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED
1 parent 014686f commit 80deb5b

File tree

4 files changed

+101
-84
lines changed

4 files changed

+101
-84
lines changed

sdk/python/feast/feature_store.py

Lines changed: 76 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1641,42 +1641,45 @@ def get_historical_features(
16411641
)
16421642

16431643
# Auto-log to MLflow if configured
1644-
if (
1645-
self.config.mlflow is not None
1646-
and self.config.mlflow.enabled
1647-
and self.config.mlflow.auto_log
1648-
):
1649-
_log_fn = _get_mlflow_log_fn()
1650-
if _log_fn is not None:
1651-
_duration = time.monotonic() - _retrieval_start
1652-
if isinstance(entity_df, pd.DataFrame):
1653-
_entity_count = len(entity_df)
1654-
elif isinstance(entity_df, str):
1655-
_entity_count = -1
1656-
else:
1657-
_entity_count = 0
1658-
_fs = features if isinstance(features, FeatureService) else None
1659-
_fs_name = (
1660-
features.name
1661-
if isinstance(features, FeatureService)
1662-
else self._resolve_feature_service_name(_feature_refs)
1663-
)
1664-
_log_fn(
1665-
feature_refs=_feature_refs,
1666-
entity_count=_entity_count,
1667-
duration_seconds=_duration,
1668-
retrieval_type="historical",
1669-
feature_service=_fs,
1670-
feature_service_name=_fs_name,
1671-
project=self.project,
1672-
tracking_uri=self.config.mlflow.tracking_uri,
1673-
)
1674-
1675-
if self.config.mlflow.auto_log_entity_df:
1676-
self._auto_log_entity_df_info(
1677-
entity_df, start_date=start_date, end_date=end_date
1644+
try:
1645+
if (
1646+
self.config.mlflow is not None
1647+
and self.config.mlflow.enabled
1648+
and self.config.mlflow.auto_log
1649+
):
1650+
_log_fn = _get_mlflow_log_fn()
1651+
if _log_fn is not None:
1652+
_duration = time.monotonic() - _retrieval_start
1653+
if isinstance(entity_df, pd.DataFrame):
1654+
_entity_count = len(entity_df)
1655+
elif isinstance(entity_df, str):
1656+
_entity_count = -1
1657+
else:
1658+
_entity_count = 0
1659+
_fs = features if isinstance(features, FeatureService) else None
1660+
_fs_name = (
1661+
features.name
1662+
if isinstance(features, FeatureService)
1663+
else self._resolve_feature_service_name(_feature_refs)
1664+
)
1665+
_log_fn(
1666+
feature_refs=_feature_refs,
1667+
entity_count=_entity_count,
1668+
duration_seconds=_duration,
1669+
retrieval_type="historical",
1670+
feature_service=_fs,
1671+
feature_service_name=_fs_name,
1672+
project=self.project,
1673+
tracking_uri=self.config.mlflow.tracking_uri,
16781674
)
16791675

1676+
if self.config.mlflow.auto_log_entity_df:
1677+
self._auto_log_entity_df_info(
1678+
entity_df, start_date=start_date, end_date=end_date
1679+
)
1680+
except Exception as e:
1681+
_logger.warning("MLflow auto-log failed for historical retrieval: %s", e)
1682+
16801683
return job
16811684

16821685
def create_saved_dataset(
@@ -2817,42 +2820,46 @@ def get_online_features(
28172820
)
28182821

28192822
# Auto-log to MLflow if configured
2820-
if (
2821-
self.config.mlflow is not None
2822-
and self.config.mlflow.enabled
2823-
and self.config.mlflow.auto_log
2824-
):
2825-
_log_fn = _get_mlflow_log_fn()
2826-
if _log_fn is not None:
2827-
_duration = time.monotonic() - _retrieval_start
2828-
_feature_refs = utils._get_features(
2829-
self.registry, self.project, features, allow_cache=True
2830-
)
2831-
if isinstance(entity_rows, list):
2832-
_entity_count = len(entity_rows)
2833-
elif isinstance(entity_rows, Mapping):
2834-
try:
2835-
_entity_count = len(next(iter(entity_rows.values())))
2836-
except Exception:
2823+
try:
2824+
if (
2825+
self.config.mlflow is not None
2826+
and self.config.mlflow.enabled
2827+
and self.config.mlflow.auto_log
2828+
):
2829+
_log_fn = _get_mlflow_log_fn()
2830+
if _log_fn is not None:
2831+
_duration = time.monotonic() - _retrieval_start
2832+
_feature_refs = utils._get_features(
2833+
self.registry, self.project, features, allow_cache=True
2834+
)
2835+
if isinstance(entity_rows, list):
2836+
_entity_count = len(entity_rows)
2837+
elif isinstance(entity_rows, Mapping):
2838+
try:
2839+
_entity_count = len(next(iter(entity_rows.values())))
2840+
except Exception:
2841+
_entity_count = 0
2842+
else:
28372843
_entity_count = 0
2838-
else:
2839-
_entity_count = 0
2840-
_fs = features if isinstance(features, FeatureService) else None
2841-
_fs_name = (
2842-
features.name
2843-
if isinstance(features, FeatureService)
2844-
else self._resolve_feature_service_name(_feature_refs)
2845-
)
2846-
_log_fn(
2847-
feature_refs=_feature_refs,
2848-
entity_count=_entity_count,
2849-
duration_seconds=_duration,
2850-
retrieval_type="online",
2851-
feature_service=_fs,
2852-
feature_service_name=_fs_name,
2853-
project=self.project,
2854-
tracking_uri=self.config.mlflow.tracking_uri,
2855-
)
2844+
_fs = features if isinstance(features, FeatureService) else None
2845+
_fs_name = (
2846+
features.name
2847+
if isinstance(features, FeatureService)
2848+
else self._resolve_feature_service_name(_feature_refs)
2849+
)
2850+
_log_fn(
2851+
feature_refs=_feature_refs,
2852+
entity_count=_entity_count,
2853+
duration_seconds=_duration,
2854+
retrieval_type="online",
2855+
feature_service=_fs,
2856+
feature_service_name=_fs_name,
2857+
project=self.project,
2858+
tracking_uri=self.config.mlflow.tracking_uri,
2859+
)
2860+
except Exception as e:
2861+
_logger.warning("MLflow auto-log failed for online retrieval: %s", e)
2862+
28562863
return response
28572864

28582865
async def get_online_features_async(

sdk/python/feast/mlflow_integration/logger.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,11 @@ def log_feature_retrieval_to_mlflow(
7676
client.set_tag(run_id, "feast.feature_views", fv_str)
7777

7878
refs_str = ",".join(feature_refs)
79-
if len(refs_str) > 490:
80-
refs_str = refs_str[:487] + "..."
81-
client.log_param(run_id, "feast.feature_refs", refs_str)
82-
client.log_param(run_id, "feast.entity_count", str(entity_count))
83-
client.log_param(run_id, "feast.feature_count", str(len(feature_refs)))
79+
if len(refs_str) > 4990:
80+
refs_str = refs_str[:4987] + "..."
81+
client.set_tag(run_id, "feast.feature_refs", refs_str)
82+
client.set_tag(run_id, "feast.entity_count", str(entity_count))
83+
client.set_tag(run_id, "feast.feature_count", str(len(feature_refs)))
8484

8585
client.log_metric(
8686
run_id, "feast.job_submission_sec", round(duration_seconds, 4)

sdk/python/feast/repo_config.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -496,12 +496,18 @@ def openlineage(self) -> Optional[OpenLineageConfig]:
496496
def mlflow(self):
497497
"""Get the MLflow configuration."""
498498
if not self._mlflow:
499-
if isinstance(self.mlflow_config, Dict):
499+
if isinstance(self.mlflow_config, dict):
500500
from feast.mlflow_integration.config import MlflowConfig
501501

502502
self._mlflow = MlflowConfig(**self.mlflow_config)
503-
elif self.mlflow_config:
504-
self._mlflow = self.mlflow_config
503+
elif self.mlflow_config is not None:
504+
warnings.warn(
505+
f"Invalid mlflow config type: {type(self.mlflow_config).__name__}. "
506+
"Expected a dict (e.g. mlflow: {{enabled: true}}). "
507+
"MLflow integration disabled.",
508+
stacklevel=2,
509+
)
510+
return None
505511
return self._mlflow
506512

507513
@model_validator(mode="before")

sdk/python/feast/ui_server.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,21 +152,25 @@ def get_mlflow_runs(max_results: int = 50):
152152
for run in runs:
153153
run_tags = run.data.tags
154154
run_params = run.data.params
155+
fv_raw = run_tags.get("feast.feature_views", "")
156+
refs_raw = run_tags.get(
157+
"feast.feature_refs",
158+
run_params.get("feast.feature_refs", ""),
159+
)
155160
result.append(
156161
{
157162
"run_id": run.info.run_id,
158163
"run_name": run.info.run_name,
159164
"status": run.info.status,
160165
"start_time": run.info.start_time,
161166
"feature_service": run_tags.get("feast.feature_service"),
162-
"feature_views": run_tags.get("feast.feature_views", "").split(
163-
","
164-
),
165-
"feature_refs": run_params.get("feast.feature_refs", "").split(
166-
","
167-
),
167+
"feature_views": [v for v in fv_raw.split(",") if v],
168+
"feature_refs": [v for v in refs_raw.split(",") if v],
168169
"retrieval_type": run_tags.get("feast.retrieval_type"),
169-
"entity_count": run_params.get("feast.entity_count"),
170+
"entity_count": run_tags.get(
171+
"feast.entity_count",
172+
run_params.get("feast.entity_count"),
173+
),
170174
"mlflow_url": (
171175
f"{mlflow_ui_base}/#/experiments/"
172176
f"{run.info.experiment_id}/runs/{run.info.run_id}"

0 commit comments

Comments
 (0)