Commit 76c4363

mlflowclient wrapper
Signed-off-by: Vanshika Vanshika <vvanshik@redhat.com>
rh-pre-commit.version: 2.3.2
rh-pre-commit.check-secrets: ENABLED
1 parent 0d7f916 commit 76c4363

10 files changed

Lines changed: 883 additions & 26 deletions

File tree

docs/reference/mlflow.md

Lines changed: 95 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,24 @@ This module provides **native integration** between Feast and [MLflow](https://m
44

55
## Overview
66

7-
When enabled, the integration logs to the active MLflow run during:
8-
9-
- **Historical feature retrieval**: `get_historical_features()` tags the run with feature refs, feature views, entity count, and retrieval duration
10-
- **Online feature retrieval**: `get_online_features()` tags the run with the same metadata
11-
- **Entity DataFrame archival** — optionally saves the training entity DataFrame as an MLflow artifact for full reproducibility
12-
13-
The integration also provides utilities for:
14-
15-
- **Model → Feature Service resolution** — map any MLflow model URI back to its Feast feature service
16-
- **Training reproducibility** — reconstruct the exact entity DataFrame from a past MLflow run
7+
When enabled, the integration provides:
8+
9+
- **Historical feature retrieval** -- `get_historical_features()` tags the run with feature refs, feature views, entity count, and retrieval duration
10+
- **Online feature retrieval** -- `get_online_features()` tags the run with the same metadata
11+
- **Entity DataFrame archival** -- optionally saves the training entity DataFrame as an MLflow artifact for full reproducibility
12+
- **Execution context tagging** -- tags runs with where they ran (workbench, KFP pipeline, feature server, or standalone)
13+
- **Operation logging** -- optionally logs `feast apply` and `feast materialize` to a separate MLflow experiment
14+
- **Model-to-Feature resolution** -- map any MLflow model URI back to its Feast feature service
15+
- **Training reproducibility** -- reconstruct the exact entity DataFrame from a past MLflow run
16+
- **Training-to-prediction linkage** -- `FeastMlflowClient.load_model()` links prediction runs back to their training runs
17+
- **Feast MLflow Client** -- a thin wrapper that eliminates direct `import mlflow` in user code
1718

1819
## Installation
1920

2021
MLflow is an optional dependency. Install it with:
2122

2223
```bash
23-
pip install mlflow
24+
pip install "feast[mlflow]"
2425
```
2526

2627
## Configuration
@@ -41,23 +42,29 @@ mlflow:
4142
auto_log: true
4243
auto_log_entity_df: true
4344
entity_df_max_rows: 100000
45+
log_execution_context: true
46+
log_operations: false
47+
ops_experiment_suffix: "-feast-ops"
4448
```
4549
4650
### Configuration options
4751
4852
| Option | Type | Default | Description |
4953
|--------|------|---------|-------------|
5054
| `enabled` | bool | `false` | Enable or disable the MLflow integration |
51-
| `tracking_uri` | string | *(none)* | MLflow tracking server URI. When not set, the `MLFLOW_TRACKING_URI` environment variable is used. If neither is set, MLflow falls back to its own default (`./mlruns`). |
55+
| `tracking_uri` | string | *(none)* | MLflow tracking server URI. When unset, falls back to the `MLFLOW_TRACKING_URI` environment variable, then to MLflow's own default (`./mlruns`). |
5256
| `auto_log` | bool | `true` | Automatically log feature metadata on every retrieval |
5357
| `auto_log_entity_df` | bool | `false` | Save the entity DataFrame as an MLflow artifact (`entity_df.parquet`) |
54-
| `entity_df_max_rows` | int | `100000` | Maximum entity DataFrame rows to save as an artifact. DataFrames exceeding this limit are skipped to avoid OOM and slow uploads. |
58+
| `entity_df_max_rows` | int | `100000` | Maximum entity DataFrame rows to save as an artifact. Larger DataFrames are skipped. |
59+
| `log_execution_context` | bool | `true` | Tag runs with execution context (pipeline, workbench, feature_server, standalone) |
60+
| `log_operations` | bool | `false` | Log `feast apply` and `feast materialize` to a separate MLflow experiment |
61+
| `ops_experiment_suffix` | string | `"-feast-ops"` | Suffix for the operations experiment name |
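
The fallback order for `tracking_uri` in the table above can be sketched as follows. This is only an illustration under stated assumptions, not the integration's actual code: `resolve_tracking_uri` is a hypothetical helper name, and the `./mlruns` default is taken from the table.

```python
import os
from typing import Mapping, Optional

# MLflow's local default location, as stated in the options table.
MLFLOW_DEFAULT_URI = "./mlruns"


def resolve_tracking_uri(
    configured: Optional[str],
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Hypothetical helper: config value, then env var, then MLflow default."""
    # Allow a custom mapping to be injected so the logic is easy to test.
    env = os.environ if env is None else env
    return configured or env.get("MLFLOW_TRACKING_URI") or MLFLOW_DEFAULT_URI
```

An explicit `tracking_uri` in `feature_store.yaml` wins over the environment variable in this sketch, which matches the order given in the table.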
5562

5663
## What gets logged
5764

58-
When `auto_log: true`, each `get_historical_features` or `get_online_features` call records the following on the active MLflow run:
65+
### Tags on retrieval runs
5966

60-
### Tags
67+
When `auto_log: true`, each `get_historical_features` or `get_online_features` call records:
6168

6269
| Tag | Example | Description |
6370
|-----|---------|-------------|
@@ -69,6 +76,18 @@ When `auto_log: true`, each `get_historical_features` or `get_online_features` c
6976
| `feast.entity_count` | `200` | Number of entities in the request |
7077
| `feast.feature_count` | `5` | Number of features retrieved |
7178

79+
### Execution context tags
80+
81+
When `log_execution_context: true`:
82+
83+
| Tag | When set | Example |
84+
|-----|----------|---------|
85+
| `feast.execution_context` | Always | `pipeline` / `workbench` / `feature_server` / `standalone` |
86+
| `feast.kfp_run_id` | Pipeline (KFP) | `abc-123-def` |
87+
| `feast.kfp_pipeline` | Pipeline (KFP) | `fraud-training-pipeline` |
88+
| `feast.workbench` | RHOAI workbench | `my-jupyter-notebook` |
89+
| `feast.namespace` | Pipeline or workbench | `user-project` |
90+
7291
### Metrics
7392

7493
| Metric | Example | Description |
@@ -77,7 +96,17 @@ When `auto_log: true`, each `get_historical_features` or `get_online_features` c
7796

7897
### Artifacts
7998

80-
When `auto_log_entity_df: true`, the entity DataFrame is saved as `entity_df.parquet` in the run's artifacts (if the row count is within `entity_df_max_rows`), enabling exact reproduction of training data.
99+
When `auto_log_entity_df: true`, the entity DataFrame is saved as `entity_df.parquet` in the run's artifacts, provided its row count is within `entity_df_max_rows`.
100+
101+
### Operation logs (when `log_operations: true`)
102+
103+
`feast apply` and `feast materialize` create runs in a separate experiment named `{project}{ops_experiment_suffix}` (`{project}-feast-ops` by default):
104+
105+
| Tag | Example |
106+
|-----|---------|
107+
| `feast.operation` | `apply` / `materialize` / `materialize_incremental` |
108+
| `feast.feature_views_changed` | `driver_hourly_stats` (apply only) |
109+
| `feast.materialize.feature_views` | `driver_hourly_stats` (materialize only) |
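
How the operations experiment name and the tags in the table fit together might look like the sketch below. Both function names are hypothetical, and joining several feature view names with commas is an assumption; the source only shows a single name as the example value.

```python
from typing import Dict, List


def ops_experiment_name(project: str, suffix: str = "-feast-ops") -> str:
    """Hypothetical helper: project name plus the configured suffix."""
    return f"{project}{suffix}"


def apply_operation_tags(feature_view_names: List[str]) -> Dict[str, str]:
    """Hypothetical helper: tags for an apply run, per the table above."""
    return {
        "feast.operation": "apply",
        # Assumption: multiple names are comma-joined into one tag value.
        "feast.feature_views_changed": ",".join(feature_view_names),
    }
```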
81110

82111
## Usage
83112

@@ -97,37 +126,78 @@ with mlflow.start_run(run_name="my_training"):
97126
entity_df=entity_df,
98127
).to_df()
99128
100-
# Feature metadata is already logged to this run — no extra code needed
101129
model = train(training_df)
102130
mlflow.sklearn.log_model(model, "model")
103131
```
104132

105-
### Resolve a model back to its feature service
133+
### FeastMlflowClient (zero mlflow imports)
134+
135+
The `FeastMlflowClient` wraps MLflow so user code never needs `import mlflow`:
136+
137+
```python
138+
from feast import FeatureStore
106139
107-
Given an MLflow model URI, determine which Feast feature service was used during training:
140+
store = FeatureStore(".")
141+
client = store.get_mlflow_client()
142+
143+
# Training
144+
with client.start_run(run_name="v1_training"):
145+
df = store.get_historical_features(
146+
features=store.get_feature_service("driver_activity_v1"),
147+
entity_df=entity_df,
148+
).to_df()
149+
150+
model = LogisticRegression().fit(X, y)  # X, y: features and labels derived from df; LogisticRegression from sklearn.linear_model
151+
client.log_params({"model_type": "logistic_regression"})
152+
client.log_metrics({"f1": 0.85})
153+
client.log_model(model, "model")
154+
train_run_id = client.active_run_id
155+
156+
client.register_model(f"runs:/{train_run_id}/model", "driver_model")
157+
158+
# Prediction (auto-links to training run)
159+
with client.start_run(run_name="prediction"):
160+
model = client.load_model("models:/driver_model/1")
161+
# This run is now tagged with feast.training_run_id pointing to train_run_id
162+
online_features = store.get_online_features(...).to_dict()
163+
predictions = model.predict(...)
164+
```
165+
166+
### FeastMlflowClient API
167+
168+
| Method | Description |
169+
|--------|-------------|
170+
| `store.get_mlflow_client()` | Create a client from the FeatureStore |
171+
| `client.start_run(run_name, tags)` | Context manager, auto-tags `feast.project` |
172+
| `client.log_params(params)` | Log parameters |
173+
| `client.log_metrics(metrics, step)` | Log metrics |
174+
| `client.log_metric(key, value, step)` | Log a single metric |
175+
| `client.log_model(model, path, flavor)` | Log model + auto-attach `required_features.json` |
176+
| `client.load_model(model_uri)` | Load model + auto-tag prediction run with training lineage |
177+
| `client.register_model(model_uri, name)` | Register + auto-tag version with `feast.feature_service` |
178+
| `client.resolve_features(model_uri)` | Resolve model URI to Feast feature service name |
179+
| `client.get_training_entity_df(run_id)` | Recover entity DataFrame from a past run |
180+
| `client.mlflow` | Escape hatch: raw mlflow module |
181+
| `client.active_run_id` | Current active run ID |
182+
183+
### Resolve a model back to its feature service
108184

109185
```python
110186
from feast.mlflow_integration import resolve_feature_service_from_model_uri
111187
112188
fs_name = resolve_feature_service_from_model_uri("models:/my_model/1")
113-
# Returns "driver_activity_v1" — resolved from the training run's tags
114189
```
115190

116191
Resolution order:
117192
1. Model version tag `feast.feature_service` (explicit override)
118193
2. Training run tag `feast.feature_service` (set by auto-log)
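
The resolution order above can be sketched with plain dictionaries standing in for the model version tags and the training run tags. This is a minimal illustration, not the code in `feast.mlflow_integration.model_resolver`: `resolve_feature_service` and `ResolutionError` are hypothetical stand-ins (the package itself exports `FeastMlflowModelResolutionError`).

```python
from typing import Mapping

FEATURE_SERVICE_TAG = "feast.feature_service"


class ResolutionError(LookupError):
    """Stand-in for the package's own resolution error."""


def resolve_feature_service(
    model_version_tags: Mapping[str, str],
    training_run_tags: Mapping[str, str],
) -> str:
    """Return the feature service name, preferring the model version tag."""
    # 1. Explicit override on the model version, 2. tag set on the training run.
    for tags in (model_version_tags, training_run_tags):
        name = tags.get(FEATURE_SERVICE_TAG)
        if name:
            return name
    raise ResolutionError(
        f"No '{FEATURE_SERVICE_TAG}' tag on the model version or its training run"
    )
```

Checking the model version first lets a registered model be pinned to a different feature service without touching the original training run.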
119194

120-
If neither tag is found, a `FeastMlflowModelResolutionError` is raised with guidance on how to set the tag.
121-
122195
### Reproduce training from a past run
123196

124-
Retrieve the exact entity DataFrame that was used in a previous training run:
125-
126197
```python
127198
from feast.mlflow_integration import get_entity_df_from_mlflow_run
128199
129200
entity_df = get_entity_df_from_mlflow_run(run_id="abc123")
130-
# Returns the entity DataFrame saved during the original run
131201
132202
training_df = store.get_historical_features(
133203
features=store.get_feature_service("driver_activity_v1"),

sdk/python/feast/feature_store.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,16 @@ def __init__(
226226

227227
self._init_mlflow_tracking()
228228

229+
def get_mlflow_client(self):
230+
"""Return a :class:`~feast.mlflow_integration.client.FeastMlflowClient`.
231+
232+
The client wraps MLflow so that ``import mlflow`` is never needed
233+
in user code. Configuration is inherited from ``feature_store.yaml``.
234+
"""
235+
from feast.mlflow_integration.client import FeastMlflowClient
236+
237+
return FeastMlflowClient(self)
238+
229239
def _init_mlflow_tracking(self):
230240
"""Configure the global MLflow tracking URI and experiment from feature_store.yaml.
231241
@@ -1187,6 +1197,24 @@ def _apply_diffs(
11871197
# Emit OpenLineage events for applied objects
11881198
self._emit_openlineage_apply_diffs(registry_diff)
11891199

1200+
# Emit MLflow events for applied objects (Phase 7)
1201+
self._mlflow_log_apply_diffs(registry_diff)
1202+
1203+
def _mlflow_log_apply_diffs(self, registry_diff: RegistryDiff):
1204+
"""Log apply operation to MLflow ops experiment."""
1205+
try:
1206+
mlflow_cfg = self.config.mlflow
1207+
if mlflow_cfg is None or not mlflow_cfg.enabled or not mlflow_cfg.log_operations:
1208+
return
1209+
objects: List[Any] = []
1210+
for feast_object_diff in registry_diff.feast_object_diffs:
1211+
if feast_object_diff.new_feast_object is not None:
1212+
objects.append(feast_object_diff.new_feast_object)
1213+
if objects:
1214+
self._mlflow_log_apply(objects)
1215+
except Exception as e:
1216+
_logger.debug("MLflow apply logging failed: %s", e)
1217+
11901218
def _emit_openlineage_apply_diffs(self, registry_diff: RegistryDiff):
11911219
"""Emit OpenLineage events for objects applied via diffs."""
11921220
if self.openlineage_emitter is None:
@@ -1482,6 +1510,26 @@ def apply(
14821510
# Emit OpenLineage events for applied objects
14831511
self._emit_openlineage_apply(objects)
14841512

1513+
# Emit MLflow events for applied objects (Phase 7)
1514+
self._mlflow_log_apply(objects)
1515+
1516+
def _mlflow_log_apply(self, objects: List[Any]):
1517+
"""Log applied objects to MLflow ops experiment."""
1518+
try:
1519+
mlflow_cfg = self.config.mlflow
1520+
if mlflow_cfg is None or not mlflow_cfg.enabled or not mlflow_cfg.log_operations:
1521+
return
1522+
from feast.mlflow_integration.logger import log_apply_to_mlflow
1523+
1524+
log_apply_to_mlflow(
1525+
changed_objects=objects,
1526+
project=self.project,
1527+
tracking_uri=mlflow_cfg.get_tracking_uri(),
1528+
ops_experiment_suffix=mlflow_cfg.ops_experiment_suffix,
1529+
)
1530+
except Exception as e:
1531+
_logger.debug("MLflow apply logging failed: %s", e)
1532+
14851533
def _emit_openlineage_apply(self, objects: List[Any]):
14861534
"""Emit OpenLineage events for applied objects."""
14871535
if self.openlineage_emitter is None:
@@ -2062,6 +2110,12 @@ def tqdm_builder(length):
20622110
self._emit_openlineage_materialize_complete(
20632111
ol_run_id, feature_views_to_materialize
20642112
)
2113+
2114+
# Emit MLflow event for materialization (Phase 7)
2115+
_mat_duration = time.monotonic() - _retrieval_start if '_retrieval_start' in dir() else 0
2116+
self._mlflow_log_materialize(
2117+
feature_views_to_materialize, None, end_date, _mat_duration, incremental=True,
2118+
)
20652119
except Exception as e:
20662120
# Emit OpenLineage FAIL event
20672121
self._emit_openlineage_materialize_fail(ol_run_id, str(e))
@@ -2190,11 +2244,45 @@ def tqdm_builder(length):
21902244
self._emit_openlineage_materialize_complete(
21912245
ol_run_id, feature_views_to_materialize
21922246
)
2247+
2248+
# Emit MLflow event for materialization (Phase 7)
2249+
self._mlflow_log_materialize(
2250+
feature_views_to_materialize, start_date, end_date, 0, incremental=False,
2251+
)
21932252
except Exception as e:
21942253
# Emit OpenLineage FAIL event
21952254
self._emit_openlineage_materialize_fail(ol_run_id, str(e))
21962255
raise
21972256

2257+
def _mlflow_log_materialize(
2258+
self,
2259+
feature_views: List[Any],
2260+
start_date: Optional[datetime],
2261+
end_date: datetime,
2262+
duration_seconds: float,
2263+
incremental: bool = False,
2264+
):
2265+
"""Log materialization to MLflow ops experiment."""
2266+
try:
2267+
mlflow_cfg = self.config.mlflow
2268+
if mlflow_cfg is None or not mlflow_cfg.enabled or not mlflow_cfg.log_operations:
2269+
return
2270+
from feast.mlflow_integration.logger import log_materialize_to_mlflow
2271+
2272+
fv_names = [getattr(fv, "name", str(fv)) for fv in feature_views]
2273+
log_materialize_to_mlflow(
2274+
feature_view_names=fv_names,
2275+
start_date=start_date,
2276+
end_date=end_date,
2277+
duration_seconds=duration_seconds,
2278+
project=self.project,
2279+
tracking_uri=mlflow_cfg.get_tracking_uri(),
2280+
incremental=incremental,
2281+
ops_experiment_suffix=mlflow_cfg.ops_experiment_suffix,
2282+
)
2283+
except Exception as e:
2284+
_logger.debug("MLflow materialize logging failed: %s", e)
2285+
21982286
def _emit_openlineage_materialize_start(
21992287
self,
22002288
feature_views: List[Any],

sdk/python/feast/mlflow_integration/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@
2626
from a previous MLflow run's artifacts.
2727
"""
2828

29+
from feast.mlflow_integration.client import FeastMlflowClient
2930
from feast.mlflow_integration.config import MlflowConfig
3031
from feast.mlflow_integration.entity_df_builder import (
3132
FeastMlflowEntityDfError,
3233
get_entity_df_from_mlflow_run,
3334
)
3435
from feast.mlflow_integration.logger import (
36+
log_apply_to_mlflow,
3537
log_feature_retrieval_to_mlflow,
38+
log_materialize_to_mlflow,
3639
log_training_dataset_to_mlflow,
3740
)
3841
from feast.mlflow_integration.model_resolver import (
@@ -41,9 +44,12 @@
4144
)
4245

4346
__all__ = [
47+
"FeastMlflowClient",
4448
"MlflowConfig",
4549
"log_feature_retrieval_to_mlflow",
4650
"log_training_dataset_to_mlflow",
51+
"log_apply_to_mlflow",
52+
"log_materialize_to_mlflow",
4753
"resolve_feature_service_from_model_uri",
4854
"FeastMlflowModelResolutionError",
4955
"get_entity_df_from_mlflow_run",
