Skip to content

Commit 09f8d32

Browse files
committed
add support for native TTL, set local_quorum as default
Signed-off-by: Attila Toth <attila.toth@scylladb.com>
1 parent ec48773 commit 09f8d32

1 file changed

Lines changed: 30 additions & 10 deletions

File tree

  • sdk/python/feast/infra/online_stores/scylladb_online_store

sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
)
2323
from cassandra.concurrent import execute_concurrent_with_args
2424
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
25-
from cassandra.query import PreparedStatement
25+
from cassandra.query import ConsistencyLevel, PreparedStatement
2626
from pydantic import StrictFloat, StrictInt, StrictStr
2727

2828
from feast import Entity, FeatureView, RepoConfig
@@ -72,6 +72,11 @@
7272
" VALUES (?, ?, ?, ?, ?);"
7373
)
7474

75+
INSERT_CQL_TTL = (
76+
"INSERT INTO {fqtable} (feature_name, value, entity_key, event_ts, created_ts)"
77+
" VALUES (?, ?, ?, ?, ?) USING TTL ?;"
78+
)
79+
7580
SELECT_CQL = "SELECT {columns} FROM {fqtable} WHERE entity_key = ?;"
7681

7782
# Dedicated table to serve vector queries,
@@ -112,6 +117,7 @@
112117
"create": (CREATE_TABLE_CQL, False),
113118
"drop": (DROP_TABLE_CQL, False),
114119
"insert": (INSERT_CQL, True),
120+
"insert_ttl": (INSERT_CQL_TTL, True),
115121
"select": (SELECT_CQL, True),
116122
}
117123

@@ -309,15 +315,20 @@ def _get_session(self, config: RepoConfig) -> Session:
309315
exe_profile = ExecutionProfile(
310316
request_timeout=online_store_config.request_timeout,
311317
load_balancing_policy=lb_policy,
318+
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
312319
)
313320
execution_profiles: Optional[Dict] = {EXEC_PROFILE_DEFAULT: exe_profile}
314321
elif online_store_config.request_timeout is not None:
315322
exe_profile = ExecutionProfile(
316323
request_timeout=online_store_config.request_timeout,
324+
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
317325
)
318326
execution_profiles = {EXEC_PROFILE_DEFAULT: exe_profile}
319327
else:
320-
execution_profiles = None
328+
exe_profile = ExecutionProfile(
329+
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
330+
)
331+
execution_profiles = {EXEC_PROFILE_DEFAULT: exe_profile}
321332

322333
cluster_kwargs: Dict[str, Any] = {
323334
k: v
@@ -334,7 +345,7 @@ def _get_session(self, config: RepoConfig) -> Session:
334345
**cluster_kwargs,
335346
)
336347
self._keyspace = keyspace
337-
self._session = self._cluster.connect(self._keyspace)
348+
self._session = self._cluster.connect(keyspace)
338349
return self._session
339350

340351
@staticmethod
@@ -484,29 +495,38 @@ def online_write_batch(
484495
default_sim = online_store_config.vector_similarity_function
485496
vec_features = _get_vector_features(table, default_sim)
486497

487-
# --- regular table (vector_value populated for vector feature rows) ---
498+
# Compute TTL in seconds from the FeatureView's ttl field (timedelta or None).
499+
# ScyllaDB will automatically expire rows after this many seconds, covering
500+
# both "support for ttl at retrieval" and "support for deleting expired data".
501+
ttl_seconds: Optional[int] = (
502+
int(table.ttl.total_seconds()) if table.ttl else None
503+
)
504+
505+
# --- regular table ---
488506
fqtable = self._fq_table_name(self._keyspace, project, table)
489-
insert_stmt = self._get_statement(config, "insert", fqtable)
507+
insert_op = "insert_ttl" if ttl_seconds is not None else "insert"
508+
insert_stmt = self._get_statement(config, insert_op, fqtable)
490509

491510
session = self._get_session(config)
492511

493-
# --- main table rows (all features, no vector column) ---
494-
def _main_rows() -> Iterable[
495-
Tuple[str, bytes, str, datetime, Optional[datetime]]
496-
]:
512+
# --- main table rows (all features) ---
513+
def _main_rows() -> Iterable[Tuple]:
497514
for entity_key, values, timestamp, created_ts in data:
498515
entity_key_bin = serialize_entity_key(
499516
entity_key,
500517
entity_key_serialization_version=config.entity_key_serialization_version,
501518
).hex()
502519
for feature_name, val in values.items():
503-
yield (
520+
row: Tuple = (
504521
feature_name,
505522
val.SerializeToString(),
506523
entity_key_bin,
507524
timestamp,
508525
created_ts,
509526
)
527+
if ttl_seconds is not None:
528+
row = row + (ttl_seconds,)
529+
yield row
510530
if progress:
511531
progress(1)
512532

0 commit comments

Comments
 (0)