From 1a22adbaeb378c441c7b7d329019564a1192fb36 Mon Sep 17 00:00:00 2001 From: Krishna mohan Pulipati Date: Thu, 30 Oct 2025 00:30:36 -0500 Subject: [PATCH 1/4] feat: Redis range query materialization --- sdk/python/feast/infra/key_encoding_utils.py | 15 + sdk/python/feast/infra/online_stores/redis.py | 148 ++++++---- .../redis_online_store_creator.py | 32 +++ .../unit/infra/online_store/test_redis.py | 256 ++++++++++++++++++ 4 files changed, 400 insertions(+), 51 deletions(-) create mode 100644 sdk/python/tests/unit/infra/online_store/redis_online_store_creator.py diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index 1f9ffeef140..0966d13d770 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -19,6 +19,12 @@ def _serialize_val( if 0 <= entity_key_serialization_version <= 1: return struct.pack(" ValueProto: return ValueProto(string_val=value) elif value_type == ValueType.BYTES: return ValueProto(bytes_val=value_bytes) + elif value_type == ValueType.UNIX_TIMESTAMP: + value = struct.unpack(" None: online_store_config = config.online_store assert isinstance(online_store_config, RedisOnlineStoreConfig) @@ -292,50 +292,96 @@ def online_write_batch( keys = [] # redis pipelining optimization: send multiple commands to redis server without waiting for every reply with client.pipeline(transaction=False) as pipe: - # check if a previous record under the key bin exists - # TODO: investigate if check and set is a better approach rather than pulling all entity ts and then setting - # it may be significantly slower but avoids potential (rare) race conditions - for entity_key, _, _, _ in data: - redis_key_bin = _redis_key( - project, - entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ) - keys.append(redis_key_bin) - pipe.hmget(redis_key_bin, ts_key) - prev_event_timestamps = pipe.execute() - # flattening the list of lists. `hmget` does the lookup assuming a list of keys in the key bin - prev_event_timestamps = [i[0] for i in prev_event_timestamps] - - for redis_key_bin, prev_event_time, (_, values, timestamp, _) in zip( - keys, prev_event_timestamps, data - ): - event_time_seconds = int(utils.make_tzaware(timestamp).timestamp()) - - # ignore if event_timestamp is before the event features that are currently in the feature store - if prev_event_time: - prev_ts = Timestamp() - prev_ts.ParseFromString(prev_event_time) - if prev_ts.seconds and event_time_seconds <= prev_ts.seconds: - # TODO: somehow signal that it's not overwriting the current record? - if progress: - progress(1) - continue - - ts = Timestamp() - ts.seconds = event_time_seconds - entity_hset = dict() - entity_hset[ts_key] = ts.SerializeToString() - - for feature_name, val in values.items(): - f_key = _mmh3(f"{feature_view}:{feature_name}") - entity_hset[f_key] = val.SerializeToString() - - pipe.hset(redis_key_bin, mapping=entity_hset) - - ttl = online_store_config.key_ttl_seconds - if ttl: - pipe.expire(name=redis_key_bin, time=ttl) + if isinstance(table, SortedFeatureView): + if len(table.sort_keys) == 1: + sort_key_name = table.sort_keys[0].name + sort_key_value_type = table.sort_keys[0].value_type + for entity_key, values, timestamp, _ in data: + redis_key_bin = _redis_key( + project, + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + entity_key.join_keys.append(sort_key_name) + entity_key.entity_values.append(values[sort_key_name]) + redis_key_bin_with_sort_keys = _redis_key( + project, + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + + event_time_seconds = int(utils.make_tzaware(timestamp).timestamp()) + ts = Timestamp() + ts.seconds = event_time_seconds + entity_hset = dict() + entity_hset[ts_key] = ts.SerializeToString() + + for feature_name, val in values.items(): + f_key = _mmh3(f"{feature_view}:{feature_name}") + entity_hset[f_key] = val.SerializeToString() + if feature_name == sort_key_name: + feast_value_type = val.WhichOneof("val") + if feast_value_type == "unix_timestamp_val": + feature_value = ( + val.unix_timestamp_val * 1000 + ) # Convert to milliseconds + else: + feature_value = getattr(val, str(feast_value_type)) + score = feature_value + member = redis_key_bin_with_sort_keys + zset_key = f"{project}:{table.name}:{feature_name}:{redis_key_bin}" + pipe.zadd(zset_key, {member: score}) + + pipe.hset(redis_key_bin_with_sort_keys, mapping=entity_hset) + + ttl = online_store_config.key_ttl_seconds + if ttl: + pipe.expire(name=redis_key_bin_with_sort_keys, time=ttl) + else: + # check if a previous record under the key bin exists + # TODO: investigate if check and set is a better approach rather than pulling all entity ts and then setting + # it may be significantly slower but avoids potential (rare) race conditions + for entity_key, _, _, _ in data: + redis_key_bin = _redis_key( + project, + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + keys.append(redis_key_bin) + pipe.hmget(redis_key_bin, ts_key) + prev_event_timestamps = pipe.execute() + # flattening the list of lists. `hmget` does the lookup assuming a list of keys in the key bin + prev_event_timestamps = [i[0] for i in prev_event_timestamps] + + for redis_key_bin, prev_event_time, (_, values, timestamp, _) in zip( + keys, prev_event_timestamps, data + ): + event_time_seconds = int(utils.make_tzaware(timestamp).timestamp()) + + # ignore if event_timestamp is before the event features that are currently in the feature store + if prev_event_time: + prev_ts = Timestamp() + prev_ts.ParseFromString(prev_event_time) + if prev_ts.seconds and event_time_seconds <= prev_ts.seconds: + # TODO: somehow signal that it's not overwriting the current record? + if progress: + progress(1) + continue + + ts = Timestamp() + ts.seconds = event_time_seconds + entity_hset = dict() + entity_hset[ts_key] = ts.SerializeToString() + + for feature_name, val in values.items(): + f_key = _mmh3(f"{feature_view}:{feature_name}") + entity_hset[f_key] = val.SerializeToString() + + pipe.hset(redis_key_bin, mapping=entity_hset) + + ttl = online_store_config.key_ttl_seconds + if ttl: + pipe.expire(name=redis_key_bin, time=ttl) results = pipe.execute() if progress: progress(len(results)) diff --git a/sdk/python/tests/unit/infra/online_store/redis_online_store_creator.py b/sdk/python/tests/unit/infra/online_store/redis_online_store_creator.py new file mode 100644 index 00000000000..a38ec41a32e --- /dev/null +++ b/sdk/python/tests/unit/infra/online_store/redis_online_store_creator.py @@ -0,0 +1,32 @@ +from typing import Dict + +from testcontainers.redis import RedisContainer +from testcontainers.core.waiting_utils import wait_for_logs + +from tests.integration.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + + +class RedisOnlineStoreCreator(OnlineStoreCreator): + def __init__(self, project_name: str, **kwargs): + super().__init__(project_name) + with RedisContainer("redis:latest") as redis_container: + self.container = redis_container + + def create_online_store(self) -> Dict[str, str]: + self.container.start() + log_string_to_wait_for = "Server initialized" + wait_for_logs( + container=self.container, predicate=log_string_to_wait_for, timeout=120 + ) + host = self.container.get_container_host_ip() + exposed_port = int(self.container.get_exposed_port(self.container.port)) + connection_string = f"{host}:{exposed_port}" + print(f"connection_string: {connection_string}") + return { + "connection_string": connection_string, + } + + def teardown(self): + self.container.stop() \ No newline at end of file diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py index c26c2f25c5f..e969a1264ad 100644 --- a/sdk/python/tests/unit/infra/online_store/test_redis.py +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -128,3 +128,259 @@ def test_get_features_for_entity(redis_online_store: RedisOnlineStore, feature_v assert "feature_view_1:feature_11" in features assert features["feature_view_1:feature_10"].int32_val == 1 assert features["feature_view_1:feature_11"].int32_val == 2 + +def test_redis_online_write_batch_with_timestamp_as_sortkey( + repo_config: RepoConfig, + redis_online_store: RedisOnlineStore, +): + ( + feature_view, + data, + ) = _create_sorted_feature_view_with_timestamp_as_sortkey() + + redis_online_store.online_write_batch( + config=repo_config, + table=feature_view, + data=data, + progress=None, + ) + + connection_string = repo_config.online_store.connection_string + connection_string_split = connection_string.split(":") + conn_dict = {} + conn_dict["host"] = connection_string_split[0] + conn_dict["port"] = connection_string_split[1] + + r = Redis(**conn_dict) + + pipe = r.pipeline(transaction=True) + + entity_key_driver_1 = EntityKeyProto( + join_keys=["driver_id"], + entity_values=[ValueProto(int32_val=1)], + ) + + redis_key_bin_driver_1 = _redis_key( + repo_config.project, + entity_key_driver_1, + entity_key_serialization_version=repo_config.entity_key_serialization_version, + ) + + zset_key_driver_1 = f"{repo_config.project}:{feature_view.name}:{feature_view.sort_keys[0].name}:{redis_key_bin_driver_1}" + + entity_key_driver_2 = EntityKeyProto( + join_keys=["driver_id"], + entity_values=[ValueProto(int32_val=2)], + ) + redis_key_bin_driver_2 = _redis_key( + repo_config.project, + entity_key_driver_2, + entity_key_serialization_version=repo_config.entity_key_serialization_version, + ) + + zset_key_driver_2 = f"{repo_config.project}:{feature_view.name}:{feature_view.sort_keys[0].name}:{redis_key_bin_driver_2}" + + driver_1_zset_members = r.zrange(zset_key_driver_1, 0, -1, withscores=True) + driver_2_zset_members = r.zrange(zset_key_driver_2, 0, -1, withscores=True) + + assert len(driver_1_zset_members) == 5 + assert len(driver_2_zset_members) == 5 + + # Get last 3 trips for both drivers from the respective sorted sets + last_3_trips_driver_1 = r.zrevrangebyscore(zset_key_driver_1, "+inf", "-inf", start=0, num=3) + last_3_trips_driver_2 = r.zrevrangebyscore(zset_key_driver_2, "+inf", "-inf", start=0, num=3) + + # Look up features for last 3 trips for driver 1 + for id in last_3_trips_driver_1: + pipe.hgetall(id) + + # Look up features for last 3 trips for driver 2 + for id in last_3_trips_driver_2: + pipe.hgetall(id) + + features_list = pipe.execute() + + trip_id_feature_name = _mmh3(f"{feature_view.name}:trip_id") + trip_id_drivers = [] + for feature_dict in features_list: + val = ValueProto() + val.ParseFromString(feature_dict[trip_id_feature_name]) + trip_id_drivers.append(val.int32_val) + assert trip_id_drivers == [4, 3, 2, 9, 8, 7] + + +def test_redis_online_write_batch_with_float_as_sortkey( + repo_config: RepoConfig, + redis_online_store: RedisOnlineStore, +): + ( + feature_view, + data, + ) = _create_sorted_feature_view_with_float_as_sortkey() + + redis_online_store.online_write_batch( + config=repo_config, + table=feature_view, + data=data, + progress=None, + ) + + connection_string = repo_config.online_store.connection_string + connection_string_split = connection_string.split(":") + conn_dict = {} + conn_dict["host"] = connection_string_split[0] + conn_dict["port"] = connection_string_split[1] + + r = Redis(**conn_dict) + + pipe = r.pipeline(transaction=True) + + entity_key_driver_1 = EntityKeyProto( + join_keys=["driver_id"], + entity_values=[ValueProto(int32_val=1)], + ) + + redis_key_bin_driver_1 = _redis_key( + repo_config.project, + entity_key_driver_1, + entity_key_serialization_version=repo_config.entity_key_serialization_version, + ) + + zset_key_driver_1 = f"{repo_config.project}:{feature_view.name}:{feature_view.sort_keys[0].name}:{redis_key_bin_driver_1}" + + entity_key_driver_2 = EntityKeyProto( + join_keys=["driver_id"], + entity_values=[ValueProto(int32_val=2)], + ) + redis_key_bin_driver_2 = _redis_key( + repo_config.project, + entity_key_driver_2, + entity_key_serialization_version=repo_config.entity_key_serialization_version, + ) + + zset_key_driver_2 = f"{repo_config.project}:{feature_view.name}:{feature_view.sort_keys[0].name}:{redis_key_bin_driver_2}" + + driver_1_zset_members = r.zrange(zset_key_driver_1, 0, -1, withscores=True) + driver_2_zset_members = r.zrange(zset_key_driver_2, 0, -1, withscores=True) + + assert len(driver_1_zset_members) == 5 + assert len(driver_2_zset_members) == 5 + + # Get trips for driver 1 where ratings between 2.5 and 4.5 + # Get trips for driver 2 where ratings between 7.5 and 9.5 + driver_1_trips = r.zrangebyscore(zset_key_driver_1, 2.5, 4.5) + driver_2_trips = r.zrangebyscore(zset_key_driver_2, 7.5, 9.5) + + # Look up features for trips for driver 1 + for id in driver_1_trips: + pipe.hgetall(id) + + # Look up features for trips for driver 2 + for id in driver_2_trips: + pipe.hgetall(id) + + features_list = pipe.execute() + + trip_id_feature_name = _mmh3(f"{feature_view.name}:trip_id") + trip_id_drivers = [] + for feature_dict in features_list: + val = ValueProto() + val.ParseFromString(feature_dict[trip_id_feature_name]) + trip_id_drivers.append(val.int32_val) + assert trip_id_drivers == [2, 3, 4, 7, 8, 9] + + +def _create_sorted_feature_view_with_timestamp_as_sortkey(): + fv = SortedFeatureView( + name="driver_stats", + source=FileSource( + name="my_file_source", + path="test.parquet", + timestamp_field="event_timestamp", + ), + entities=[Entity(name="driver_id")], + ttl=timedelta(seconds=10), + sort_keys=[ + SortKey( + name="event_timestamp", + value_type=ValueType.UNIX_TIMESTAMP, + default_sort_order=SortOrder.DESC, + ) + ], + schema=[ + Field( + name="driver_id", + dtype=Int32, + ), + Field(name="event_timestamp", dtype=UnixTimestamp), + Field( + name="trip_id", + dtype=Int32, + ), + Field( + name="rating", + dtype=Float32, + ), + ], + ) + + return fv, _make_rows() + + +def _create_sorted_feature_view_with_float_as_sortkey(n=10): + fv = SortedFeatureView( + name="driver_stats", + source=FileSource( + name="my_file_source", + path="test.parquet", + timestamp_field="event_timestamp", + ), + entities=[Entity(name="driver_id")], + ttl=timedelta(seconds=10), + sort_keys=[ + SortKey( + name="rating", + value_type=ValueType.FLOAT, + default_sort_order=SortOrder.DESC, + ) + ], + schema=[ + Field( + name="driver_id", + dtype=Int32, + ), + Field(name="event_timestamp", dtype=UnixTimestamp), + Field( + name="trip_id", + dtype=Int32, + ), + Field( + name="rating", + dtype=Float32, + ), + ], + ) + + return fv, _make_rows() + + +def _make_rows(n=10): + """Generate 10 rows split between driver_id 1 (first 5) and 2 (rest), + with rating = i + 0.5 and an event_timestamp spanning ~15 minutes.""" + return [ + ( + EntityKeyProto( + join_keys=["driver_id"], + entity_values=[ValueProto(int32_val=1) if i <= 4 else ValueProto(int32_val=2)], + ), + { + "trip_id": ValueProto(int32_val=i), + "rating": ValueProto(float_val=i + 0.5), + "event_timestamp": ValueProto(unix_timestamp_val=int( + ((datetime.utcnow() - timedelta(minutes=15)) + timedelta(minutes=i)).timestamp())), + }, + datetime.utcnow(), + None, + ) + for i in range(n) + ] \ No newline at end of file From 2fafff5d811ec2d20a7f0f8050d75c29158d9657 Mon Sep 17 00:00:00 2001 From: Krishna mohan Pulipati Date: Thu, 30 Oct 2025 10:59:09 -0500 Subject: [PATCH 2/4] fix: fix tests --- .../unit/infra/online_store/test_redis.py | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py index e969a1264ad..a7e772a1b97 100644 --- a/sdk/python/tests/unit/infra/online_store/test_redis.py +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -1,17 +1,54 @@ import pytest from google.protobuf.timestamp_pb2 import Timestamp +from datetime import datetime, timedelta from feast import Entity, FeatureView, Field, FileSource, RepoConfig -from feast.infra.online_stores.redis import RedisOnlineStore +from feast.infra.online_stores.redis import RedisOnlineStore, RedisOnlineStoreConfig from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast import Entity, Field, FileSource, RepoConfig, ValueType, utils from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.protos.feast.core.SortedFeatureView_pb2 import SortOrder +from feast.infra.online_stores.helpers import _mmh3, _redis_key, _redis_key_prefix +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import ( + BoolList, + BytesList, + DoubleList, + FloatList, + Int32List, + Int64List, + StringList, +) +from redis import Redis +from feast.sorted_feature_view import SortedFeatureView, SortKey +from feast.types import ( + Array, + Bool, + Bytes, + Float32, + Float64, + Int32, + Int64, + String, + UnixTimestamp, +) from feast.types import Int32 +from tests.unit.infra.online_store.redis_online_store_creator import ( + RedisOnlineStoreCreator, +) + @pytest.fixture def redis_online_store() -> RedisOnlineStore: return RedisOnlineStore() +@pytest.fixture(scope="session") +def redis_online_store_config(): + creator = RedisOnlineStoreCreator("redis_project") + config = creator.create_online_store() + yield config + creator.teardown() @pytest.fixture def repo_config(): From aa257af161e04faa8d0c364a6f12e8f74c41e0f1 Mon Sep 17 00:00:00 2001 From: Krishna mohan Pulipati Date: Thu, 30 Oct 2025 11:42:30 -0500 Subject: [PATCH 3/4] fix: fix tests --- sdk/python/feast/infra/online_stores/redis.py | 1 + .../tests/unit/infra/online_store/test_redis.py | 14 ++++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 6bf571e7a69..072f0ccfce0 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -37,6 +37,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel +from feast.sorted_feature_view import SortedFeatureView try: from redis import Redis diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py index a7e772a1b97..c14c1bd4376 100644 --- a/sdk/python/tests/unit/infra/online_store/test_redis.py +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -43,6 +43,7 @@ def redis_online_store() -> RedisOnlineStore: return RedisOnlineStore() + @pytest.fixture(scope="session") def redis_online_store_config(): creator = RedisOnlineStoreCreator("redis_project") @@ -50,11 +51,15 @@ def redis_online_store_config(): yield config creator.teardown() + @pytest.fixture -def repo_config(): +def repo_config(redis_online_store_config): return RepoConfig( provider="local", project="test", + online_store=RedisOnlineStoreConfig( + connection_string=redis_online_store_config["connection_string"], + ), entity_key_serialization_version=2, registry="dummy_registry.db", ) @@ -92,7 +97,7 @@ def test_generate_entity_redis_keys(redis_online_store: RedisOnlineStore, repo_c def test_generate_hset_keys_for_features( - redis_online_store: RedisOnlineStore, feature_view + redis_online_store: RedisOnlineStore, feature_view ): actual = redis_online_store._generate_hset_keys_for_features(feature_view) expected = ( @@ -103,7 +108,7 @@ def test_generate_hset_keys_for_features( def test_generate_hset_keys_for_features_with_requested_features( - redis_online_store: RedisOnlineStore, feature_view + redis_online_store: RedisOnlineStore, feature_view ): actual = redis_online_store._generate_hset_keys_for_features( feature_view=feature_view, requested_features=["my-feature-view:feature1"] @@ -116,7 +121,7 @@ def test_generate_hset_keys_for_features_with_requested_features( def test_convert_redis_values_to_protobuf( - redis_online_store: RedisOnlineStore, feature_view + redis_online_store: RedisOnlineStore, feature_view ): requested_features = [ "feature_view_1:feature_10", @@ -166,6 +171,7 @@ def test_get_features_for_entity(redis_online_store: RedisOnlineStore, feature_v assert features["feature_view_1:feature_10"].int32_val == 1 assert features["feature_view_1:feature_11"].int32_val == 2 + def test_redis_online_write_batch_with_timestamp_as_sortkey( repo_config: RepoConfig, redis_online_store: RedisOnlineStore, From 4048a9c0052dec49bf3b5078d4e75955c9bb8e4d Mon Sep 17 00:00:00 2001 From: Krishna mohan Pulipati Date: Thu, 30 Oct 2025 11:45:21 -0500 Subject: [PATCH 4/4] fix: fix lint --- sdk/python/feast/infra/online_stores/redis.py | 23 +++--- .../redis_online_store_creator.py | 4 +- .../unit/infra/online_store/test_redis.py | 70 +++++++++---------- 3 files changed, 46 insertions(+), 51 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 072f0ccfce0..16e48860d87 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -274,13 +274,13 @@ async def _get_client_async(self, online_store_config: RedisOnlineStoreConfig): return self._client_async def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], ) -> None: online_store_config = config.online_store assert isinstance(online_store_config, RedisOnlineStoreConfig) @@ -296,7 +296,6 @@ def online_write_batch( if isinstance(table, SortedFeatureView): if len(table.sort_keys) == 1: sort_key_name = table.sort_keys[0].name - sort_key_value_type = table.sort_keys[0].value_type for entity_key, values, timestamp, _ in data: redis_key_bin = _redis_key( project, @@ -311,7 +310,9 @@ def online_write_batch( entity_key_serialization_version=config.entity_key_serialization_version, ) - event_time_seconds = int(utils.make_tzaware(timestamp).timestamp()) + event_time_seconds = int( + utils.make_tzaware(timestamp).timestamp() + ) ts = Timestamp() ts.seconds = event_time_seconds entity_hset = dict() @@ -324,7 +325,7 @@ def online_write_batch( feast_value_type = val.WhichOneof("val") if feast_value_type == "unix_timestamp_val": feature_value = ( - val.unix_timestamp_val * 1000 + val.unix_timestamp_val * 1000 ) # Convert to milliseconds else: feature_value = getattr(val, str(feast_value_type)) @@ -355,7 +356,7 @@ def online_write_batch( prev_event_timestamps = [i[0] for i in prev_event_timestamps] for redis_key_bin, prev_event_time, (_, values, timestamp, _) in zip( - keys, prev_event_timestamps, data + keys, prev_event_timestamps, data ): event_time_seconds = int(utils.make_tzaware(timestamp).timestamp()) diff --git a/sdk/python/tests/unit/infra/online_store/redis_online_store_creator.py b/sdk/python/tests/unit/infra/online_store/redis_online_store_creator.py index a38ec41a32e..95bcca5b85d 100644 --- a/sdk/python/tests/unit/infra/online_store/redis_online_store_creator.py +++ b/sdk/python/tests/unit/infra/online_store/redis_online_store_creator.py @@ -1,7 +1,7 @@ from typing import Dict -from testcontainers.redis import RedisContainer from testcontainers.core.waiting_utils import wait_for_logs +from testcontainers.redis import RedisContainer from tests.integration.feature_repos.universal.online_store_creator import ( OnlineStoreCreator, @@ -29,4 +29,4 @@ def create_online_store(self) -> Dict[str, str]: } def teardown(self): - self.container.stop() \ No newline at end of file + self.container.stop() diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py index c14c1bd4376..3ec2c196f95 100644 --- a/sdk/python/tests/unit/infra/online_store/test_redis.py +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -1,39 +1,21 @@ +from datetime import datetime, timedelta + import pytest from google.protobuf.timestamp_pb2 import Timestamp -from datetime import datetime, timedelta +from redis import Redis -from feast import Entity, FeatureView, Field, FileSource, RepoConfig +from feast import Entity, FeatureView, Field, FileSource, RepoConfig, ValueType +from feast.infra.online_stores.helpers import _mmh3, _redis_key from feast.infra.online_stores.redis import RedisOnlineStore, RedisOnlineStoreConfig -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast import Entity, Field, FileSource, RepoConfig, ValueType, utils -from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.protos.feast.core.SortedFeatureView_pb2 import SortOrder -from feast.infra.online_stores.helpers import _mmh3, _redis_key, _redis_key_prefix from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast.protos.feast.types.Value_pb2 import ( - BoolList, - BytesList, - DoubleList, - FloatList, - Int32List, - Int64List, - StringList, -) -from redis import Redis +from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.sorted_feature_view import SortedFeatureView, SortKey from feast.types import ( - Array, - Bool, - Bytes, Float32, - Float64, Int32, - Int64, - String, UnixTimestamp, ) -from feast.types import Int32 - from tests.unit.infra.online_store.redis_online_store_creator import ( RedisOnlineStoreCreator, ) @@ -97,7 +79,7 @@ def test_generate_entity_redis_keys(redis_online_store: RedisOnlineStore, repo_c def test_generate_hset_keys_for_features( - redis_online_store: RedisOnlineStore, feature_view + redis_online_store: RedisOnlineStore, feature_view ): actual = redis_online_store._generate_hset_keys_for_features(feature_view) expected = ( @@ -108,7 +90,7 @@ def test_generate_hset_keys_for_features( def test_generate_hset_keys_for_features_with_requested_features( - redis_online_store: RedisOnlineStore, feature_view + redis_online_store: RedisOnlineStore, feature_view ): actual = redis_online_store._generate_hset_keys_for_features( feature_view=feature_view, requested_features=["my-feature-view:feature1"] @@ -121,7 +103,7 @@ def test_generate_hset_keys_for_features_with_requested_features( def test_convert_redis_values_to_protobuf( - redis_online_store: RedisOnlineStore, feature_view + redis_online_store: RedisOnlineStore, feature_view ): requested_features = [ "feature_view_1:feature_10", @@ -173,8 +155,8 @@ def test_get_features_for_entity(redis_online_store: RedisOnlineStore, feature_v def test_redis_online_write_batch_with_timestamp_as_sortkey( - repo_config: RepoConfig, - redis_online_store: RedisOnlineStore, + repo_config: RepoConfig, + redis_online_store: RedisOnlineStore, ): ( feature_view, @@ -230,8 +212,12 @@ def test_redis_online_write_batch_with_timestamp_as_sortkey( assert len(driver_2_zset_members) == 5 # Get last 3 trips for both drivers from the respective sorted sets - last_3_trips_driver_1 = r.zrevrangebyscore(zset_key_driver_1, "+inf", "-inf", start=0, num=3) - last_3_trips_driver_2 = r.zrevrangebyscore(zset_key_driver_2, "+inf", "-inf", start=0, num=3) + last_3_trips_driver_1 = r.zrevrangebyscore( + zset_key_driver_1, "+inf", "-inf", start=0, num=3 + ) + last_3_trips_driver_2 = r.zrevrangebyscore( + zset_key_driver_2, "+inf", "-inf", start=0, num=3 + ) # Look up features for last 3 trips for driver 1 for id in last_3_trips_driver_1: @@ -253,8 +239,8 @@ def test_redis_online_write_batch_with_timestamp_as_sortkey( def test_redis_online_write_batch_with_float_as_sortkey( - repo_config: RepoConfig, - redis_online_store: RedisOnlineStore, + repo_config: RepoConfig, + redis_online_store: RedisOnlineStore, ): ( feature_view, @@ -409,21 +395,29 @@ def _create_sorted_feature_view_with_float_as_sortkey(n=10): def _make_rows(n=10): """Generate 10 rows split between driver_id 1 (first 5) and 2 (rest), - with rating = i + 0.5 and an event_timestamp spanning ~15 minutes.""" + with rating = i + 0.5 and an event_timestamp spanning ~15 minutes.""" return [ ( EntityKeyProto( join_keys=["driver_id"], - entity_values=[ValueProto(int32_val=1) if i <= 4 else ValueProto(int32_val=2)], + entity_values=[ + ValueProto(int32_val=1) if i <= 4 else ValueProto(int32_val=2) + ], ), { "trip_id": ValueProto(int32_val=i), "rating": ValueProto(float_val=i + 0.5), - "event_timestamp": ValueProto(unix_timestamp_val=int( - ((datetime.utcnow() - timedelta(minutes=15)) + timedelta(minutes=i)).timestamp())), + "event_timestamp": ValueProto( + unix_timestamp_val=int( + ( + (datetime.utcnow() - timedelta(minutes=15)) + + timedelta(minutes=i) + ).timestamp() + ) + ), }, datetime.utcnow(), None, ) for i in range(n) - ] \ No newline at end of file + ]