Skip to content

Commit 2a19428

Browse files
zabarnEXPEbdodla
andauthored
feat: TTL support for ScyllaDB config and FeatureViews (#154)
* feat: TTL support for ScyllaDB tables and FeatureViews * fix: lint formatting * fix: set TTL as a tag in FeatureViews * fix: lint formatting * fix: remove ttl_clause from write_rows * just make online_store_ttl an int rather than timedelta * fix: address comments * chore: revert repo_configuration changes * fix: lint formatting * fix: change from online_store_ttl to online_store_key_ttl_seconds * chore: change Redis key_tt_seconds docstring * change Cassandra key_ttle_seconds docstring Co-authored-by: Bhargav Dodla <13788369+EXPEbdodla@users.noreply.github.com> --------- Co-authored-by: Bhargav Dodla <13788369+EXPEbdodla@users.noreply.github.com>
1 parent 43c94a6 commit 2a19428

4 files changed

Lines changed: 81 additions & 8 deletions

File tree

sdk/python/feast/feature_view.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,3 +494,22 @@ def most_recent_end_time(self) -> Optional[datetime]:
494494
if len(self.materialization_intervals) == 0:
495495
return None
496496
return max([interval[1] for interval in self.materialization_intervals])
497+
498+
@property
499+
def online_store_key_ttl_seconds(self) -> Optional[int]:
500+
"""
501+
Retrieves the online store TTL from the FeatureView's tags.
502+
503+
Returns:
504+
An integer representing the TTL in seconds, or None if not set.
505+
"""
506+
ttl_str = self.tags.get("online_store_key_ttl_seconds")
507+
if ttl_str:
508+
try:
509+
return int(ttl_str)
510+
except ValueError:
511+
raise ValueError(
512+
f"Invalid online_store_key_ttl_seconds value '{ttl_str}' in tags. It must be an integer representing seconds."
513+
)
514+
else:
515+
return None

sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
event_ts TIMESTAMP,
8989
created_ts TIMESTAMP,
9090
PRIMARY KEY ((entity_key), feature_name)
91-
) WITH CLUSTERING ORDER BY (feature_name ASC);
91+
) WITH CLUSTERING ORDER BY (feature_name ASC) AND default_time_to_live={ttl};
9292
"""
9393

9494
DROP_TABLE_CQL_TEMPLATE = "DROP TABLE IF EXISTS {fqtable};"
@@ -159,6 +159,9 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel):
159159
Table deletion is not currently supported in this mode.
160160
"""
161161

162+
key_ttl_seconds: Optional[StrictInt] = None
163+
"""Default TTL (in seconds) to apply to all tables if not specified in FeatureView. Value 0 or None means No TTL."""
164+
162165
class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
163166
"""
164167
Configuration block related to the Cluster's load-balancing policy.
@@ -566,8 +569,13 @@ def _create_table(self, config: RepoConfig, project: str, table: FeatureView):
566569
session: Session = self._get_session(config)
567570
keyspace: str = self._keyspace
568571
fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table)
569-
create_cql = self._get_cql_statement(config, "create", fqtable)
570-
logger.info(f"Creating table {fqtable}.")
572+
ttl = (
573+
table.online_store_key_ttl_seconds
574+
or config.online_store.key_ttl_seconds
575+
or 0
576+
)
577+
create_cql = self._get_cql_statement(config, "create", fqtable, ttl=ttl)
578+
logger.info(f"Creating table {fqtable} with TTL {ttl}.")
571579
session.execute(create_cql)
572580

573581
def _get_cql_statement(

sdk/python/feast/infra/online_stores/redis.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel):
7474
format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """
7575

7676
key_ttl_seconds: Optional[int] = None
77-
"""(Optional) redis key bin ttl (in seconds) for expiring entities"""
77+
"""(Optional) redis key bin ttl (in seconds) for expiring entities. Value None means No TTL. Value 0 means expire in 0 seconds."""
7878

7979
full_scan_for_deletion: Optional[bool] = True
8080
"""(Optional) whether to scan for deletion of features"""
@@ -330,10 +330,13 @@ def online_write_batch(
330330

331331
pipe.hset(redis_key_bin, mapping=entity_hset)
332332

333-
if online_store_config.key_ttl_seconds:
334-
pipe.expire(
335-
name=redis_key_bin, time=online_store_config.key_ttl_seconds
336-
)
333+
ttl = (
334+
table.online_store_key_ttl_seconds
335+
or online_store_config.key_ttl_seconds
336+
or None
337+
)
338+
if ttl:
339+
pipe.expire(name=redis_key_bin, time=ttl)
337340
results = pipe.execute()
338341
if progress:
339342
progress(len(results))

sdk/python/tests/unit/test_feature_views.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,46 @@ def test_update_materialization_intervals():
168168
second_updated_feature_view.materialization_intervals[0][1]
169169
== updated_feature_view.materialization_intervals[0][1]
170170
)
171+
172+
173+
def test_online_store_key_ttl_seconds_retrieval():
174+
# Test when TTL is set as a valid integer in tags
175+
file_source = FileSource(name="my-file-source", path="test.parquet")
176+
feature_view = FeatureView(
177+
name="feature_view_with_ttl",
178+
entities=[],
179+
schema=[Field(name="feature1", dtype=Float32)],
180+
source=file_source,
181+
tags={"online_store_key_ttl_seconds": "3600"},
182+
)
183+
assert feature_view.online_store_key_ttl_seconds == 3600
184+
185+
186+
def test_online_store_key_ttl_seconds_none_when_not_set():
187+
# Test when TTL is not set in tags, expecting None
188+
file_source = FileSource(name="my-file-source", path="test.parquet")
189+
feature_view = FeatureView(
190+
name="feature_view_without_ttl",
191+
entities=[],
192+
schema=[Field(name="feature1", dtype=Float32)],
193+
source=file_source,
194+
tags={},
195+
)
196+
assert feature_view.online_store_key_ttl_seconds is None
197+
198+
199+
def test_online_store_key_ttl_seconds_invalid_value():
200+
# Test when TTL is set as a non-integer string, expecting a ValueError
201+
file_source = FileSource(name="my-file-source", path="test.parquet")
202+
feature_view = FeatureView(
203+
name="feature_view_invalid_ttl",
204+
entities=[],
205+
schema=[Field(name="feature1", dtype=Float32)],
206+
source=file_source,
207+
tags={"online_store_key_ttl_seconds": "invalid_ttl"},
208+
)
209+
with pytest.raises(
210+
ValueError,
211+
match="Invalid online_store_key_ttl_seconds value 'invalid_ttl' in tags. It must be an integer representing seconds.",
212+
):
213+
_ = feature_view.online_store_key_ttl_seconds

0 commit comments

Comments
 (0)