Skip to content

Commit 6683cb5

Browse files
authored
Merge branch 'feast-dev:master' into feast-mlflow
2 parents c8dabb9 + ce883f8 commit 6683cb5

7 files changed

Lines changed: 289 additions & 9 deletions

File tree

infra/feast-operator/test/testdata/feast_integration_test_crs/redis.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ spec:
1616
containers:
1717
- name: redis
1818
image: 'quay.io/feastdev-ci/feast-test-images:redis-7-alpine'
19+
command: ["redis-server", "--save", ""]
1920
ports:
2021
- containerPort: 6379
2122
env:

sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import base64
2+
import logging
23
from datetime import datetime
34
from pathlib import Path
45
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union
@@ -42,6 +43,8 @@
4243
to_naive_utc,
4344
)
4445

46+
logger = logging.getLogger(__name__)
47+
4548
PROTO_TO_MILVUS_TYPE_MAPPING: Dict[ValueType, DataType] = {
4649
PROTO_VALUE_TO_VALUE_TYPE_MAP["bytes_val"]: DataType.VARCHAR,
4750
ValueType.IMAGE_BYTES: DataType.VARCHAR,
@@ -140,11 +143,13 @@ def _connect(self, config: RepoConfig) -> MilvusClient:
140143
if not self.client:
141144
if config.provider == "local" and config.online_store.path:
142145
db_path = self._get_db_path(config)
143-
print(f"Connecting to Milvus in local mode using {db_path}")
146+
logger.info("Connecting to Milvus in local mode using %s", db_path)
144147
self.client = MilvusClient(db_path)
145148
else:
146-
print(
147-
f"Connecting to Milvus remotely at {config.online_store.host}:{config.online_store.port}"
149+
logger.info(
150+
"Connecting to Milvus remotely at %s:%s",
151+
config.online_store.host,
152+
config.online_store.port,
148153
)
149154
self.client = MilvusClient(
150155
uri=f"{config.online_store.host}:{config.online_store.port}",
@@ -339,7 +344,6 @@ def online_read(
339344
table: FeatureView,
340345
entity_keys: List[EntityKeyProto],
341346
requested_features: Optional[List[str]] = None,
342-
full_feature_names: bool = False,
343347
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
344348
self.client = self._connect(config)
345349
collection_name = _table_id(config.project, table)
@@ -487,7 +491,7 @@ def update(
487491
):
488492
self.client = self._connect(config)
489493
for table in tables_to_keep:
490-
self._collections = self._get_or_create_collection(config, table)
494+
self._get_or_create_collection(config, table)
491495

492496
for table in tables_to_delete:
493497
collection_name = _table_id(config.project, table)
@@ -498,7 +502,7 @@ def update(
498502
def plan(
499503
self, config: RepoConfig, desired_registry_proto: RegistryProto
500504
) -> List[InfraObject]:
501-
raise NotImplementedError
505+
return []
502506

503507
def teardown(
504508
self,
@@ -686,9 +690,8 @@ def retrieve_online_documents_v2(
686690
for hit in hits:
687691
res = {}
688692
res_ts = None
689-
entity_key_bytes = bytes.fromhex(
690-
hit.get("entity", {}).get(composite_key_name, None)
691-
)
693+
raw_key = hit.get("entity", {}).get(composite_key_name)
694+
entity_key_bytes = bytes.fromhex(raw_key) if raw_key else None
692695
entity_key_proto = (
693696
deserialize_entity_key(entity_key_bytes)
694697
if entity_key_bytes

sdk/python/feast/metrics.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag
198198
"Number of entity rows per online feature request",
199199
buckets=(1, 5, 10, 25, 50, 100, 250, 500, 1000),
200200
)
201+
online_features_status_total = Counter(
202+
"feast_online_features_status_total",
203+
"Count of individual feature values by retrieval status per feature view",
204+
["feature_view", "status"],
205+
)
201206

202207
# ---------------------------------------------------------------------------
203208
# Push / write metrics
@@ -334,6 +339,22 @@ def track_online_store_read(duration_seconds: float):
334339
online_store_read_duration_seconds.observe(duration_seconds)
335340

336341

342+
def track_feature_statuses(
    feature_view_name: str, present_count: int, not_found_count: int
):
    """Add PRESENT / NOT_FOUND feature-value counts for one feature view.

    Nothing is recorded when ``_config.online_features`` is falsy. A status
    whose count is not positive is skipped entirely, so its labelled
    counter is never looked up or incremented.
    """
    if _config.online_features:
        # "present" is handled before "not_found", as two independent updates.
        for status_label, value_count in (
            ("present", present_count),
            ("not_found", not_found_count),
        ):
            if value_count > 0:
                online_features_status_total.labels(
                    feature_view=feature_view_name, status=status_label
                ).inc(value_count)
356+
357+
337358
def track_transformation(odfv_name: str, mode: str, duration_seconds: float):
338359
"""Record the duration of an on-demand feature view read-path transformation."""
339360
if not _config.online_features:

sdk/python/feast/utils.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,6 +1566,15 @@ def _populate_response_from_feature_data(
15661566
feat_values[f_idx][out_idx] = feat_val
15671567
feat_statuses[f_idx][out_idx] = PRESENT
15681568

1569+
try:
1570+
from feast.metrics import track_feature_statuses
1571+
1572+
_present = sum(s == PRESENT for row in feat_statuses for s in row)
1573+
_not_found = (n_features * output_len) - _present
1574+
track_feature_statuses(table.name, _present, _not_found)
1575+
except Exception:
1576+
pass
1577+
15691578
for f_idx in range(n_features):
15701579
online_features_response.results.append(
15711580
GetOnlineFeaturesResponse.FeatureVector(

sdk/python/tests/integration/rest_api/resource/redis.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ spec:
1616
containers:
1717
- name: redis
1818
image: 'quay.io/feastdev-ci/feast-test-images:redis-7-alpine'
19+
command: ["redis-server", "--save", ""]
1920
ports:
2021
- containerPort: 6379
2122
env:

sdk/python/tests/unit/online_store/test_online_retrieval.py

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1714,3 +1714,181 @@ def test_milvus_keyword_search() -> None:
17141714
assert len(result_hybrid["content"]) > 0
17151715
assert any("Feast" in content for content in result_hybrid["content"])
17161716
assert len(result_hybrid["vector"]) > 0
1717+
1718+
1719+
def test_milvus_update_preserves_collection_cache() -> None:
    """
    Regression test for the Milvus collection cache after update().

    update() previously rebound self._collections to the collection-info
    dict of the last table it processed, turning the dict-of-dicts cache
    into a single flat dict. With the fix, _get_or_create_collection()
    fills in the keyed entry and the cache stays a mapping from collection
    name to collection info.
    """
    from datetime import timedelta

    from feast import Entity, FeatureView, Field, FileSource
    from feast.types import Array, Float32, Int64, String

    cli = CliRunner()
    with cli.local_repo(
        example_repo_py=get_example_repo("example_rag_feature_repo.py"),
        offline_store="file",
        online_store="milvus",
        apply=False,
        teardown=False,
    ) as store:
        source = FileSource(
            path="data/dummy.parquet",
            timestamp_field="event_timestamp",
            created_timestamp_column="created_timestamp",
        )
        entity_a = Entity(name="id_a", join_keys=["id_a"], value_type=ValueType.INT64)
        entity_b = Entity(name="id_b", join_keys=["id_b"], value_type=ValueType.INT64)

        # Two structurally identical views, each backed by its own collection.
        schema_a = [
            Field(name="id_a", dtype=Int64),
            Field(
                name="vec_a",
                dtype=Array(Float32),
                vector_index=True,
                vector_search_metric="COSINE",
            ),
            Field(name="text_a", dtype=String),
        ]
        fv_a = FeatureView(
            name="fv_a",
            entities=[entity_a],
            schema=schema_a,
            source=source,
            ttl=timedelta(hours=1),
        )
        schema_b = [
            Field(name="id_b", dtype=Int64),
            Field(
                name="vec_b",
                dtype=Array(Float32),
                vector_index=True,
                vector_search_metric="COSINE",
            ),
            Field(name="text_b", dtype=String),
        ]
        fv_b = FeatureView(
            name="fv_b",
            entities=[entity_b],
            schema=schema_b,
            source=source,
            ttl=timedelta(hours=1),
        )

        store.apply([source, entity_a, entity_b, fv_a, fv_b])

        # The cache must map collection names to collection-info dicts;
        # it must not itself be a single collection-info dict.
        cache = store._provider._online_store._collections
        assert isinstance(cache, dict), "_collections should be a dict"

        name_a = f"{store.config.project}_fv_a"
        name_b = f"{store.config.project}_fv_b"
        assert name_a in cache, f"Cache missing entry for {name_a}"
        assert name_b in cache, (
            f"Cache missing entry for {name_b} — "
            "update() likely overwrote _collections with a single collection dict"
        )

        # Every cached value is collection info (it carries a 'fields' key),
        # not another level keyed by collection name.
        for name in (name_a, name_b):
            assert "fields" in cache[name], (
                f"Cache entry for {name} looks like a corrupted flat dict"
            )
1804+
1805+
1806+
def test_milvus_plan_returns_empty_list() -> None:
    """
    Regression test for MilvusOnlineStore.plan().

    plan() previously raised NotImplementedError, which made `feast plan`
    crash for any project using the Milvus online store. It must return
    [] instead, matching the OnlineStore base class default.
    """
    from feast.infra.online_stores.milvus_online_store.milvus import MilvusOnlineStore

    milvus_store = MilvusOnlineStore()
    # None stands in for both arguments; only the return value is checked.
    planned = milvus_store.plan(config=None, desired_registry_proto=None)  # type: ignore[arg-type]
    assert planned == [], f"plan() should return [] but returned {planned!r}"
1817+
1818+
1819+
def test_milvus_retrieve_online_documents_v2_missing_entity_key() -> None:
    """
    Regression test for search hits that lack the composite primary key.

    retrieve_online_documents_v2() used to hand
    hit.get("entity", {}).get(composite_key_name, None) straight to
    bytes.fromhex(), which raises TypeError when the key is absent. A hit
    without the composite key must now produce a None entity_key_proto
    instead of crashing.
    """
    from datetime import timedelta
    from unittest.mock import patch

    from feast import Entity, FeatureView, Field, FileSource
    from feast.types import Array, Float32, Int64, String

    cli = CliRunner()
    with cli.local_repo(
        example_repo_py=get_example_repo("example_rag_feature_repo.py"),
        offline_store="file",
        online_store="milvus",
        apply=False,
        teardown=False,
    ) as store:
        source = FileSource(
            path="data/dummy.parquet",
            timestamp_field="event_timestamp",
            created_timestamp_column="created_timestamp",
        )
        entity = Entity(name="doc_id", join_keys=["doc_id"], value_type=ValueType.INT64)
        docs_schema = [
            Field(name="doc_id", dtype=Int64),
            Field(
                name="vec",
                dtype=Array(Float32),
                vector_index=True,
                vector_search_metric="COSINE",
            ),
            Field(name="text", dtype=String),
        ]
        fv = FeatureView(
            name="docs",
            entities=[entity],
            schema=docs_schema,
            source=source,
            ttl=timedelta(hours=1),
        )
        store.apply([source, entity, fv])

        online_store = store._provider._online_store
        fv_obj = store.get_feature_view("docs")

        # A search hit whose "entity" payload carries no composite primary key.
        keyless_entity = {
            "event_ts": int(_utc_now().timestamp() * 1e6),
            "created_ts": int(_utc_now().timestamp() * 1e6),
            "text": "hello",
        }
        search_results = [[{"entity": keyless_entity, "distance": 0.9}]]

        # Stub out both client calls so the fake hit is what gets processed.
        with patch.object(
            online_store.client, "search", return_value=search_results
        ), patch.object(online_store.client, "load_collection", return_value=None):
            # Before the fix this raised TypeError: fromhex argument must be str, not None
            documents = online_store.retrieve_online_documents_v2(
                config=store.config,
                table=fv_obj,
                requested_features=["text"],
                embedding=[0.1] * 10,
                top_k=1,
            )
            assert len(documents) == 1
            _ts, entity_key_proto, _features = documents[0]
            assert entity_key_proto is None, (
                "entity_key_proto should be None when the composite key is absent from the hit"
            )

0 commit comments

Comments
 (0)