feat: Feast-MLflow Integration #6235
Conversation
```python
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "entity_df.parquet")
    entity_df.to_parquet(path, index=False)
    mlflow.log_artifact(path)
```
🟡 Mixing global mlflow.log_artifact() with explicit-URI client causes artifact/metadata to target different servers
In _auto_log_entity_df_info, tags and params are logged via a locally-created MlflowClient(tracking_uri=tracking_uri) (lines 304, 309-318), but the entity DataFrame artifact is uploaded via the global mlflow.log_artifact(path) (line 327). The global function uses the tracking URI set by mlflow.set_tracking_uri(), not the explicit tracking_uri from the config. If the global tracking URI is changed by another library in the process, or if _init_mlflow_tracking failed silently (caught by except Exception at sdk/python/feast/feature_store.py:249), the artifact would be uploaded to a different server than where the tags/params were logged, splitting metadata across two servers.
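A minimal sketch of the client-based alternative, assuming the surrounding helper already has the run id and the config-level tracking URI in hand (the function name and parameters are illustrative, not the PR's actual code):

```python
import os
import tempfile


def log_entity_df_artifact(entity_df, tracking_uri, run_id):
    """Upload the entity DataFrame through the same explicit-URI client
    used for tags/params, so artifact and metadata land on one server."""
    from mlflow import MlflowClient  # deferred import, as elsewhere in the PR

    client = MlflowClient(tracking_uri=tracking_uri)
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "entity_df.parquet")
        entity_df.to_parquet(path, index=False)
        # MlflowClient.log_artifact targets the server behind tracking_uri,
        # independent of the process-global mlflow.set_tracking_uri() state.
        client.log_artifact(run_id, path)
```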
```python
try:
    import mlflow

    tracking_uri = mlflow_cfg.tracking_uri or "http://127.0.0.1:5000"
```
🔴 UI server ignores MLFLOW_TRACKING_URI env var, falls back to hardcoded localhost
In ui_server.py, both the /api/mlflow-runs and /api/mlflow-feature-models endpoints resolve the MLflow tracking URI using mlflow_cfg.tracking_uri or "http://127.0.0.1:5000". This reads the raw tracking_uri field from the config and falls back to a hardcoded localhost URL, completely bypassing the MLFLOW_TRACKING_URI environment variable. In contrast, the rest of the codebase (feature_store.py:243, feature_store.py:325, feature_store.py:1696, feature_store.py:2885) correctly calls mlflow_cfg.get_tracking_uri() which checks the env var via sdk/python/feast/mlflow_integration/config.py:19-29. When a user sets MLFLOW_TRACKING_URI without setting tracking_uri in YAML (which is a very common deployment pattern, and documented in the PR's own docs at docs/reference/mlflow.md:51), the UI endpoints will incorrectly connect to http://127.0.0.1:5000 instead of the env-var-specified server.
Suggested change:
```diff
- tracking_uri = mlflow_cfg.tracking_uri or "http://127.0.0.1:5000"
+ tracking_uri = mlflow_cfg.get_tracking_uri() or "http://127.0.0.1:5000"
```
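For reference, the intended precedence (as described for `MlflowConfig.get_tracking_uri()` in this thread) can be sketched with a small stdlib-only helper; the function name is illustrative:

```python
import os
from typing import Optional


def resolve_tracking_uri(yaml_value: Optional[str]) -> str:
    """Explicit YAML value first, then MLFLOW_TRACKING_URI, then the
    hardcoded localhost default the UI endpoints currently fall back to."""
    if yaml_value:
        return yaml_value
    return os.environ.get("MLFLOW_TRACKING_URI") or "http://127.0.0.1:5000"
```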
```python
try:
    import mlflow

    tracking_uri = mlflow_cfg.tracking_uri or "http://127.0.0.1:5000"
```
🔴 Second UI endpoint also ignores MLFLOW_TRACKING_URI env var
Same issue as in the /api/mlflow-runs endpoint: the /api/mlflow-feature-models endpoint at sdk/python/feast/ui_server.py:234 uses mlflow_cfg.tracking_uri or "http://127.0.0.1:5000" instead of mlflow_cfg.get_tracking_uri(), causing the MLFLOW_TRACKING_URI environment variable to be ignored.
```python
)

# Emit MLflow event for materialization (Phase 7)
_mat_duration = time.monotonic() - _retrieval_start if '_retrieval_start' in dir() else 0
```
🟡 Undefined `_retrieval_start` in materialize_incremental yields a bogus always-zero duration
In materialize_incremental, the code at line 2115 references `_retrieval_start`, which is never assigned in that method's scope. The guard `'_retrieval_start' in dir()` inspects the current local scope (`dir()` with no arguments returns the names in the current local scope), and since the variable is never assigned there, the check always evaluates to False. `_mat_duration` therefore always defaults to 0 -- syntactically safe, but a meaningless duration that silently masks the missing timing. Either record a start time at the top of the method, or drop the guarded expression in favor of a plain `_mat_duration = 0`.
Suggested change:
```diff
- _mat_duration = time.monotonic() - _retrieval_start if '_retrieval_start' in dir() else 0
+ _mat_duration = 0
```
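A stdlib-only sketch of why the guard always takes the else branch, plus the straightforward timing fix (both function names are illustrative):

```python
import time


def duration_with_dir_guard():
    # dir() with no arguments lists names in the *current local scope*;
    # _retrieval_start is never assigned here, so the guard is always
    # False and the "duration" is always 0.
    return time.monotonic() - _retrieval_start if "_retrieval_start" in dir() else 0  # noqa: F821


def duration_with_timer():
    # The robust fix: record the start time unconditionally, so the
    # duration is always well-defined.
    start = time.monotonic()
    # ... materialization work would happen here ...
    return time.monotonic() - start
```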
```python
if tracking_uri:
    mlflow.set_tracking_uri(tracking_uri)

experiment_name = f"{project}{ops_experiment_suffix}"
mlflow.set_experiment(experiment_name)
```
🔴 log_apply_to_mlflow and log_materialize_to_mlflow mutate global MLflow tracking URI and experiment, corrupting concurrent user runs
Both log_apply_to_mlflow (sdk/python/feast/mlflow_integration/logger.py:185-189) and log_materialize_to_mlflow (sdk/python/feast/mlflow_integration/logger.py:251-255) call mlflow.set_tracking_uri(tracking_uri) and mlflow.set_experiment(experiment_name) globally. These mutate process-wide global state. If a user has an active MLflow run in a different experiment (e.g. during feast apply inside a training script), these calls redirect subsequent MLflow operations to the ops experiment. While the functions try to restore mlflow.set_experiment(project) afterward (line 224, 274), the tracking URI is never restored. In the exception path, if mlflow.start_run() raises (line 202/258), the experiment remains set to the ops experiment. This corrupts any concurrent or subsequent user operations in the same process.
Prompt for agents
The functions log_apply_to_mlflow and log_materialize_to_mlflow in sdk/python/feast/mlflow_integration/logger.py mutate process-wide global state by calling mlflow.set_tracking_uri() and mlflow.set_experiment(). This corrupts any concurrent user MLflow runs in the same process.
The fix should save and restore the original tracking URI and experiment before/after the ops logging, or preferably use the MlflowClient API exclusively (which already takes tracking_uri as a parameter) instead of the global mlflow module-level functions. The MlflowClient.create_run() method allows creating runs in specific experiments without mutating global state. This pattern is already used in log_feature_retrieval_to_mlflow which correctly uses client = mlflow.MlflowClient(tracking_uri=tracking_uri) without touching global state.
```python
def mlflow(self):
    """Get the MLflow configuration."""
    if not self._mlflow:
        if isinstance(self.mlflow_config, Dict):
            from feast.mlflow_integration.config import MlflowConfig

            self._mlflow = MlflowConfig(**self.mlflow_config)
        elif self.mlflow_config:
            self._mlflow = self.mlflow_config
    return self._mlflow
```
🟡 `mlflow` property guards on truthiness instead of `is None`
The mlflow property in RepoConfig (sdk/python/feast/repo_config.py:497-506) uses `if not self._mlflow:` as the guard. For standard usage this behaves correctly: Pydantic model instances are truthy, so a cached MlflowConfig (even one with enabled=False, e.g. built from an empty `mlflow_config: {}`) is constructed once and reused, and a `None` mlflow_config simply yields None. The guard only misbehaves if the cached config object is itself falsy (possible with a custom subclass that overrides `__bool__`), in which case it is re-created on every access. `if self._mlflow is None:` expresses the caching intent precisely. The pattern matches the existing openlineage property, so this is a consistency/robustness nit rather than a bug for standard usage.
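A stdlib-only sketch of the sentinel-based guard; the class and field names mirror the snippet above, but the config construction is simplified (no Pydantic) to keep it runnable:

```python
from typing import Any, Dict, Optional


class RepoConfigSketch:
    def __init__(self, mlflow_config: Any = None):
        self.mlflow_config = mlflow_config
        self._mlflow: Optional[Any] = None

    @property
    def mlflow(self):
        # `is None` caches exactly once, even if the config object itself
        # happens to be falsy; `if not self._mlflow:` would rebuild the
        # config on every access in that case.
        if self._mlflow is None and self.mlflow_config is not None:
            if isinstance(self.mlflow_config, Dict):
                # In Feast this would be MlflowConfig(**self.mlflow_config);
                # a plain dict copy stands in for the sketch.
                self._mlflow = dict(self.mlflow_config)
            else:
                self._mlflow = self.mlflow_config
        return self._mlflow
```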
```markdown
- **Online feature retrieval** -- `get_online_features()` tags the run with the same metadata
- **Entity DataFrame archival** -- optionally saves the training entity DataFrame as an MLflow artifact for full reproducibility
- **Execution context tagging** -- tags runs with where they ran (workbench, KFP pipeline, feature server, or standalone)
- **Operation logging** -- optionally logs `feast apply` and `feast materialize` to a separate MLflow experiment
```
not sure this is actually useful
actually, this is not the updated one; I thought of this for backtracking from the MLflow run to the workbench experiment, but it is now omitted
```markdown
- **Entity DataFrame archival** -- optionally saves the training entity DataFrame as an MLflow artifact for full reproducibility
- **Execution context tagging** -- tags runs with where they ran (workbench, KFP pipeline, feature server, or standalone)
- **Operation logging** -- optionally logs `feast apply` and `feast materialize` to a separate MLflow experiment
- **Model-to-Feature resolution** -- map any MLflow model URI back to its Feast feature service
- **Training reproducibility** -- reconstruct the exact entity DataFrame from a past MLflow run
- **Training-to-prediction linkage** -- `FeastMlflowClient.load_model()` links prediction runs back to their training runs
- **Feast MLflow Client** -- a thin wrapper that eliminates direct `import mlflow` in user code
```
```python
_consecutive_failures = 0


def log_feature_retrieval_to_mlflow(
```
I guess if we have this enabled, why do we even need the wrapper? Can't we fully handle this for the user?
I guess my question is, can't we just hide all of the mlflow client usage from the user?
The auto-logging in logger.py is fully automatic for users who already use `mlflow.start_run()` or `client.start_run()`. The FeastMlflowClient wrapper, by contrast, is for users who want to avoid `import mlflow` entirely and get additional Feast-specific behavior on top (e.g. logging a model, registering it, loading it for inference). So FeastMlflowClient is for users who want the extra lineage features without touching MLflow directly.
I would rather push all these functions into the integration client, since everything here is part of the MLflow-Feast integration?
Signed-off-by: Vanshika Vanshika <vvanshik@redhat.com> rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED
jyejare
left a comment
Comments inline:
- We also need to enable mlflow configurations from the Feast operator, else the admin has to do it explicitly.
- Also, we need to remove the duplicate mechanisms for initiating/using the mlflow client and provide one clean way, which is `store.mlflow`, wherever possible.
```markdown
2. `MLFLOW_TRACKING_URI` environment variable
3. MLflow's default (`./mlruns` local directory)

This means you can omit `tracking_uri` from the YAML and set `MLFLOW_TRACKING_URI` in your environment instead.
```
Suggested change:
```diff
- This means you can omit `tracking_uri` from the YAML and set `MLFLOW_TRACKING_URI` in your environment instead.
+ This means you can omit `tracking_uri` from the YAML and set `MLFLOW_TRACKING_URI` in your environment instead; if neither is set, MLflow falls back to the local `./mlruns` directory.
```
wdyt?
```markdown
| Artifact | Description |
|----------|-------------|
| `required_features.json` | JSON list of feature references the model was trained on |
```
Could we rename this to relate it to Feast better, so that it is self-describing in the MLflow ecosystem? Something like `required_feast_features.json` or `feast_features.json`.
With the configuration above, feature metadata is logged whenever there is an active MLflow run:
```python
import mlflow
```
I think the explicit import is no longer required, as the feast module gives you the ability to call:
`store.mlflow.start_run(...`
```python
from feast import FeatureStore

store = FeatureStore(".")
client = store.get_mlflow_client()
```
Even this has changed now. Let's keep only `store.mlflow.*` examples.
```markdown
| Condition | Error |
|-----------|-------|
| No `feature_store.yaml` in cwd and no store created | `RuntimeError` with guidance to call `feast.mlflow.init(store)` |
| `mlflow.enabled` is `false` or missing | `RuntimeError` with guidance to set `mlflow.enabled=true` |
```
Even if the user wants to set it to false?
```python
    invalid, resolution fails, or validation against the store fails.
    """
    try:
        import mlflow
```
Same here. Create a class and instantiate it in the integration object/class, taking the store object as a param.
```python
if mlflow_cfg is None or not mlflow_cfg.enabled:
    return

import mlflow
```
The store object is available to you here as `self`, so why not:
`self.mlflow.set_tracking_uri(...`
```python
client = MlflowClient(tracking_uri=tracking_uri)
refs = ["z_view:f1", "a_view:f2", "z_view:f3", "a_view:f4"]

with mlflow.start_run() as run:
```
The tests should also focus on the `store.mlflow...` usage pattern.
What this PR does / why we need it:
final_mlflow_demo.mp4
- **Auto-logging**: Feature retrieval metadata is tagged on the active MLflow run (`feast.feature_refs`, `feast.feature_views`, `feast.feature_service`, `feast.entity_count`, etc.)
- **Entity DataFrame archival**: Optionally saves the training entity DataFrame as an MLflow artifact (`entity_df.parquet`) for full reproducibility
- **Model-to-feature-service resolution**: `resolve_feature_service_from_model_uri()` maps any MLflow model URI back to its Feast feature service, enabling serving pipelines to auto-discover which features a model needs
- **Entity DataFrame reconstruction**: `get_entity_df_from_mlflow_run()` rebuilds the exact entity DataFrame from a past run's artifacts, enabling training reproducibility
- **Configuration**: Controlled entirely via `feature_store.yaml` under a new `mlflow:` block

Which issue(s) this PR fixes:
Checks
- Commits are signed off (`git commit -s`)

Testing Strategy
Misc