Background
Feast currently has two separate mechanisms for applying BatchFeatureView (BFV) transformations during historical feature retrieval:
- Compute Engine DAG path (SparkComputeEngine.get_historical_features(), RayComputeEngine.get_historical_features()) — uses FeatureBuilder + ExecutionPlan with proper node sequencing: source → transform → filter → dedup. This path already handles BFV transformations correctly for both materialization and retrieval.
- Offline Store path (passthrough_provider.get_historical_features() → offline_store.get_historical_features()) — the standard path invoked by FeatureStore.get_historical_features(). This path bypasses the compute engine entirely.
Because of this split, each offline store that wants BFV compute-on-read during historical retrieval must re-implement transformation detection and application logic independently:
- RayOfflineStore.get_historical_features() implements it via resolve_feature_view_source_with_fallback() + map_batches()
- PR #6357 adds a parallel implementation for SparkOfflineStore
This means transformation logic is duplicated across stores rather than living in the shared compute engine layer.
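To make the duplication concrete, here is a minimal sketch of the detect-then-apply pattern each offline store re-implements today. The class and function names are illustrative stand-ins, not Feast's actual internals:

```python
import pandas as pd

# Hypothetical stand-in for a BatchFeatureView carrying an optional
# batch transformation UDF (DataFrame -> DataFrame).
class FakeBatchFeatureView:
    def __init__(self, udf=None):
        self.udf = udf

def apply_bfv_transform(view, df: pd.DataFrame) -> pd.DataFrame:
    # The detect-then-apply step each store currently duplicates:
    # if the view defines a transformation, run it on the retrieved rows.
    if getattr(view, "udf", None) is not None:
        return view.udf(df)
    return df

view = FakeBatchFeatureView(udf=lambda df: df.assign(value_doubled=df["value"] * 2))
out = apply_bfv_transform(view, pd.DataFrame({"value": [1, 2]}))
```

RayOfflineStore does this via map_batches over Ray Datasets, SparkOfflineStore via Spark DataFrames, but the detection and dispatch logic is conceptually identical in both.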
Root Cause
passthrough_provider.get_historical_features() always delegates to self.offline_store, never to self.batch_engine:
```python
# sdk/python/feast/infra/passthrough_provider.py
def get_historical_features(self, config, feature_views, ...):
    job = self.offline_store.get_historical_features(...)  # batch_engine never called
    return job
```
Note the contrast with materialization, which correctly routes through the engine:
```python
jobs = self.batch_engine.materialize(registry, task)
```
Blocking Interface Mismatch
The compute engine cannot be directly substituted today because of a fundamental impedance mismatch:
- passthrough_provider.get_historical_features() receives feature_views: List[FeatureView] — multiple views for a single multi-view PIT join
- HistoricalRetrievalTask (the compute engine's input) has feature_view: Union[BatchFeatureView, StreamFeatureView] — a single view
```python
# sdk/python/feast/infra/common/retrieval_task.py
@dataclass
class HistoricalRetrievalTask:
    feature_view: Union[BatchFeatureView, StreamFeatureView]  # singular
    entity_df: Union[pd.DataFrame, str]
    ...
```
The compute engine is designed for per-view DAG execution. The PIT join logic that combines multiple views into one entity-aligned training dataset lives in each offline store's SQL template, not in the compute engine.
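For readers unfamiliar with the term, the point-in-time (PIT) join that the SQL templates implement can be sketched in a few lines of pandas: for each entity row, pick the latest feature value at or before that row's event timestamp. The column names below are illustrative:

```python
import pandas as pd

# Entity rows: the timestamps we want training features "as of".
entity_df = pd.DataFrame({
    "driver_id": [1, 1],
    "event_timestamp": pd.to_datetime(["2024-01-02", "2024-01-05"]),
})

# Feature rows produced over time for the same entity.
feature_df = pd.DataFrame({
    "driver_id": [1, 1, 1],
    "event_timestamp": pd.to_datetime(["2024-01-01", "2024-01-03", "2024-01-04"]),
    "trips": [10, 20, 30],
})

# merge_asof requires the "on" key sorted; its default backward direction
# selects the most recent feature row not after each entity timestamp.
joined = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    feature_df.sort_values("event_timestamp"),
    on="event_timestamp",
    by="driver_id",
)
```

Today this logic lives once per offline store as a SQL template; the proposal below moves one engine-owned equivalent of it into ComputeEngine.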
Proposed Solution
This is a multi-step refactor:
Step 1 — Extend HistoricalRetrievalTask to support multiple feature views:
```python
@dataclass
class HistoricalRetrievalTask:
    feature_views: List[Union[BatchFeatureView, FeatureView]]  # plural
    entity_df: Union[pd.DataFrame, str]
    feature_refs: List[str]
```
Step 2 — Move PIT join coordination into ComputeEngine.get_historical_features():
The base ComputeEngine should orchestrate per-view DAG execution across all requested feature views and join the results back to entity_df — making the PIT join logic engine-owned rather than SQL-template-owned.
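A hedged sketch of what that engine-owned orchestration could look like. The function name, signature, and the `execute_view_dag` callback are assumptions for illustration, not Feast's actual API; `execute_view_dag` stands in for running the source → transform → filter → dedup plan for one view:

```python
import pandas as pd

def engine_get_historical_features(entity_df, feature_views, execute_view_dag):
    # Start from the entity frame and PIT-join each view's DAG output onto it.
    result = entity_df.sort_values("event_timestamp")
    for view in feature_views:
        # Per-view DAG execution (source -> transform -> filter -> dedup).
        view_df = execute_view_dag(view).sort_values("event_timestamp")
        # Point-in-time join: latest feature row at or before each entity timestamp.
        result = pd.merge_asof(
            result, view_df, on="event_timestamp", by="driver_id",
        )
    return result

# Toy usage with a fake single-view DAG result.
entity_df = pd.DataFrame({
    "driver_id": [1],
    "event_timestamp": pd.to_datetime(["2024-01-02"]),
})
fake_dag = lambda view: pd.DataFrame({
    "driver_id": [1],
    "event_timestamp": pd.to_datetime(["2024-01-01"]),
    "trips": [10],
})
result = engine_get_historical_features(entity_df, ["view_a"], fake_dag)
```

A real implementation would also need to handle per-view join keys, TTL filtering, and full-feature-name mapping, but the loop structure above is the core of making the PIT join engine-owned.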
Step 3 — Wire the provider to call batch_engine for historical retrieval:
```python
# passthrough_provider.py
def get_historical_features(self, config, feature_views, feature_refs, entity_df, ...):
    task = HistoricalRetrievalTask(
        feature_views=feature_views,
        feature_refs=feature_refs,
        entity_df=entity_df,
        ...
    )
    return self.batch_engine.get_historical_features(registry, task)
```
Step 4 — Retire per-store transformation logic:
Once the compute engine owns this path, the custom BFV transformation code in RayOfflineStore and SparkOfflineStore can be removed. All stores get transformation support for free through the engine's FeatureBuilder.