Skip to content

Commit ee69865

Browse files
authored
Merge branch 'master' into oidc-rbac-ssl-logging
2 parents 9a93956 + f630056 commit ee69865

20 files changed

Lines changed: 817 additions & 70 deletions

File tree

docs/reference/beta-on-demand-feature-view.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,42 @@ def driver_aggregated_stats(inputs):
6969

7070
Aggregated columns are automatically named using the pattern `{function}_{column}` (e.g., `sum_trips`, `mean_rating`).
7171

72+
### Using `input_schema` with Aggregations
73+
74+
When the input data is not already stored as a feature view, use `input_schema` instead of `sources` to describe the fields that will be passed at request time. Feast will create an internal `RequestSource` automatically.
75+
76+
```python
77+
from datetime import timedelta
78+
from feast import Field, on_demand_feature_view
79+
from feast.aggregation import Aggregation
80+
from feast.types import Float64, Int64
81+
82+
@on_demand_feature_view(
83+
input_schema=[
84+
Field(name="txn_amount", dtype=Float64),
85+
],
86+
schema=[
87+
Field(name="txn_count", dtype=Int64),
88+
Field(name="total_txn_amount", dtype=Float64),
89+
Field(name="avg_txn_amount", dtype=Float64),
90+
],
91+
aggregations=[
92+
Aggregation(column="txn_amount", function="count", name="txn_count",
93+
time_window=timedelta(days=30)),
94+
Aggregation(column="txn_amount", function="sum", name="total_txn_amount",
95+
time_window=timedelta(days=30)),
96+
Aggregation(column="txn_amount", function="mean", name="avg_txn_amount",
97+
time_window=timedelta(days=30)),
98+
],
99+
entities=[user],
100+
)
101+
def user_transaction_stats(inputs):
102+
# Aggregations replace the transformation function — no body needed.
103+
pass
104+
```
105+
106+
`input_schema` also accepts fields that are not aggregation columns — for example, thresholds, currency codes, or other contextual values passed at request time that your UDF needs but that are not stored as features.
107+
72108
## Example
73109
See [https://github.com/feast-dev/on-demand-feature-views-demo](https://github.com/feast-dev/on-demand-feature-views-demo) for an example on how to use on demand feature views.
74110

sdk/python/feast/entity.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ def to_proto(self) -> EntityProto:
203203

204204
spec = EntitySpecProto(
205205
name=self.name,
206-
value_type=self.value_type.value,
206+
value_type=self.value_type.value, # type: ignore[arg-type]
207207
join_key=self.join_key,
208208
description=self.description,
209209
tags=self.tags,

sdk/python/feast/field.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ def to_proto(self) -> FieldProto:
136136
tags[NESTED_COLLECTION_INNER_TYPE_TAG] = _feast_type_to_str(self.dtype)
137137
return FieldProto(
138138
name=self.name,
139-
value_type=value_type.value,
139+
value_type=value_type.value, # type: ignore[arg-type]
140140
description=self.description,
141141
tags=tags,
142142
vector_index=self.vector_index,

sdk/python/feast/infra/compute_engines/feature_builder.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,14 @@ def get_column_info(
158158
# we need to read ALL source columns, not just the output feature columns.
159159
# This is specifically for transformations that create new columns or need raw data.
160160
mode = getattr(getattr(view, "feature_transformation", None), "mode", None)
161-
if mode in ("ray", "pandas") or getattr(mode, "value", None) in (
161+
if mode in ("ray", "pandas", "python") or getattr(mode, "value", None) in (
162162
"ray",
163163
"pandas",
164+
"python",
164165
):
165-
# Signal to read all columns by passing empty list for feature_cols
166-
# The transformation will produce the output columns defined in the schema
166+
# Signal to read all columns by passing empty list for feature_cols.
167+
# "python" (BatchFeatureView) transformations need all raw source columns — the
168+
# UDF computes output features from raw input, not from pre-existing feature cols.
167169
feature_cols = []
168170

169171
return ColumnInfo(

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -387,12 +387,18 @@ def pull_all_from_table_or_query(
387387
timestamp_fields = [timestamp_field]
388388
if created_timestamp_column:
389389
timestamp_fields.append(created_timestamp_column)
390-
(fields_with_aliases, aliases) = _get_fields_with_aliases(
391-
fields=join_key_columns + feature_name_columns + timestamp_fields,
392-
field_mappings=data_source.field_mapping,
393-
)
394390

395-
fields_with_alias_string = ", ".join(fields_with_aliases)
391+
if feature_name_columns:
392+
(fields_with_aliases, _) = _get_fields_with_aliases(
393+
fields=join_key_columns + feature_name_columns + timestamp_fields,
394+
field_mappings=data_source.field_mapping,
395+
)
396+
fields_with_alias_string = ", ".join(fields_with_aliases)
397+
else:
398+
# Empty feature_name_columns signals "read all source columns".
399+
# Used by BatchFeatureView with TransformationMode.PYTHON/ray/pandas where
400+
# the UDF computes output features from raw input — don't project upfront.
401+
fields_with_alias_string = "*"
396402

397403
from_expression = data_source.get_table_query_string()
398404
timestamp_filter = get_timestamp_filter_sql(

sdk/python/feast/infra/online_stores/remote.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ def _construct_online_read_api_json_request(
501501
for row in entity_keys:
502502
entity_key = row.join_keys[0]
503503
entity_values.append(
504-
getattr(row.entity_values[0], row.entity_values[0].WhichOneof("val"))
504+
getattr(row.entity_values[0], row.entity_values[0].WhichOneof("val")) # type: ignore[arg-type]
505505
)
506506

507507
return {

sdk/python/feast/on_demand_feature_view.py

Lines changed: 118 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ class OnDemandFeatureView(BaseFeatureView):
134134
"""
135135

136136
_TRACK_METRICS_TAG = "feast:track_metrics"
137+
_INPUT_SCHEMA_SOURCE_PREFIX = "__input_schema__"
137138

138139
name: str
139140
entities: Optional[List[str]]
@@ -158,7 +159,8 @@ def __init__( # noqa: C901
158159
name: str,
159160
entities: Optional[List[Entity]] = None,
160161
schema: Optional[List[Field]] = None,
161-
sources: List[OnDemandSourceType],
162+
sources: Optional[List[OnDemandSourceType]] = None,
163+
input_schema: Optional[List[Field]] = None,
162164
udf: Optional[FunctionType] = None,
163165
udf_string: Optional[str] = "",
164166
feature_transformation: Optional[Transformation] = None,
@@ -183,6 +185,11 @@ def __init__( # noqa: C901
183185
sources: A list of input sources, which may be
184186
feature views, or request data sources. These sources serve as inputs to the udf,
185187
which will refer to them by name.
188+
input_schema (optional): A list of Fields describing data that is accepted as input
189+
but not stored directly as features — e.g. aggregation columns, normalization
190+
parameters, thresholds, or other contextual values passed at request time.
191+
When provided, sources is not required — an internal RequestSource will be
192+
created automatically.
186193
udf: The user defined transformation function, which must take pandas
187194
dataframes as inputs.
188195
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
@@ -214,15 +221,44 @@ def __init__( # noqa: C901
214221
self.version = version
215222
schema = schema or []
216223
self.entities = [e.name for e in entities] if entities else [DUMMY_ENTITY_NAME]
217-
self.sources = sources
224+
self.input_schema = input_schema
218225
self.mode = mode.lower()
219226
self.udf = udf
220227
self.udf_string = udf_string
221228
self.source_feature_view_projections: dict[str, FeatureViewProjection] = {}
222229
self.source_request_sources: dict[str, RequestSource] = {}
230+
self._input_schema_sentinel: Optional[RequestSource] = None
231+
232+
# Strip any existing sentinel from sources (handles __copy__ round-trip)
233+
effective_sources: List[OnDemandSourceType] = [
234+
s
235+
for s in (sources or [])
236+
if not (
237+
isinstance(s, RequestSource)
238+
and s.name.startswith(self._INPUT_SCHEMA_SOURCE_PREFIX)
239+
)
240+
]
241+
242+
if input_schema is not None:
243+
# Automatically create an internal RequestSource from input_schema.
244+
# Stored privately so it does not appear in source_request_sources for
245+
# external consumers (e.g. the feature server, apply(), utils.py).
246+
self._input_schema_sentinel = RequestSource(
247+
name=f"{self._INPUT_SCHEMA_SOURCE_PREFIX}{name}",
248+
schema=input_schema,
249+
)
250+
self.source_request_sources[self._input_schema_sentinel.name] = (
251+
self._input_schema_sentinel
252+
)
253+
elif not effective_sources:
254+
raise ValueError(
255+
"Either 'sources' or 'input_schema' must be provided for OnDemandFeatureView."
256+
)
257+
258+
self.sources = effective_sources
223259

224260
# Process each source with explicit type handling
225-
for odfv_source in sources:
261+
for odfv_source in effective_sources:
226262
self._add_source_to_collections(odfv_source)
227263

228264
features: List[Field] = []
@@ -274,6 +310,20 @@ def __init__( # noqa: C901
274310
self.track_metrics = track_metrics
275311
self.aggregations = aggregations or []
276312

313+
if input_schema is not None and self.aggregations:
314+
input_field_names = {f.name for f in input_schema}
315+
unknown = [
316+
agg.column
317+
for agg in self.aggregations
318+
if agg.column and agg.column not in input_field_names
319+
]
320+
if unknown:
321+
raise ValueError(
322+
f"Aggregation column(s) {unknown} not found in input_schema "
323+
f"for OnDemandFeatureView '{name}'. "
324+
f"Available fields: {sorted(input_field_names)}"
325+
)
326+
277327
def _add_source_to_collections(self, odfv_source: OnDemandSourceType) -> None:
278328
"""
279329
Add a source to the appropriate collection with explicit type checking.
@@ -328,6 +378,7 @@ def __copy__(self):
328378
schema=self.features,
329379
sources=list(self.source_feature_view_projections.values())
330380
+ list(self.source_request_sources.values()),
381+
input_schema=self.input_schema,
331382
feature_transformation=self.feature_transformation,
332383
mode=self.mode,
333384
description=self.description,
@@ -337,6 +388,7 @@ def __copy__(self):
337388
singleton=self.singleton,
338389
version=self.version,
339390
track_metrics=self.track_metrics,
391+
aggregations=self.aggregations,
340392
)
341393
fv.entities = self.entities
342394
fv.features = self.features
@@ -475,6 +527,10 @@ def _validate_sources_config(self) -> None:
475527

476528
def _validate_transformation_config(self) -> None:
477529
"""Validate transformation configuration."""
530+
# Aggregations provide their own transformation; no udf/feature_transformation required.
531+
if self.aggregations:
532+
return
533+
478534
if not self.feature_transformation:
479535
raise ValueError(ODFVErrorMessages.no_transformation_provided())
480536

@@ -536,6 +592,14 @@ def to_proto(self) -> OnDemandFeatureViewProto:
536592
request_data_source=request_sources.to_proto()
537593
)
538594

595+
# Serialize the input_schema sentinel so that from_proto() can reconstruct
596+
# input_schema correctly; it is excluded from source_request_sources so that
597+
# external consumers never see it as a real data source.
598+
if self._input_schema_sentinel is not None:
599+
sources[self._input_schema_sentinel.name] = OnDemandSource(
600+
request_data_source=self._input_schema_sentinel.to_proto()
601+
)
602+
539603
feature_transformation = transformation_to_proto(self.feature_transformation)
540604

541605
tags = dict(self.tags) if self.tags else {}
@@ -559,7 +623,7 @@ def to_proto(self) -> OnDemandFeatureViewProto:
559623
owner=self.owner,
560624
write_to_online_store=self.write_to_online_store,
561625
singleton=self.singleton or False,
562-
aggregations=self.aggregations,
626+
aggregations=[agg.to_proto() for agg in self.aggregations],
563627
version=self.version,
564628
)
565629
return OnDemandFeatureViewProto(spec=spec, meta=meta)
@@ -585,6 +649,18 @@ def from_proto(
585649
on_demand_feature_view_proto, skip_udf=skip_udf
586650
)
587651

652+
# Detect and strip input_schema sentinel from sources
653+
input_schema: Optional[List[Field]] = None
654+
sources_without_sentinel: List[OnDemandSourceType] = []
655+
for source in sources:
656+
if isinstance(source, RequestSource) and source.name.startswith(
657+
cls._INPUT_SCHEMA_SOURCE_PREFIX
658+
):
659+
input_schema = source.schema
660+
else:
661+
sources_without_sentinel.append(source)
662+
sources = sources_without_sentinel
663+
588664
# Parse transformation from proto (skip UDF deserialization if requested)
589665
transformation = cls._parse_transformation_from_proto(
590666
on_demand_feature_view_proto, skip_udf=skip_udf
@@ -607,6 +683,7 @@ def from_proto(
607683
name=on_demand_feature_view_proto.spec.name,
608684
schema=cls._parse_features_from_proto(on_demand_feature_view_proto),
609685
sources=cast(List[OnDemandSourceType], sources),
686+
input_schema=input_schema,
610687
feature_transformation=transformation,
611688
mode=on_demand_feature_view_proto.spec.mode or "pandas",
612689
description=on_demand_feature_view_proto.spec.description,
@@ -710,6 +787,8 @@ def _parse_transformation_from_proto(
710787
feature_transformation.substrait_transformation
711788
)
712789
elif transformation_type is None:
790+
if proto.spec.aggregations:
791+
return None
713792
# Handle backward compatibility case where feature_transformation is cleared
714793
return cls._handle_backward_compatible_udf(proto)
715794
else:
@@ -817,6 +896,10 @@ def get_request_data_schema(self) -> dict[str, ValueType]:
817896
raise TypeError(
818897
f"Request source schema is not correct type: ${str(type(request_source.schema))}"
819898
)
899+
# Include fields from the input_schema sentinel (stored privately)
900+
if self._input_schema_sentinel is not None:
901+
for field in self._input_schema_sentinel.schema:
902+
schema[field.name] = field.dtype.to_value_type()
820903
return schema
821904

822905
def _get_projected_feature_name(self, feature: str) -> str:
@@ -1036,6 +1119,13 @@ def _preprocess_feature_dict(
10361119
return preprocessed_dict, columns_to_cleanup
10371120

10381121
def infer_features(self) -> None:
1122+
if self.aggregations and not self.feature_transformation:
1123+
if not self.features:
1124+
raise RegistryInferenceFailure(
1125+
"OnDemandFeatureView",
1126+
f"Could not infer Features for the feature view '{self.name}'.",
1127+
)
1128+
return
10391129
assert self.feature_transformation is not None
10401130
random_input = self._construct_random_input(singleton=self.singleton)
10411131
inferred_features = self.feature_transformation.infer_features(
@@ -1092,7 +1182,7 @@ def _is_array_type(self, dtype) -> bool:
10921182
"""Check if the dtype represents an array type."""
10931183
# Use proper type checking instead of string comparison
10941184
dtype_str = str(dtype)
1095-
return "Array" in dtype_str or "List" in dtype_str
1185+
return "Array" in dtype_str or "List" in dtype_str or "Set" in dtype_str
10961186

10971187
def _construct_random_input(
10981188
self, singleton: bool = False
@@ -1137,6 +1227,13 @@ def _construct_random_input(
11371227
sample_value = sample_values.get(value_type, default_value)
11381228
feature_dict[field.name] = sample_value
11391229

1230+
# Add input_schema fields (stored privately outside source_request_sources)
1231+
if self._input_schema_sentinel is not None:
1232+
for field in self._input_schema_sentinel.schema:
1233+
value_type = field.dtype.to_value_type()
1234+
sample_value = sample_values.get(value_type, default_value)
1235+
feature_dict[field.name] = sample_value
1236+
11401237
return feature_dict
11411238

11421239
def _get_sample_values_by_type(self) -> dict[ValueType, list[Any]]:
@@ -1224,13 +1321,17 @@ def on_demand_feature_view(
12241321
name: Optional[str] = None,
12251322
entities: Optional[List[Entity]] = None,
12261323
schema: list[Field],
1227-
sources: list[
1228-
Union[
1229-
FeatureView,
1230-
RequestSource,
1231-
FeatureViewProjection,
1324+
sources: Optional[
1325+
list[
1326+
Union[
1327+
FeatureView,
1328+
RequestSource,
1329+
FeatureViewProjection,
1330+
]
12321331
]
1233-
],
1332+
] = None,
1333+
input_schema: Optional[list[Field]] = None,
1334+
aggregations: Optional[List[Aggregation]] = None,
12341335
mode: str = "pandas",
12351336
description: str = "",
12361337
tags: Optional[dict[str, str]] = None,
@@ -1252,6 +1353,10 @@ def on_demand_feature_view(
12521353
sources: A list of input sources, which may be
12531354
feature views, or request data sources. These sources serve as inputs to the udf,
12541355
which will refer to them by name.
1356+
input_schema (optional): A list of Fields describing data that is accepted as input
1357+
but not stored directly as features — e.g. aggregation columns, normalization
1358+
parameters, thresholds, or other contextual values passed at request time.
1359+
When provided, sources is not required.
12551360
mode: The mode of execution (e.g., Pandas or Python Native)
12561361
description (optional): A human-readable description.
12571362
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
@@ -1279,6 +1384,7 @@ def decorator(user_function):
12791384
on_demand_feature_view_obj = OnDemandFeatureView(
12801385
name=name if name is not None else user_function.__name__,
12811386
sources=sources,
1387+
input_schema=input_schema,
12821388
schema=schema,
12831389
mode=mode,
12841390
description=description,
@@ -1288,6 +1394,7 @@ def decorator(user_function):
12881394
entities=entities,
12891395
singleton=singleton,
12901396
track_metrics=track_metrics,
1397+
aggregations=aggregations,
12911398
udf=user_function,
12921399
udf_string=udf_string,
12931400
version=version,

0 commit comments

Comments
 (0)