From 3118ac951bc11f3a279a4eaaaf1e6b296c8273a9 Mon Sep 17 00:00:00 2001 From: Attila Toth Date: Fri, 12 Jun 2026 14:06:32 +0200 Subject: [PATCH 01/12] feat: add ScyllaDB online store with vector search Signed-off-by: Attila Toth --- docs/getting-started/genai.md | 1 + docs/reference/alpha-vector-database.md | 3 +- docs/reference/online-stores/scylladb.md | 93 +- pyproject.toml | 3 +- .../scylladb_online_store/__init__.py | 0 .../scylladb_online_store/scylladb.py | 792 ++++++++++++++++++ sdk/python/feast/repo_config.py | 1 + .../test_scylladb_online_store.py | 204 +++++ .../universal/online_store/scylladb.py | 48 ++ 9 files changed, 1123 insertions(+), 22 deletions(-) create mode 100644 sdk/python/feast/infra/online_stores/scylladb_online_store/__init__.py create mode 100644 sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py create mode 100644 sdk/python/tests/integration/online_store/test_scylladb_online_store.py create mode 100644 sdk/python/tests/universal/feature_repos/universal/online_store/scylladb.py diff --git a/docs/getting-started/genai.md b/docs/getting-started/genai.md index f65aeac85e2..dfd9c41954a 100644 --- a/docs/getting-started/genai.md +++ b/docs/getting-started/genai.md @@ -15,6 +15,7 @@ Feast integrates with popular vector databases to store and retrieve embedding v * **Elasticsearch**: Scalable vector search capabilities * **Postgres with PGVector**: SQL-based vector operations * **Qdrant**: Purpose-built vector database integration +* **ScyllaDB**: Native `vector` type with HNSW ANN index, full `retrieve_online_documents_v2` support These integrations allow you to: - Store embeddings as features diff --git a/docs/reference/alpha-vector-database.md b/docs/reference/alpha-vector-database.md index 861c3fcb114..28d0bf0098e 100644 --- a/docs/reference/alpha-vector-database.md +++ b/docs/reference/alpha-vector-database.md @@ -15,6 +15,7 @@ Below are supported vector databases and implemented features: | Faiss | [ ] | [ ] | [] | [] 
| | SQLite | [x] | [ ] | [x] | [x] | | Qdrant | [x] | [x] | [] | [] | +| ScyllaDB | [x] | [x] | [x] | [x] | *Note: V2 Support means the SDK supports retrieval of features along with vector embeddings from vector similarity search. @@ -30,7 +31,7 @@ Beyond that, we will then have `retrieve_online_documents` and `retrieve_online_ backwards compatibility and the adopt industry standard naming conventions. {% endhint %} -**Note**: Milvus and SQLite implement the v2 `retrieve_online_documents_v2` method in the SDK. This will be the longer-term solution so that Data Scientists can easily enable vector similarity search by just flipping a flag. +**Note**: Milvus, SQLite, and ScyllaDB implement the v2 `retrieve_online_documents_v2` method in the SDK. This will be the longer-term solution so that Data Scientists can easily enable vector similarity search by just flipping a flag. ## Examples diff --git a/docs/reference/online-stores/scylladb.md b/docs/reference/online-stores/scylladb.md index c8583ac101a..ae05418f623 100644 --- a/docs/reference/online-stores/scylladb.md +++ b/docs/reference/online-stores/scylladb.md @@ -2,20 +2,15 @@ ## Description -ScyllaDB is a low-latency and high-performance Cassandra-compatible (uses CQL) database. You can use the existing Cassandra connector to use ScyllaDB as an online store in Feast. - -The [ScyllaDB](https://www.scylladb.com/) online store provides support for materializing feature values into a ScyllaDB or [ScyllaDB Cloud](https://www.scylladb.com/product/scylla-cloud/) cluster for serving online features real-time. +[ScyllaDB](https://www.scylladb.com/) is a distributed real-time NoSQL database with vector search support. +This integration uses the native **`scylla-driver`** Python driver for optimised performance and supports materializing feature values into a [ScyllaDB Cloud](https://www.scylladb.com/product/scylla-cloud/) cluster for real-time online feature serving. 
## Getting started -Install Feast with Cassandra support: -```bash -pip install "feast[cassandra]" -``` +Install Feast with the `scylladb` extra, which pulls in `scylla-driver` automatically: -Create a new Feast project: ```bash -feast init REPO_NAME -t cassandra +pip install feast[scylladb] ``` ### Example (ScyllaDB) @@ -26,7 +21,7 @@ project: scylla_feature_repo registry: data/registry.db provider: local online_store: - type: cassandra + type: scylladb hosts: - 172.17.0.2 keyspace: feast @@ -43,36 +38,94 @@ project: scylla_feature_repo registry: data/registry.db provider: local online_store: - type: cassandra + type: scylladb hosts: - node-0.aws_us_east_1.xxxxxxxx.clusters.scylla.cloud - node-1.aws_us_east_1.xxxxxxxx.clusters.scylla.cloud - node-2.aws_us_east_1.xxxxxxxx.clusters.scylla.cloud keyspace: feast username: scylla - password: password + password: xxxxxx + local_dc: AWS_US_EAST_1 ``` {% endcode %} - -The full set of configuration options is available in [CassandraOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.cassandra_online_store.cassandra_online_store.CassandraOnlineStoreConfig). -For a full explanation of configuration options please look at file -`sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md`. +## Configuration options + +| Parameter | Type | Default | Description | +| :--- | :--- | :--- | :--- | +| `hosts` | list[str] | *(required)* | Contact-point host addresses. | +| `port` | int | `9042` | CQL port. | +| `keyspace` | str | `feast_keyspace` | Target ScyllaDB keyspace. | +| `username` | str | `None` | Auth username. | +| `password` | str | `None` | Auth password. | +| `local_dc` | str | `None` | Local datacenter name for DC-aware load balancing. | +| `request_timeout` | float | `None` | Driver request timeout in seconds. | +| `read_concurrency` | int | `100` | `concurrency` argument passed to the driver's `execute_concurrent_with_args` for reads. 
Controls how many CQL statements are in-flight at once. | +| `write_concurrency` | int | `100` | `concurrency` argument passed to the driver's `execute_concurrent_with_args` for writes. Controls how many CQL statements are in-flight at once. | +| `vector_similarity_function` | str | `COSINE` | Default similarity function for vector indexes. Supported: `COSINE`, `DOT_PRODUCT`, `EUCLIDEAN`. Can be overridden per-feature via the `similarity_function` Field tag. | Storage specifications can be found at `docs/specs/online_store_format.md`. +## Vector Search + +ScyllaDB Cloud supports approximate nearest-neighbour (ANN) vector search. +To enable it for a feature view, tag the embedding `Field` with `vector_index=true` and specify the number of dimensions: + +{% code title="feature_definitions.py" %} +```python +from feast import FeatureView, Field +from feast.types import Array, Float32, String + +documents_fv = FeatureView( + name="documents", + entities=[item], + schema=[ + Field(name="text", dtype=String), + Field( + name="embedding", + dtype=Array(Float32), + tags={ + "vector_index": "true", + "dimensions": "768", + "similarity_function": "COSINE", # COSINE | DOT_PRODUCT | EUCLIDEAN + }, + ), + ], + online=True, + source=push_source, +) +``` +{% endcode %} + +When `feast apply` runs, the store creates: + +- A regular feature table (`{project}_{fv_name}`) for `online_read` / `online_write_batch`. +- A vector table (`{project}_{fv_name}__{feature}_vec`) with a native `vector` column and an HNSW ANN index. + +To query the top-k most similar documents: + +```python +result = store.retrieve_online_documents_v2( + features=["documents:text", "documents:embedding"], + query=[0.1, 0.2, ...], # your query embedding + top_k=10, + distance_metric="COSINE", +) +``` + ## Functionality Matrix The set of functionality supported by online stores is described in detail [here](overview.md#functionality). 
-Below is a matrix indicating which functionality is supported by the Cassandra plugin. +Below is a matrix indicating which functionality is supported by the ScyllaDB online store. -| | Cassandra | +| | ScyllaDB | | :-------------------------------------------------------- | :-------- | | write feature values to the online store | yes | | read feature values from the online store | yes | | update infrastructure (e.g. tables) in the online store | yes | | teardown infrastructure (e.g. tables) in the online store | yes | -| generate a plan of infrastructure changes | yes | +| generate a plan of infrastructure changes | no | | support for on-demand transforms | yes | | readable by Python SDK | yes | | readable by Java | no | @@ -89,6 +142,6 @@ To compare this set of functionality against other online stores, please see the ## Resources -* [Sample application with ScyllaDB](https://feature-store.scylladb.com/stable/) +* [ScyllaDB Vector Search documentation](https://cloud.docs.scylladb.com/stable/vector-search/) * [ScyllaDB website](https://www.scylladb.com/) * [ScyllaDB Cloud documentation](https://cloud.docs.scylladb.com/stable/) diff --git a/pyproject.toml b/pyproject.toml index 25bef64ccfa..52ef25a2017 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ azure = [ "pymssql<2.3.3" ] cassandra = ["cassandra-driver>=3.24.0,<4"] +scylladb = ["scylla-driver>=3.28.0,<4"] clickhouse = ["clickhouse-connect>=0.7.19"] couchbase = ["couchbase==4.3.2", "couchbase-columnar==1.0.0"] delta = ["deltalake<1.0.0"] @@ -158,7 +159,7 @@ test = [ ] ci = [ - "feast[test, aws, azure, cassandra, clickhouse, couchbase, delta, docling, duckdb, elasticsearch, faiss, gcp, ge, go, grpcio, hazelcast, hbase, ibis, image, k8s, mcp, milvus, mlflow, mongodb, mssql, mysql, openlineage, opentelemetry, oracle, spark, trino, postgres, pytorch, qdrant, rag, ray, redis, singlestore, snowflake, sqlite_vec]", + "feast[test, aws, azure, cassandra, clickhouse, couchbase, delta, docling, duckdb, 
elasticsearch, faiss, gcp, ge, go, grpcio, hazelcast, hbase, ibis, image, k8s, mcp, milvus, mlflow, mongodb, mssql, mysql, openlineage, opentelemetry, oracle, scylladb, spark, trino, postgres, pytorch, qdrant, rag, ray, redis, singlestore, snowflake, sqlite_vec]", "build", "virtualenv==20.23.0", "dbt-artifacts-parser", diff --git a/sdk/python/feast/infra/online_stores/scylladb_online_store/__init__.py b/sdk/python/feast/infra/online_stores/scylladb_online_store/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py new file mode 100644 index 00000000000..d6008d71a07 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py @@ -0,0 +1,792 @@ +import logging +import warnings +from datetime import datetime +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + Literal, + Optional, + Sequence, + Tuple, +) + +from cassandra.auth import PlainTextAuthProvider +from cassandra.cluster import ( + EXEC_PROFILE_DEFAULT, + Cluster, + ExecutionProfile, + Session, +) +from cassandra.concurrent import execute_concurrent_with_args +from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy +from cassandra.query import PreparedStatement +from pydantic import StrictFloat, StrictInt, StrictStr + +from feast import Entity, FeatureView, RepoConfig +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Error messages +# 
--------------------------------------------------------------------------- + +E_SCYLLA_UNEXPECTED_CONFIG = ( + "Unexpected configuration object (not a ScyllaDBOnlineStoreConfig instance)" +) +E_SCYLLA_NOT_CONFIGURED = ( + "Inconsistent ScyllaDB configuration: 'hosts' and 'keyspace' are required" +) +E_SCYLLA_INCONSISTENT_AUTH = ( + "ScyllaDB username and password must be provided together or not at all" +) + +# --------------------------------------------------------------------------- +# CQL templates +# --------------------------------------------------------------------------- + +# Regular feature store table (one row per entity_key + feature_name) +CREATE_TABLE_CQL = """ + CREATE TABLE IF NOT EXISTS {fqtable} ( + entity_key TEXT, + feature_name TEXT, + value BLOB, + event_ts TIMESTAMP, + created_ts TIMESTAMP, + PRIMARY KEY ((entity_key), feature_name) + ) WITH CLUSTERING ORDER BY (feature_name ASC); +""" + +DROP_TABLE_CQL = "DROP TABLE IF EXISTS {fqtable};" + +INSERT_CQL = ( + "INSERT INTO {fqtable} (feature_name, value, entity_key, event_ts, created_ts)" + " VALUES (?, ?, ?, ?, ?);" +) + +SELECT_CQL = "SELECT {columns} FROM {fqtable} WHERE entity_key = ?;" + +# Vector table — one row per entity, native vector column for ANN +CREATE_VECTOR_TABLE_CQL = """ + CREATE TABLE IF NOT EXISTS {fqtable} ( + entity_key TEXT PRIMARY KEY, + {vec_col} vector<float, {dim}>, + event_ts TIMESTAMP, + created_ts TIMESTAMP + ); +""" + +CREATE_VECTOR_INDEX_CQL = ( + "CREATE CUSTOM INDEX IF NOT EXISTS {index_name}" + " ON {fqtable} ({vec_col})" + " USING 'vector_index'" + " WITH OPTIONS = {{'similarity_function': '{sim_func}'}};" +) + +DROP_VECTOR_TABLE_CQL = "DROP TABLE IF EXISTS {fqtable};" + +INSERT_VECTOR_CQL = ( + "INSERT INTO {fqtable} (entity_key, {vec_col}, event_ts, created_ts)" + " VALUES (?, ?, ?, ?);" +) + +ANN_SELECT_CQL = ( + "SELECT entity_key, {sim_func_call} AS score, event_ts" + " FROM {fqtable}" + " ORDER BY {vec_col} ANN OF ?" 
+ " LIMIT ?;" +) + +# op_name -> (template, prepare?) +_CQL_TEMPLATES: Dict[str, Tuple[str, bool]] = { + "create": (CREATE_TABLE_CQL, False), + "drop": (DROP_TABLE_CQL, False), + "insert": (INSERT_CQL, True), + "select": (SELECT_CQL, True), +} + +# Similarity function CQL expression helpers +_SIM_FUNC_EXPR = { + "COSINE": "similarity_cosine({vec_col}, ?)", + "DOT_PRODUCT": "similarity_dot_product({vec_col}, ?)", + "EUCLIDEAN": "similarity_euclidean({vec_col}, ?)", +} + + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- + + +class ScyllaDBOnlineStoreConfig(FeastConfigBaseModel): + """ + Configuration for the native ScyllaDB online store. + + Requires ``scylla-driver`` (``pip install scylla-driver``). + + Example ``feature_store.yaml``:: + + online_store: + type: scylladb + hosts: + - node-0.aws_us_east_1.xxxxxxxx.clusters.scylla.cloud + keyspace: feast + username: scylla + password: pass + local_dc: AWS_US_EAST_1 + """ + + type: Literal[ + "scylladb", + "feast.infra.online_stores.scylladb_online_store.scylladb.ScyllaDBOnlineStore", + ] = "scylladb" + + # Connection + hosts: List[StrictStr] + """Contact-point host addresses.""" + + port: Optional[StrictInt] = 9042 + """CQL port (default 9042).""" + + keyspace: StrictStr = "feast_keyspace" + """Target ScyllaDB keyspace.""" + + username: Optional[StrictStr] = None + """Auth username.""" + + password: Optional[StrictStr] = None + """Auth password.""" + + local_dc: Optional[StrictStr] = None + """ + Local datacenter name. + For ScyllaDB Cloud this is the region string, e.g. ``AWS_US_EAST_1``. 
+ """ + + request_timeout: Optional[StrictFloat] = None + """Driver request timeout in seconds.""" + + read_concurrency: Optional[StrictInt] = 100 + """Concurrency level passed to ``execute_concurrent_with_args`` for reads.""" + + write_concurrency: Optional[StrictInt] = 100 + """Concurrency level passed to ``execute_concurrent_with_args`` for writes.""" + + vector_similarity_function: StrictStr = "COSINE" + """ + Default similarity function used when creating vector indexes. + Can be overridden per-feature via the ``similarity_function`` Field tag. + Supported values: ``COSINE``, ``DOT_PRODUCT``, ``EUCLIDEAN``. + """ + + +# --------------------------------------------------------------------------- +# Vector feature helpers +# --------------------------------------------------------------------------- + + +def _get_vector_features( + table: FeatureView, default_sim_func: str +) -> List[Tuple[str, int, str]]: + """ + Return ``(feature_name, dimension, similarity_function)`` for every feature + in *table* tagged as a vector feature. + + A feature is treated as a vector feature when its tags include:: + + "vector_index": "true" + "dimensions": "<integer>" + + The similarity function defaults to *default_sim_func* but can be + overridden per-feature with a ``"similarity_function"`` tag. + """ + result = [] + for field in table.schema: + tags = field.tags or {} + if tags.get("vector_index", "").lower() != "true": + continue + dim_str = tags.get("dimensions", "") + if not dim_str: + warnings.warn( + f"Feature '{field.name}' in FeatureView '{table.name}' is tagged " + "vector_index=true but is missing a 'dimensions' tag. 
" + "Skipping vector table creation for this feature.", + UserWarning, + stacklevel=2, + ) + continue + sim_func = tags.get("similarity_function", default_sim_func).upper() + result.append((field.name, int(dim_str), sim_func)) + return result + + +# --------------------------------------------------------------------------- +# Store implementation +# --------------------------------------------------------------------------- + + +class ScyllaDBInvalidConfig(Exception): + pass + + +class ScyllaDBOnlineStore(OnlineStore): + """ + Native ScyllaDB online store for Feast. + + Supports both regular feature materialisation and vector similarity search + via ScyllaDB's ANN / ``vector_index`` functionality. + + **Vector features** — tag your ``Field`` definitions like this:: + + from feast import Field + from feast.types import Array, Float32 + + Field( + name="embedding", + dtype=Array(Float32), + tags={ + "vector_index": "true", + "dimensions": "768", + "similarity_function": "COSINE", # optional, default COSINE + }, + ) + + Then call ``FeatureStore.retrieve_online_documents_v2(...)`` to perform an + approximate nearest-neighbour search. 
+ """ + + _cluster: Optional[Cluster] = None + _session: Optional[Session] = None + _keyspace: str = "feast_keyspace" + _prepared_statements: Dict[str, PreparedStatement] = {} + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _get_session(self, config: RepoConfig) -> Session: + """Return the active session, creating the cluster connection if needed.""" + online_store_config = config.online_store + if not isinstance(online_store_config, ScyllaDBOnlineStoreConfig): + raise ScyllaDBInvalidConfig(E_SCYLLA_UNEXPECTED_CONFIG) + + if self._session: + return self._session + + hosts = online_store_config.hosts + port = online_store_config.port or 9042 + keyspace = online_store_config.keyspace + username = online_store_config.username + password = online_store_config.password + local_dc = online_store_config.local_dc + + if not hosts or not keyspace: + raise ScyllaDBInvalidConfig(E_SCYLLA_NOT_CONFIGURED) + if (username is None) ^ (password is None): + raise ScyllaDBInvalidConfig(E_SCYLLA_INCONSISTENT_AUTH) + + auth_provider = ( + PlainTextAuthProvider(username=username, password=password) + if username is not None + else None + ) + + # Build execution profile + if local_dc: + lb_policy: Any = TokenAwarePolicy( + DCAwareRoundRobinPolicy(local_dc=local_dc) + ) + exe_profile = ExecutionProfile( + request_timeout=online_store_config.request_timeout, + load_balancing_policy=lb_policy, + ) + execution_profiles: Optional[Dict] = {EXEC_PROFILE_DEFAULT: exe_profile} + elif online_store_config.request_timeout is not None: + exe_profile = ExecutionProfile( + request_timeout=online_store_config.request_timeout, + ) + execution_profiles = {EXEC_PROFILE_DEFAULT: exe_profile} + else: + execution_profiles = None + + cluster_kwargs: Dict[str, Any] = { + k: v + for k, v in { + "execution_profiles": execution_profiles, + }.items() + if v is not None + } + + self._cluster = 
Cluster( + hosts, + port=port, + auth_provider=auth_provider, + **cluster_kwargs, + ) + self._keyspace = keyspace + self._session = self._cluster.connect(self._keyspace) + return self._session + + @staticmethod + def _fq_table_name(keyspace: str, project: str, table: FeatureView) -> str: + return f'"{keyspace}"."{project}_{table.name}"' + + @staticmethod + def _fq_vector_table_name( + keyspace: str, project: str, table: FeatureView, vec_feature: str + ) -> str: + return f'"{keyspace}"."{project}_{table.name}__{vec_feature}_vec"' + + def _get_statement( + self, config: RepoConfig, op_name: str, fqtable: str, **kwargs: Any + ) -> Any: + """ + Resolve *op_name* to a CQL statement, preparing and caching it when the + template is marked as prepareable. + """ + session = self._get_session(config) + template, do_prepare = _CQL_TEMPLATES[op_name] + cql = template.format(fqtable=fqtable, **kwargs) + if do_prepare: + if cql not in self._prepared_statements: + logger.info("Preparing %s statement on %s.", op_name, fqtable) + self._prepared_statements[cql] = session.prepare(cql) + return self._prepared_statements[cql] + return cql + + # ------------------------------------------------------------------ + # Table lifecycle helpers + # ------------------------------------------------------------------ + + def _create_table( + self, config: RepoConfig, project: str, table: FeatureView + ) -> None: + session = self._get_session(config) + fqtable = self._fq_table_name(self._keyspace, project, table) + logger.info("Creating table %s.", fqtable) + session.execute(CREATE_TABLE_CQL.format(fqtable=fqtable)) + + def _drop_table( + self, config: RepoConfig, project: str, table: FeatureView + ) -> None: + session = self._get_session(config) + fqtable = self._fq_table_name(self._keyspace, project, table) + logger.info("Dropping table %s.", fqtable) + session.execute(DROP_TABLE_CQL.format(fqtable=fqtable)) + + def _create_vector_table( + self, + config: RepoConfig, + project: str, + table: 
FeatureView, + vec_feature: str, + dim: int, + sim_func: str, + ) -> None: + session = self._get_session(config) + fqtable = self._fq_vector_table_name( + self._keyspace, project, table, vec_feature + ) + bare_name = f"{project}_{table.name}__{vec_feature}_vec" + index_name = f"{bare_name}_idx" + + logger.info("Creating vector table %s.", fqtable) + session.execute( + CREATE_VECTOR_TABLE_CQL.format( + fqtable=fqtable, vec_col=vec_feature, dim=dim + ) + ) + logger.info("Creating vector index %s on %s.", index_name, fqtable) + session.execute( + CREATE_VECTOR_INDEX_CQL.format( + index_name=index_name, + fqtable=fqtable, + vec_col=vec_feature, + sim_func=sim_func, + ) + ) + + def _drop_vector_table( + self, + config: RepoConfig, + project: str, + table: FeatureView, + vec_feature: str, + ) -> None: + session = self._get_session(config) + fqtable = self._fq_vector_table_name( + self._keyspace, project, table, vec_feature + ) + logger.info("Dropping vector table %s.", fqtable) + session.execute(DROP_VECTOR_TABLE_CQL.format(fqtable=fqtable)) + + # ------------------------------------------------------------------ + # OnlineStore interface — infrastructure + # ------------------------------------------------------------------ + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ) -> None: + """Create and drop feature-store tables as required by the registry.""" + project = config.project + online_store_config = config.online_store + assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) + default_sim = online_store_config.vector_similarity_function + + for table in tables_to_keep: + self._create_table(config, project, table) + for vec_feature, dim, sim_func in _get_vector_features(table, default_sim): + self._create_vector_table( + config, project, table, vec_feature, dim, sim_func + ) + + 
for table in tables_to_delete: + for vec_feature, _dim, _sim in _get_vector_features(table, default_sim): + self._drop_vector_table(config, project, table, vec_feature) + self._drop_table(config, project, table) + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ) -> None: + """Drop all tables for the given feature views.""" + project = config.project + online_store_config = config.online_store + assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) + default_sim = online_store_config.vector_similarity_function + + for table in tables: + for vec_feature, _dim, _sim in _get_vector_features(table, default_sim): + self._drop_vector_table(config, project, table, vec_feature) + self._drop_table(config, project, table) + + # ------------------------------------------------------------------ + # OnlineStore interface — write + # ------------------------------------------------------------------ + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """ + Write a batch of feature rows to the online store. + + Each ``(entity_key, feature_name)`` pair is upserted into the regular + feature table. If the feature view has vector features, the + corresponding float-list values are also written to the per-feature + vector table so they can be queried with ANN. 
+ """ + project = config.project + online_store_config = config.online_store + assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) + default_sim = online_store_config.vector_similarity_function + vec_features = _get_vector_features(table, default_sim) + vec_feature_names = {vf[0] for vf in vec_features} + + # --- regular table --- + fqtable = self._fq_table_name(self._keyspace, project, table) + insert_stmt = self._get_statement(config, "insert", fqtable) + + def _regular_rows() -> Iterable[Tuple[str, bytes, str, datetime, Optional[datetime]]]: + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + for feature_name, val in values.items(): + yield (feature_name, val.SerializeToString(), entity_key_bin, timestamp, created_ts) + if progress: + progress(1) + + execute_concurrent_with_args( + self._get_session(config), + insert_stmt, + _regular_rows(), + concurrency=online_store_config.write_concurrency, + ) + # correction for the last missing call to `progress`: + if progress: + progress(1) + + # --- vector tables --- + if not vec_features: + return + + session = self._get_session(config) + + for vec_feature, _dim, _sim in vec_features: + fq_vec_table = self._fq_vector_table_name( + self._keyspace, project, table, vec_feature + ) + vec_insert_cql = INSERT_VECTOR_CQL.format( + fqtable=fq_vec_table, vec_col=vec_feature + ) + cache_key = f"vec_insert_{fq_vec_table}" + if cache_key not in self._prepared_statements: + logger.info("Preparing vector insert on %s.", fq_vec_table) + self._prepared_statements[cache_key] = session.prepare(vec_insert_cql) + vec_stmt = self._prepared_statements[cache_key] + + def _vec_rows( + _vec_feat: str = vec_feature, + ) -> Iterable[Tuple[str, List[float], datetime, Optional[datetime]]]: + for entity_key, values, timestamp, created_ts in data: + if _vec_feat not in values: + continue + 
entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + float_list = list(values[_vec_feat].float_list_val.val) + yield (entity_key_bin, float_list, timestamp, created_ts) + + execute_concurrent_with_args( + session, + vec_stmt, + _vec_rows(), + concurrency=online_store_config.write_concurrency, + ) + + # ------------------------------------------------------------------ + # OnlineStore interface — read + # ------------------------------------------------------------------ + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """Read feature values for the given entity keys from the regular table.""" + project = config.project + online_store_config = config.online_store + assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) + + entity_key_bins = [ + serialize_entity_key( + ek, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + for ek in entity_keys + ] + + fqtable = self._fq_table_name(self._keyspace, project, table) + select_stmt = self._get_statement( + config, "select", fqtable, columns="feature_name, value, event_ts" + ) + + session = self._get_session(config) + retrieval = execute_concurrent_with_args( + session, + select_stmt, + ((ek_bin,) for ek_bin in entity_key_bins), + concurrency=online_store_config.read_concurrency, + ) + + results: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + for success, rows_or_exc in retrieval: + if not success: + logger.error( + "ScyllaDB read error during concurrent fetch: %s", rows_or_exc + ) + results.append((None, None)) + continue + res: Dict[str, ValueProto] = {} + res_ts: Optional[datetime] = None + for row in rows_or_exc: + if requested_features is None or row.feature_name in 
requested_features: + val = ValueProto() + val.ParseFromString(row.value) + res[row.feature_name] = val + res_ts = row.event_ts + results.append((res_ts, res) if res else (None, None)) + + return results + + # ------------------------------------------------------------------ + # Vector store interface + # ------------------------------------------------------------------ + + def retrieve_online_documents_v2( + self, + config: RepoConfig, + table: FeatureView, + requested_features: List[str], + embedding: Optional[List[float]], + top_k: int, + distance_metric: Optional[str] = None, + query_string: Optional[str] = None, + include_feature_view_version_metadata: bool = False, + ) -> List[ + Tuple[ + Optional[datetime], + Optional[EntityKeyProto], + Optional[Dict[str, ValueProto]], + ] + ]: + """ + Approximate nearest-neighbour search using ScyllaDB's vector_index. + + The feature view must have exactly one feature tagged with + ``vector_index=true``. The query *embedding* is compared against all + stored vectors; the top *top_k* results are returned together with + their feature values fetched from the regular feature table. + + Args: + config: RepoConfig for the current FeatureStore. + table: FeatureView to search. + requested_features: Feature names to include in the returned dicts. + embedding: Query vector. Must match the indexed column's dimension. + top_k: Number of results to return. + distance_metric: Override similarity function + (``COSINE`` / ``DOT_PRODUCT`` / ``EUCLIDEAN``). Defaults to the + store-level ``vector_similarity_function`` config value or the + per-feature tag. + query_string: Unused (reserved for future hybrid text+vector search). + include_feature_view_version_metadata: Unused. + + Returns: + List of ``(event_timestamp, entity_key_proto, feature_values)`` + tuples ordered from most to least similar. + """ + if embedding is None: + raise ValueError( + "retrieve_online_documents_v2 requires a non-None 'embedding' " + "for ScyllaDB ANN search." 
+ ) + + online_store_config = config.online_store + assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) + + default_sim = ( + distance_metric.upper() + if distance_metric + else online_store_config.vector_similarity_function + ) + project = config.project + vec_features = _get_vector_features(table, default_sim) + + if not vec_features: + raise NotImplementedError( + f"FeatureView '{table.name}' has no features tagged with " + "'vector_index=true'. Cannot perform vector search." + ) + if len(vec_features) > 1: + raise ValueError( + f"FeatureView '{table.name}' has {len(vec_features)} vector features. " + "retrieve_online_documents_v2 supports exactly one vector feature " + "per feature view." + ) + + vec_feature, _dim, sim_func = vec_features[0] + if distance_metric: + sim_func = distance_metric.upper() + + sim_expr_template = _SIM_FUNC_EXPR.get(sim_func) + if sim_expr_template is None: + raise ValueError( + f"Unsupported similarity function '{sim_func}'. " + "Choose from: COSINE, DOT_PRODUCT, EUCLIDEAN." 
+ ) + sim_expr = sim_expr_template.format(vec_col=vec_feature) + + fq_vec_table = self._fq_vector_table_name( + self._keyspace, project, table, vec_feature + ) + ann_cql = ANN_SELECT_CQL.format( + sim_func_call=sim_expr, + fqtable=fq_vec_table, + vec_col=vec_feature, + ) + + session = self._get_session(config) + if ann_cql not in self._prepared_statements: + self._prepared_statements[ann_cql] = session.prepare(ann_cql) + ann_stmt = self._prepared_statements[ann_cql] + ann_rows = list(session.execute(ann_stmt, (embedding, embedding, top_k))) + + if not ann_rows: + return [] + + # Ordered list of entity key bins from ANN results + entity_key_bins: List[str] = [row.entity_key for row in ann_rows] + scores: Dict[str, float] = {row.entity_key: row.score for row in ann_rows} + timestamps: Dict[str, Optional[datetime]] = { + row.entity_key: row.event_ts for row in ann_rows + } + + # Batch-fetch full feature values from the regular table + fqtable = self._fq_table_name(self._keyspace, project, table) + select_stmt = self._get_statement( + config, "select", fqtable, columns="feature_name, value, event_ts" + ) + + retrieval = execute_concurrent_with_args( + session, + select_stmt, + ((ek_bin,) for ek_bin in entity_key_bins), + concurrency=online_store_config.read_concurrency, + ) + + # Map entity_key_bin -> feature dict, preserving ANN order + feature_map: Dict[str, Dict[str, ValueProto]] = { + ek: {} for ek in entity_key_bins + } + for (ek_bin, (success, rows_or_exc)) in zip( + entity_key_bins, retrieval + ): + if not success: + logger.error("ScyllaDB ANN batch-read error: %s", rows_or_exc) + continue + for row in rows_or_exc: + if requested_features and row.feature_name not in requested_features: + continue + val = ValueProto() + val.ParseFromString(row.value) + feature_map[ek_bin][row.feature_name] = val + + # Assemble output in ANN rank order + output: List[ + Tuple[ + Optional[datetime], + Optional[EntityKeyProto], + Optional[Dict[str, ValueProto]], + ] + ] = [] + for 
ek_bin in entity_key_bins: + ek_proto = EntityKeyProto() + try: + ek_proto.ParseFromString(bytes.fromhex(ek_bin)) + except Exception: + ek_proto = None # type: ignore[assignment] + + output.append( + ( + timestamps.get(ek_bin), + ek_proto, + feature_map.get(ek_bin, {}), + ) + ) + + return output diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 06529cea0f2..6c417542bcd 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -75,6 +75,7 @@ "postgres": "feast.infra.online_stores.postgres_online_store.postgres.PostgreSQLOnlineStore", "hbase": "feast.infra.online_stores.hbase_online_store.hbase.HbaseOnlineStore", "cassandra": "feast.infra.online_stores.cassandra_online_store.cassandra_online_store.CassandraOnlineStore", + "scylladb": "feast.infra.online_stores.scylladb_online_store.scylladb.ScyllaDBOnlineStore", "mysql": "feast.infra.online_stores.mysql_online_store.mysql.MySQLOnlineStore", "hazelcast": "feast.infra.online_stores.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore", "elasticsearch": "feast.infra.online_stores.elasticsearch_online_store.elasticsearch.ElasticSearchOnlineStore", diff --git a/sdk/python/tests/integration/online_store/test_scylladb_online_store.py b/sdk/python/tests/integration/online_store/test_scylladb_online_store.py new file mode 100644 index 00000000000..9a31215ef16 --- /dev/null +++ b/sdk/python/tests/integration/online_store/test_scylladb_online_store.py @@ -0,0 +1,204 @@ +"""Integration tests for ScyllaDBOnlineStore, including vector search. + +These tests require a running ScyllaDB cluster. Set the following environment +variables before running: + + SCYLLA_HOSTS="host1,host2" contact points (required to run tests) + SCYLLA_KEYSPACE="feast_test" keyspace (default: feast_test) + SCYLLA_USERNAME="scylla" username (optional) + SCYLLA_PASSWORD="..." password (optional) + SCYLLA_LOCAL_DC="..." DC name, e.g. 
AWS_US_EAST_1 (required for vector tests) + +Run: + SCYLLA_HOSTS=... pytest sdk/python/tests/integration/online_store/test_scylladb_online_store.py -v +""" + +from __future__ import annotations + +import os +from datetime import datetime, timezone +from typing import Dict, List + +import pytest + +from feast import Entity, FeatureView, RepoConfig +from feast.field import Field +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.online_stores.scylladb_online_store.scylladb import ( + ScyllaDBOnlineStore, + ScyllaDBOnlineStoreConfig, +) +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import FloatList, Value as ValueProto +from feast.types import Array, Float32, Int64, String +from feast.value_type import ValueType + +# --------------------------------------------------------------------------- +# Skip entire module when SCYLLA_HOSTS is not set +# --------------------------------------------------------------------------- + +pytestmark = pytest.mark.skipif( + not os.environ.get("SCYLLA_HOSTS"), + reason="Set SCYLLA_HOSTS to run ScyllaDB integration tests", +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _utc_now() -> datetime: + return datetime.now(tz=timezone.utc) + + +def _make_config(vector: bool = False) -> RepoConfig: + hosts = os.environ.get("SCYLLA_HOSTS", "").split(",") + store_cfg = ScyllaDBOnlineStoreConfig( + hosts=hosts, + keyspace=os.environ.get("SCYLLA_KEYSPACE", "feast_test"), + username=os.environ.get("SCYLLA_USERNAME"), + password=os.environ.get("SCYLLA_PASSWORD"), + local_dc=os.environ.get("SCYLLA_LOCAL_DC"), + ) + cfg = RepoConfig( + project="integ_test", + provider="local", + online_store=store_cfg, + registry="memory://", + entity_key_serialization_version=3, + ) + return cfg + + +def _make_entity_key(val: str) -> 
EntityKeyProto: + return EntityKeyProto( + join_keys=["item_id"], + entity_values=[ValueProto(string_val=val)], + ) + + +def _make_feature_view(name: str, with_vector: bool = False) -> FeatureView: + source = FileSource(path="dummy.parquet", timestamp_field="event_timestamp") + schema: List[Field] = [Field(name="score", dtype=Array(Float32))] + if with_vector: + schema.append( + Field( + name="embedding", + dtype=Array(Float32), + tags={ + "vector_index": "true", + "dimensions": "4", + "similarity_function": "COSINE", + }, + ) + ) + return FeatureView( + name=name, + entities=[Entity(name="item_id", join_keys=["item_id"], value_type=ValueType.STRING)], + schema=schema, + online=True, + source=source, + ) + + +# --------------------------------------------------------------------------- +# Tests — regular online store +# --------------------------------------------------------------------------- + + +def test_write_and_read(): + store = ScyllaDBOnlineStore() + cfg = _make_config() + fv = _make_feature_view("test_write_read") + + store.update(cfg, [], [fv], [], [], partial=False) + try: + ek = _make_entity_key("item_1") + store.online_write_batch( + cfg, + fv, + [(ek, {"score": ValueProto(float_list_val=FloatList(val=[0.9]))}, _utc_now(), None)], + None, + ) + results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts, feats = results[0] + assert feats is not None + assert "score" in feats + finally: + store.teardown(cfg, [fv], []) + + +def test_missing_key_returns_none(): + store = ScyllaDBOnlineStore() + cfg = _make_config() + fv = _make_feature_view("test_missing_key") + + store.update(cfg, [], [fv], [], [], partial=False) + try: + existing = _make_entity_key("present") + missing = _make_entity_key("does_not_exist") + store.online_write_batch( + cfg, + fv, + [(existing, {"score": ValueProto(float_list_val=FloatList(val=[0.5]))}, _utc_now(), None)], + None, + ) + results = store.online_read(cfg, fv, [existing, missing]) + assert len(results) == 2 + 
assert results[0][1] is not None + assert results[1][1] is None + finally: + store.teardown(cfg, [fv], []) + + +# --------------------------------------------------------------------------- +# Tests — vector search +# --------------------------------------------------------------------------- + + +@pytest.mark.skipif( + not os.environ.get("SCYLLA_LOCAL_DC"), + reason="Set SCYLLA_LOCAL_DC to run vector search tests", +) +def test_vector_search(): + import time + + store = ScyllaDBOnlineStore() + cfg = _make_config(vector=True) + fv = _make_feature_view("test_vector_search", with_vector=True) + + store.update(cfg, [], [fv], [], [], partial=False) + try: + rows = [ + ("vec_a", [1.0, 0.0, 0.0, 0.0]), + ("vec_b", [0.0, 1.0, 0.0, 0.0]), + ("vec_c", [1.0, 0.1, 0.0, 0.0]), # close to vec_a + ] + batch = [] + for item_id, vec in rows: + ek = _make_entity_key(item_id) + batch.append(( + ek, + { + "score": ValueProto(float_list_val=FloatList(val=[0.5])), + "embedding": ValueProto(float_list_val=FloatList(val=vec)), + }, + _utc_now(), + None, + )) + store.online_write_batch(cfg, fv, batch, None) + + time.sleep(2) # allow vector index to propagate + + results = store.retrieve_online_documents_v2( + cfg, fv, + requested_features=["score", "embedding"], + embedding=[1.0, 0.0, 0.0, 0.0], + top_k=2, + ) + assert len(results) == 2 + for ts, ek_proto, feats in results: + assert feats is not None + finally: + store.teardown(cfg, [fv], []) diff --git a/sdk/python/tests/universal/feature_repos/universal/online_store/scylladb.py b/sdk/python/tests/universal/feature_repos/universal/online_store/scylladb.py new file mode 100644 index 00000000000..a89fec9a40b --- /dev/null +++ b/sdk/python/tests/universal/feature_repos/universal/online_store/scylladb.py @@ -0,0 +1,48 @@ +import os +from typing import Dict + +from tests.universal.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + + +class ScyllaDBOnlineStoreCreator(OnlineStoreCreator): + """ + Online store 
creator for ScyllaDB integration tests. + + Reads connection details from environment variables: + + SCYLLA_HOSTS comma-separated contact points (required) + SCYLLA_KEYSPACE keyspace name (default: feast_test) + SCYLLA_USERNAME username (optional) + SCYLLA_PASSWORD password (optional) + SCYLLA_LOCAL_DC datacenter name (optional, required for vector search) + """ + + def __init__(self, project_name: str, **kwargs): + super().__init__(project_name) + + def create_online_store(self) -> Dict[str, object]: + hosts_env = os.environ.get("SCYLLA_HOSTS", "") + if not hosts_env: + raise RuntimeError( + "Set SCYLLA_HOSTS (comma-separated) to run ScyllaDB integration tests." + ) + store: Dict[str, object] = { + "type": "scylladb", + "hosts": [h.strip() for h in hosts_env.split(",")], + "keyspace": os.environ.get("SCYLLA_KEYSPACE", "feast_test"), + } + username = os.environ.get("SCYLLA_USERNAME") + password = os.environ.get("SCYLLA_PASSWORD") + local_dc = os.environ.get("SCYLLA_LOCAL_DC") + if username: + store["username"] = username + if password: + store["password"] = password + if local_dc: + store["local_dc"] = local_dc + return store + + def teardown(self): + pass From a30364c86654361d761f405ba84b9ebe3b8502ed Mon Sep 17 00:00:00 2001 From: Attila Toth Date: Thu, 18 Jun 2026 11:59:39 +0200 Subject: [PATCH 02/12] refactor: update integration tests to use local Docker stack Signed-off-by: Attila Toth --- .../test_scylladb_online_store.py | 187 +++++++++++------- 1 file changed, 121 insertions(+), 66 deletions(-) diff --git a/sdk/python/tests/integration/online_store/test_scylladb_online_store.py b/sdk/python/tests/integration/online_store/test_scylladb_online_store.py index 9a31215ef16..9e44782cdb3 100644 --- a/sdk/python/tests/integration/online_store/test_scylladb_online_store.py +++ b/sdk/python/tests/integration/online_store/test_scylladb_online_store.py @@ -1,23 +1,20 @@ """Integration tests for ScyllaDBOnlineStore, including vector search. 
-These tests require a running ScyllaDB cluster. Set the following environment -variables before running: +All tests (regular read/write and vector search) run against a local +ScyllaDB + Vector Store stack started via ``ScyllaDBOnlineStoreCreator``. +No external cloud cluster is required. - SCYLLA_HOSTS="host1,host2" contact points (required to run tests) +To run against a ScyllaDB Cloud cluster instead, set: + + SCYLLA_HOSTS="host1,host2" contact points SCYLLA_KEYSPACE="feast_test" keyspace (default: feast_test) SCYLLA_USERNAME="scylla" username (optional) SCYLLA_PASSWORD="..." password (optional) - SCYLLA_LOCAL_DC="..." DC name, e.g. AWS_US_EAST_1 (required for vector tests) - -Run: - SCYLLA_HOSTS=... pytest sdk/python/tests/integration/online_store/test_scylladb_online_store.py -v + SCYLLA_LOCAL_DC="..." DC name, e.g. AWS_US_EAST_1 (required) """ -from __future__ import annotations - import os -from datetime import datetime, timezone -from typing import Dict, List +from typing import List import pytest @@ -29,41 +26,52 @@ ScyllaDBOnlineStoreConfig, ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast.protos.feast.types.Value_pb2 import FloatList, Value as ValueProto -from feast.types import Array, Float32, Int64, String +from feast.protos.feast.types.Value_pb2 import FloatList +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.types import Array, Float32 +from feast.utils import _utc_now from feast.value_type import ValueType - -# --------------------------------------------------------------------------- -# Skip entire module when SCYLLA_HOSTS is not set -# --------------------------------------------------------------------------- - -pytestmark = pytest.mark.skipif( - not os.environ.get("SCYLLA_HOSTS"), - reason="Set SCYLLA_HOSTS to run ScyllaDB integration tests", +from tests.universal.feature_repos.universal.online_store.scylladb import ( + ScyllaDBOnlineStoreCreator, ) # 
--------------------------------------------------------------------------- -# Helpers +# Fixtures # --------------------------------------------------------------------------- -def _utc_now() -> datetime: - return datetime.now(tz=timezone.utc) +@pytest.fixture(scope="module") +def docker_config() -> RepoConfig: + """Start a local ScyllaDB container and return a RepoConfig for it.""" + creator = ScyllaDBOnlineStoreCreator(project_name="integ_test") + store_dict = creator.create_online_store() + cfg = RepoConfig( + project="integ_test", + provider="local", + online_store=ScyllaDBOnlineStoreConfig(**store_dict), + registry="memory://", + entity_key_serialization_version=3, + ) + yield cfg + creator.teardown() -def _make_config(vector: bool = False) -> RepoConfig: - hosts = os.environ.get("SCYLLA_HOSTS", "").split(",") - store_cfg = ScyllaDBOnlineStoreConfig( - hosts=hosts, - keyspace=os.environ.get("SCYLLA_KEYSPACE", "feast_test"), - username=os.environ.get("SCYLLA_USERNAME"), - password=os.environ.get("SCYLLA_PASSWORD"), - local_dc=os.environ.get("SCYLLA_LOCAL_DC"), - ) +@pytest.fixture(scope="module") +def cloud_config(): + """RepoConfig pointing at a ScyllaDB Cloud cluster (skipped if env vars absent).""" + if not os.environ.get("SCYLLA_HOSTS") or not os.environ.get("SCYLLA_LOCAL_DC"): + pytest.skip("Set SCYLLA_HOSTS and SCYLLA_LOCAL_DC to run vector search tests") + hosts = os.environ["SCYLLA_HOSTS"].split(",") cfg = RepoConfig( project="integ_test", provider="local", - online_store=store_cfg, + online_store=ScyllaDBOnlineStoreConfig( + hosts=hosts, + keyspace=os.environ.get("SCYLLA_KEYSPACE", "feast_test"), + username=os.environ.get("SCYLLA_USERNAME"), + password=os.environ.get("SCYLLA_PASSWORD"), + local_dc=os.environ["SCYLLA_LOCAL_DC"], + ), registry="memory://", entity_key_serialization_version=3, ) @@ -94,7 +102,9 @@ def _make_feature_view(name: str, with_vector: bool = False) -> FeatureView: ) return FeatureView( name=name, - 
entities=[Entity(name="item_id", join_keys=["item_id"], value_type=ValueType.STRING)], + entities=[ + Entity(name="item_id", join_keys=["item_id"], value_type=ValueType.STRING) + ], schema=schema, online=True, source=source, @@ -102,13 +112,13 @@ def _make_feature_view(name: str, with_vector: bool = False) -> FeatureView: # --------------------------------------------------------------------------- -# Tests — regular online store +# Tests — regular online store (uses Docker via docker_config fixture) # --------------------------------------------------------------------------- -def test_write_and_read(): +def test_write_and_read(docker_config): store = ScyllaDBOnlineStore() - cfg = _make_config() + cfg = docker_config fv = _make_feature_view("test_write_read") store.update(cfg, [], [fv], [], [], partial=False) @@ -117,7 +127,14 @@ def test_write_and_read(): store.online_write_batch( cfg, fv, - [(ek, {"score": ValueProto(float_list_val=FloatList(val=[0.9]))}, _utc_now(), None)], + [ + ( + ek, + {"score": ValueProto(float_list_val=FloatList(val=[0.9]))}, + _utc_now(), + None, + ) + ], None, ) results = store.online_read(cfg, fv, [ek]) @@ -129,9 +146,9 @@ def test_write_and_read(): store.teardown(cfg, [fv], []) -def test_missing_key_returns_none(): +def test_missing_key_returns_none(docker_config): store = ScyllaDBOnlineStore() - cfg = _make_config() + cfg = docker_config fv = _make_feature_view("test_missing_key") store.update(cfg, [], [fv], [], [], partial=False) @@ -141,7 +158,14 @@ def test_missing_key_returns_none(): store.online_write_batch( cfg, fv, - [(existing, {"score": ValueProto(float_list_val=FloatList(val=[0.5]))}, _utc_now(), None)], + [ + ( + existing, + {"score": ValueProto(float_list_val=FloatList(val=[0.5]))}, + _utc_now(), + None, + ) + ], None, ) results = store.online_read(cfg, fv, [existing, missing]) @@ -153,19 +177,15 @@ def test_missing_key_returns_none(): # --------------------------------------------------------------------------- -# Tests 
— vector search +# Tests — vector search (local Docker stack via docker_config) # --------------------------------------------------------------------------- -@pytest.mark.skipif( - not os.environ.get("SCYLLA_LOCAL_DC"), - reason="Set SCYLLA_LOCAL_DC to run vector search tests", -) -def test_vector_search(): +def test_vector_search(docker_config): import time store = ScyllaDBOnlineStore() - cfg = _make_config(vector=True) + cfg = docker_config fv = _make_feature_view("test_vector_search", with_vector=True) store.update(cfg, [], [fv], [], [], partial=False) @@ -178,27 +198,62 @@ def test_vector_search(): batch = [] for item_id, vec in rows: ek = _make_entity_key(item_id) - batch.append(( - ek, - { - "score": ValueProto(float_list_val=FloatList(val=[0.5])), - "embedding": ValueProto(float_list_val=FloatList(val=vec)), - }, - _utc_now(), - None, - )) + batch.append( + ( + ek, + { + "score": ValueProto(float_list_val=FloatList(val=[0.5])), + "embedding": ValueProto(float_list_val=FloatList(val=vec)), + }, + _utc_now(), + None, + ) + ) store.online_write_batch(cfg, fv, batch, None) - time.sleep(2) # allow vector index to propagate - - results = store.retrieve_online_documents_v2( - cfg, fv, - requested_features=["score", "embedding"], - embedding=[1.0, 0.0, 0.0, 0.0], - top_k=2, - ) + # Wait for the vector index to finish building (up to 60 s). + deadline = time.time() + 60 + results = None + while time.time() < deadline: + try: + results = store.retrieve_online_documents_v2( + cfg, + fv, + requested_features=["score", "embedding"], + embedding=[1.0, 0.0, 0.0, 0.0], + top_k=2, + ) + break # index is ready + except Exception as exc: + if "still being constructed" in str(exc) or "missing index" in str(exc): + time.sleep(2) + else: + raise + else: + raise TimeoutError("Vector index was not ready within 60 s") + + assert results is not None assert len(results) == 2 + + # Extract entity IDs from the returned entity key protos. 
+ returned_ids = [] for ts, ek_proto, feats in results: + assert ts is not None assert feats is not None + assert "score" in feats + assert "embedding" in feats + assert ek_proto is not None + returned_ids.append(ek_proto.entity_values[0].string_val) + + # Query is [1,0,0,0]; vec_a=[1,0,0,0] (exact match) and + # vec_c=[1,0.1,0,0] are the two nearest neighbours by cosine similarity. + # vec_b=[0,1,0,0] is orthogonal and must NOT appear in top-2. + assert set(returned_ids) == {"vec_a", "vec_c"}, ( + f"Expected top-2 neighbours to be vec_a and vec_c, got {returned_ids}" + ) + # Exact match must be ranked first. + assert returned_ids[0] == "vec_a", ( + f"Expected vec_a (exact match) to be ranked first, got {returned_ids[0]}" + ) finally: store.teardown(cfg, [fv], []) From 7ef5e215d9d14c725bc0c5a53d52577e804089f1 Mon Sep 17 00:00:00 2001 From: Attila Toth Date: Thu, 18 Jun 2026 12:00:43 +0200 Subject: [PATCH 03/12] add testcontainers that support vectors Signed-off-by: Attila Toth --- .../universal/online_store/scylladb.py | 89 +++++++++++++------ 1 file changed, 62 insertions(+), 27 deletions(-) diff --git a/sdk/python/tests/universal/feature_repos/universal/online_store/scylladb.py b/sdk/python/tests/universal/feature_repos/universal/online_store/scylladb.py index a89fec9a40b..59f6ae97549 100644 --- a/sdk/python/tests/universal/feature_repos/universal/online_store/scylladb.py +++ b/sdk/python/tests/universal/feature_repos/universal/online_store/scylladb.py @@ -1,48 +1,83 @@ -import os +import time from typing import Dict +from testcontainers.core.container import DockerContainer +from testcontainers.core.network import Network +from testcontainers.core.waiting_utils import wait_for_logs + from tests.universal.feature_repos.universal.online_store_creator import ( OnlineStoreCreator, ) +_SCYLLA_IMAGE = "scylladb/scylla:2026.1.4" +_VECTOR_STORE_IMAGE = "scylladb/vector-store:1.7.0" + class ScyllaDBOnlineStoreCreator(OnlineStoreCreator): """ - Online store creator for 
ScyllaDB integration tests. + Starts a ScyllaDB + Vector Store stack for integration tests. - Reads connection details from environment variables: + Two containers share a Docker network: + - scylladb/scylla:2026.1.4 — CQL data node (exposes port 9042) + - scylladb/vector-store:1.7.0 — ANN indexing service (port 6080) - SCYLLA_HOSTS comma-separated contact points (required) - SCYLLA_KEYSPACE keyspace name (default: feast_test) - SCYLLA_USERNAME username (optional) - SCYLLA_PASSWORD password (optional) - SCYLLA_LOCAL_DC datacenter name (optional, required for vector search) + Both regular read/write and vector search tests run against this stack + with no external cloud dependency. """ def __init__(self, project_name: str, **kwargs): super().__init__(project_name) + self.network = Network() + self.scylla = ( + DockerContainer(_SCYLLA_IMAGE) + .with_network(self.network) + .with_network_aliases("scylla") + .with_command( + "--smp 1 --memory 1G --overprovisioned 1 " + "--vector-store-primary-uri http://vector-store:6080 " + "--broadcast-rpc-address 127.0.0.1" + ) + .with_exposed_ports("9042") + ) + self.vector_store = ( + DockerContainer(_VECTOR_STORE_IMAGE) + .with_network(self.network) + .with_network_aliases("vector-store") + .with_env("VECTOR_STORE_URI", "0.0.0.0:6080") + .with_env("VECTOR_STORE_SCYLLADB_URI", "scylla:9042") + ) def create_online_store(self) -> Dict[str, object]: - hosts_env = os.environ.get("SCYLLA_HOSTS", "") - if not hosts_env: - raise RuntimeError( - "Set SCYLLA_HOSTS (comma-separated) to run ScyllaDB integration tests." 
- ) - store: Dict[str, object] = { + self.network.create() + self.scylla.start() + wait_for_logs( + container=self.scylla, + predicate="Starting listening for CQL clients", + timeout=120, + ) + time.sleep(5) # allow CQL port to be fully ready + self.vector_store.start() + wait_for_logs( + container=self.vector_store, + predicate="6080", + timeout=60, + ) + keyspace = "feast_keyspace" + self.scylla.exec( + f'cqlsh -e "CREATE KEYSPACE IF NOT EXISTS \\"{keyspace}\\"' + " WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 1}" + " AND tablets = {'enabled': true};\"" + ) + exposed_port = int(self.scylla.get_exposed_port("9042")) + return { "type": "scylladb", - "hosts": [h.strip() for h in hosts_env.split(",")], - "keyspace": os.environ.get("SCYLLA_KEYSPACE", "feast_test"), + "hosts": ["127.0.0.1"], + "port": exposed_port, + "keyspace": keyspace, + "local_dc": "datacenter1", } - username = os.environ.get("SCYLLA_USERNAME") - password = os.environ.get("SCYLLA_PASSWORD") - local_dc = os.environ.get("SCYLLA_LOCAL_DC") - if username: - store["username"] = username - if password: - store["password"] = password - if local_dc: - store["local_dc"] = local_dc - return store def teardown(self): - pass + self.vector_store.stop() + self.scylla.stop() + self.network.remove() From a113fd561ac9394f8916d11727e458177b4d5007 Mon Sep 17 00:00:00 2001 From: Attila Toth Date: Thu, 18 Jun 2026 12:01:07 +0200 Subject: [PATCH 04/12] fix vector support and refactor Signed-off-by: Attila Toth --- .../scylladb_online_store/scylladb.py | 248 ++++++++---------- 1 file changed, 114 insertions(+), 134 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py index d6008d71a07..ece1a8c5f4e 100644 --- a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py +++ b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py @@ -26,7 +26,7 @@ from 
pydantic import StrictFloat, StrictInt, StrictStr from feast import Entity, FeatureView, RepoConfig -from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.key_encoding_utils import deserialize_entity_key, serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -52,7 +52,8 @@ # CQL templates # --------------------------------------------------------------------------- -# Regular feature store table (one row per entity_key + feature_name) +# Main feature table: one row per (entity_key, feature_name). +# Vector data lives in its own dedicated table to match query patterns. CREATE_TABLE_CQL = """ CREATE TABLE IF NOT EXISTS {fqtable} ( entity_key TEXT, @@ -73,34 +74,36 @@ SELECT_CQL = "SELECT {columns} FROM {fqtable} WHERE entity_key = ?;" -# Vector table — one row per entity, native vector column for ANN -CREATE_VECTOR_TABLE_CQL = """ - CREATE TABLE IF NOT EXISTS {fqtable} ( - entity_key TEXT PRIMARY KEY, - {vec_col} vector<float, {dim}>, - event_ts TIMESTAMP, - created_ts TIMESTAMP - ); -""" +# Dedicated table to serve vector queries, +# one row per entity, no feature_name column. +# Named "{project}_{fv_name}__{feature_name}_vec". 
+CREATE_VECTOR_TABLE_CQL = ( + "CREATE TABLE IF NOT EXISTS {{fqvectable}} (" + "entity_key TEXT, " + "vector_value vector<float, {dim}>, " + "event_ts TIMESTAMP, " + "PRIMARY KEY (entity_key));" +) + +DROP_VECTOR_TABLE_CQL = "DROP TABLE IF EXISTS {fqvectable};" + +INSERT_VEC_CQL = ( + "INSERT INTO {fqvectable} (entity_key, vector_value, event_ts)" + " VALUES (?, ?, ?);" +) CREATE_VECTOR_INDEX_CQL = ( "CREATE CUSTOM INDEX IF NOT EXISTS {index_name}" - " ON {fqtable} ({vec_col})" + " ON {fqvectable} (vector_value)" " USING 'vector_index'" " WITH OPTIONS = {{'similarity_function': '{sim_func}'}};" ) -DROP_VECTOR_TABLE_CQL = "DROP TABLE IF EXISTS {fqtable};" - -INSERT_VECTOR_CQL = ( - "INSERT INTO {fqtable} (entity_key, {vec_col}, event_ts, created_ts)" - " VALUES (?, ?, ?, ?);" -) - +# ANN query on the dedicated vector table. ANN_SELECT_CQL = ( "SELECT entity_key, {sim_func_call} AS score, event_ts" - " FROM {fqtable}" - " ORDER BY {vec_col} ANN OF ?" + " FROM {fqvectable}" + " ORDER BY vector_value ANN OF ?" 
" LIMIT ?;" ) @@ -112,11 +115,11 @@ "select": (SELECT_CQL, True), } -# Similarity function CQL expression helpers +# Similarity function CQL expression helpers (vector_value is the fixed column name) _SIM_FUNC_EXPR = { - "COSINE": "similarity_cosine({vec_col}, ?)", - "DOT_PRODUCT": "similarity_dot_product({vec_col}, ?)", - "EUCLIDEAN": "similarity_euclidean({vec_col}, ?)", + "COSINE": "similarity_cosine(vector_value, ?)", + "DOT_PRODUCT": "similarity_dot_product(vector_value, ?)", + "EUCLIDEAN": "similarity_euclidean(vector_value, ?)", } @@ -151,7 +154,7 @@ class ScyllaDBOnlineStoreConfig(FeastConfigBaseModel): # Connection hosts: List[StrictStr] """Contact-point host addresses.""" - + port: Optional[StrictInt] = 9042 """CQL port (default 9042).""" @@ -339,10 +342,10 @@ def _fq_table_name(keyspace: str, project: str, table: FeatureView) -> str: return f'"{keyspace}"."{project}_{table.name}"' @staticmethod - def _fq_vector_table_name( - keyspace: str, project: str, table: FeatureView, vec_feature: str + def _fq_vec_table_name( + keyspace: str, project: str, table: FeatureView, feature_name: str ) -> str: - return f'"{keyspace}"."{project}_{table.name}__{vec_feature}_vec"' + return f'"{keyspace}"."{project}_{table.name}__{feature_name}_vec"' def _get_statement( self, config: RepoConfig, op_name: str, fqtable: str, **kwargs: Any @@ -366,66 +369,52 @@ def _get_statement( # ------------------------------------------------------------------ def _create_table( - self, config: RepoConfig, project: str, table: FeatureView - ) -> None: - session = self._get_session(config) - fqtable = self._fq_table_name(self._keyspace, project, table) - logger.info("Creating table %s.", fqtable) - session.execute(CREATE_TABLE_CQL.format(fqtable=fqtable)) - - def _drop_table( - self, config: RepoConfig, project: str, table: FeatureView - ) -> None: - session = self._get_session(config) - fqtable = self._fq_table_name(self._keyspace, project, table) - logger.info("Dropping table %s.", 
fqtable) - session.execute(DROP_TABLE_CQL.format(fqtable=fqtable)) - - def _create_vector_table( self, config: RepoConfig, project: str, table: FeatureView, - vec_feature: str, - dim: int, - sim_func: str, + vec_features: Optional[List[Tuple[str, int, str]]] = None, ) -> None: session = self._get_session(config) - fqtable = self._fq_vector_table_name( - self._keyspace, project, table, vec_feature - ) - bare_name = f"{project}_{table.name}__{vec_feature}_vec" - index_name = f"{bare_name}_idx" + fqtable = self._fq_table_name(self._keyspace, project, table) + logger.info("Creating table %s.", fqtable) + session.execute(CREATE_TABLE_CQL.format(fqtable=fqtable)) - logger.info("Creating vector table %s.", fqtable) - session.execute( - CREATE_VECTOR_TABLE_CQL.format( - fqtable=fqtable, vec_col=vec_feature, dim=dim + for feat_name, dim, sim_func in (vec_features or []): + fqvectable = self._fq_vec_table_name( + self._keyspace, project, table, feat_name ) - ) - logger.info("Creating vector index %s on %s.", index_name, fqtable) - session.execute( - CREATE_VECTOR_INDEX_CQL.format( - index_name=index_name, - fqtable=fqtable, - vec_col=vec_feature, - sim_func=sim_func, + logger.info("Creating vector table %s.", fqvectable) + session.execute( + CREATE_VECTOR_TABLE_CQL.format(dim=dim).format(fqvectable=fqvectable) + ) + index_name = f"{project}_{table.name}__{feat_name}_vec_idx" + logger.info("Creating vector index %s on %s.", index_name, fqvectable) + session.execute( + CREATE_VECTOR_INDEX_CQL.format( + index_name=index_name, + fqvectable=fqvectable, + sim_func=sim_func, + ) ) - ) - def _drop_vector_table( + def _drop_table( self, config: RepoConfig, project: str, table: FeatureView, - vec_feature: str, + vec_features: Optional[List[Tuple[str, int, str]]] = None, ) -> None: session = self._get_session(config) - fqtable = self._fq_vector_table_name( - self._keyspace, project, table, vec_feature - ) - logger.info("Dropping vector table %s.", fqtable) - 
session.execute(DROP_VECTOR_TABLE_CQL.format(fqtable=fqtable)) + for feat_name, _dim, _sim in (vec_features or []): + fqvectable = self._fq_vec_table_name( + self._keyspace, project, table, feat_name + ) + logger.info("Dropping vector table %s.", fqvectable) + session.execute(DROP_VECTOR_TABLE_CQL.format(fqvectable=fqvectable)) + fqtable = self._fq_table_name(self._keyspace, project, table) + logger.info("Dropping table %s.", fqtable) + session.execute(DROP_TABLE_CQL.format(fqtable=fqtable)) # ------------------------------------------------------------------ # OnlineStore interface — infrastructure @@ -447,16 +436,12 @@ def update( default_sim = online_store_config.vector_similarity_function for table in tables_to_keep: - self._create_table(config, project, table) - for vec_feature, dim, sim_func in _get_vector_features(table, default_sim): - self._create_vector_table( - config, project, table, vec_feature, dim, sim_func - ) + vec_features = _get_vector_features(table, default_sim) + self._create_table(config, project, table, vec_features=vec_features) for table in tables_to_delete: - for vec_feature, _dim, _sim in _get_vector_features(table, default_sim): - self._drop_vector_table(config, project, table, vec_feature) - self._drop_table(config, project, table) + vec_features = _get_vector_features(table, default_sim) + self._drop_table(config, project, table, vec_features=vec_features) def teardown( self, @@ -468,12 +453,10 @@ def teardown( project = config.project online_store_config = config.online_store assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) - default_sim = online_store_config.vector_similarity_function for table in tables: - for vec_feature, _dim, _sim in _get_vector_features(table, default_sim): - self._drop_vector_table(config, project, table, vec_feature) - self._drop_table(config, project, table) + vec_features = _get_vector_features(table, online_store_config.vector_similarity_function) + self._drop_table(config, project, table, 
vec_features=vec_features) # ------------------------------------------------------------------ # OnlineStore interface — write @@ -491,78 +474,79 @@ def online_write_batch( """ Write a batch of feature rows to the online store. - Each ``(entity_key, feature_name)`` pair is upserted into the regular - feature table. If the feature view has vector features, the - corresponding float-list values are also written to the per-feature - vector table so they can be queried with ANN. + All features are written to the main table. For vector features, + the float-list value is additionally written to the dedicated vector + table to support ANN search. """ project = config.project online_store_config = config.online_store assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) default_sim = online_store_config.vector_similarity_function vec_features = _get_vector_features(table, default_sim) - vec_feature_names = {vf[0] for vf in vec_features} - # --- regular table --- + # --- regular table (vector_value populated for vector feature rows) --- fqtable = self._fq_table_name(self._keyspace, project, table) insert_stmt = self._get_statement(config, "insert", fqtable) - def _regular_rows() -> Iterable[Tuple[str, bytes, str, datetime, Optional[datetime]]]: + session = self._get_session(config) + + # --- main table rows (all features, no vector column) --- + def _main_rows() -> Iterable[ + Tuple[str, bytes, str, datetime, Optional[datetime]] + ]: for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ).hex() for feature_name, val in values.items(): - yield (feature_name, val.SerializeToString(), entity_key_bin, timestamp, created_ts) + yield ( + feature_name, + val.SerializeToString(), + entity_key_bin, + timestamp, + created_ts, + ) if progress: progress(1) execute_concurrent_with_args( - self._get_session(config), + session, insert_stmt, - 
_regular_rows(), + _main_rows(), concurrency=online_store_config.write_concurrency, ) # correction for the last missing call to `progress`: if progress: progress(1) - # --- vector tables --- - if not vec_features: - return - - session = self._get_session(config) - - for vec_feature, _dim, _sim in vec_features: - fq_vec_table = self._fq_vector_table_name( - self._keyspace, project, table, vec_feature - ) - vec_insert_cql = INSERT_VECTOR_CQL.format( - fqtable=fq_vec_table, vec_col=vec_feature - ) - cache_key = f"vec_insert_{fq_vec_table}" - if cache_key not in self._prepared_statements: - logger.info("Preparing vector insert on %s.", fq_vec_table) - self._prepared_statements[cache_key] = session.prepare(vec_insert_cql) - vec_stmt = self._prepared_statements[cache_key] + # --- vector table rows (one per entity per vector feature) --- + for feat_name, _dim, _sim in vec_features: + fqvectable = self._fq_vec_table_name(self._keyspace, project, table, feat_name) + vec_insert_cql = INSERT_VEC_CQL.format(fqvectable=fqvectable) + if vec_insert_cql not in self._prepared_statements: + self._prepared_statements[vec_insert_cql] = session.prepare(vec_insert_cql) + vec_insert_stmt = self._prepared_statements[vec_insert_cql] def _vec_rows( - _vec_feat: str = vec_feature, - ) -> Iterable[Tuple[str, List[float], datetime, Optional[datetime]]]: - for entity_key, values, timestamp, created_ts in data: - if _vec_feat not in values: + fn: str = feat_name, + ) -> Iterable[Tuple[str, Any, datetime]]: + for entity_key, values, timestamp, _created_ts in data: + if fn not in values: continue entity_key_bin = serialize_entity_key( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ).hex() - float_list = list(values[_vec_feat].float_list_val.val) - yield (entity_key_bin, float_list, timestamp, created_ts) + yield ( + entity_key_bin, + list(values[fn].float_list_val.val), + timestamp, + ) execute_concurrent_with_args( session, - vec_stmt, + vec_insert_stmt, 
_vec_rows(), concurrency=online_store_config.write_concurrency, ) @@ -709,15 +693,12 @@ def retrieve_online_documents_v2( f"Unsupported similarity function '{sim_func}'. " "Choose from: COSINE, DOT_PRODUCT, EUCLIDEAN." ) - sim_expr = sim_expr_template.format(vec_col=vec_feature) + sim_expr = sim_expr_template - fq_vec_table = self._fq_vector_table_name( - self._keyspace, project, table, vec_feature - ) + fqvectable = self._fq_vec_table_name(self._keyspace, project, table, vec_feature) ann_cql = ANN_SELECT_CQL.format( sim_func_call=sim_expr, - fqtable=fq_vec_table, - vec_col=vec_feature, + fqvectable=fqvectable, ) session = self._get_session(config) @@ -731,12 +712,11 @@ def retrieve_online_documents_v2( # Ordered list of entity key bins from ANN results entity_key_bins: List[str] = [row.entity_key for row in ann_rows] - scores: Dict[str, float] = {row.entity_key: row.score for row in ann_rows} timestamps: Dict[str, Optional[datetime]] = { row.entity_key: row.event_ts for row in ann_rows } - # Batch-fetch full feature values from the regular table + # Batch-fetch full feature values from the main table fqtable = self._fq_table_name(self._keyspace, project, table) select_stmt = self._get_statement( config, "select", fqtable, columns="feature_name, value, event_ts" @@ -753,9 +733,7 @@ def retrieve_online_documents_v2( feature_map: Dict[str, Dict[str, ValueProto]] = { ek: {} for ek in entity_key_bins } - for (ek_bin, (success, rows_or_exc)) in zip( - entity_key_bins, retrieval - ): + for ek_bin, (success, rows_or_exc) in zip(entity_key_bins, retrieval): if not success: logger.error("ScyllaDB ANN batch-read error: %s", rows_or_exc) continue @@ -775,11 +753,13 @@ def retrieve_online_documents_v2( ] ] = [] for ek_bin in entity_key_bins: - ek_proto = EntityKeyProto() try: - ek_proto.ParseFromString(bytes.fromhex(ek_bin)) + ek_proto: Optional[EntityKeyProto] = deserialize_entity_key( + bytes.fromhex(ek_bin), + 
entity_key_serialization_version=config.entity_key_serialization_version, + ) except Exception: - ek_proto = None # type: ignore[assignment] + ek_proto = None output.append( ( From ec48773703ae284af0e0f13d257229d2c6dd2cb4 Mon Sep 17 00:00:00 2001 From: Attila Toth Date: Thu, 18 Jun 2026 12:01:27 +0200 Subject: [PATCH 05/12] docs update Signed-off-by: Attila Toth --- docs/reference/online-stores/scylladb.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/docs/reference/online-stores/scylladb.md b/docs/reference/online-stores/scylladb.md index ae05418f623..aa11974afb2 100644 --- a/docs/reference/online-stores/scylladb.md +++ b/docs/reference/online-stores/scylladb.md @@ -98,10 +98,7 @@ documents_fv = FeatureView( ``` {% endcode %} -When `feast apply` runs, the store creates: - -- A regular feature table (`{project}_{fv_name}`) for `online_read` / `online_write_batch`. -- A vector table (`{project}_{fv_name}__{feature}_vec`) with a native `vector` column and an HNSW ANN index. +When `feast apply` runs, the store automatically creates the necessary tables and HNSW ANN index for any feature view with vector-tagged fields. 
To query the top-k most similar documents: From 09f8d329f072283e0a09bde81c9ee72de3652f68 Mon Sep 17 00:00:00 2001 From: Attila Toth Date: Thu, 18 Jun 2026 13:42:20 +0200 Subject: [PATCH 06/12] add support for native TTL, set local_quorum as default Signed-off-by: Attila Toth --- .../scylladb_online_store/scylladb.py | 40 ++++++++++++++----- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py index ece1a8c5f4e..4f63990c689 100644 --- a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py +++ b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py @@ -22,7 +22,7 @@ ) from cassandra.concurrent import execute_concurrent_with_args from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy -from cassandra.query import PreparedStatement +from cassandra.query import ConsistencyLevel, PreparedStatement from pydantic import StrictFloat, StrictInt, StrictStr from feast import Entity, FeatureView, RepoConfig @@ -72,6 +72,11 @@ " VALUES (?, ?, ?, ?, ?);" ) +INSERT_CQL_TTL = ( + "INSERT INTO {fqtable} (feature_name, value, entity_key, event_ts, created_ts)" + " VALUES (?, ?, ?, ?, ?) 
USING TTL ?;" +) + SELECT_CQL = "SELECT {columns} FROM {fqtable} WHERE entity_key = ?;" # Dedicated table to serve vector queries, @@ -112,6 +117,7 @@ "create": (CREATE_TABLE_CQL, False), "drop": (DROP_TABLE_CQL, False), "insert": (INSERT_CQL, True), + "insert_ttl": (INSERT_CQL_TTL, True), "select": (SELECT_CQL, True), } @@ -309,15 +315,20 @@ def _get_session(self, config: RepoConfig) -> Session: exe_profile = ExecutionProfile( request_timeout=online_store_config.request_timeout, load_balancing_policy=lb_policy, + consistency_level=ConsistencyLevel.LOCAL_QUORUM, ) execution_profiles: Optional[Dict] = {EXEC_PROFILE_DEFAULT: exe_profile} elif online_store_config.request_timeout is not None: exe_profile = ExecutionProfile( request_timeout=online_store_config.request_timeout, + consistency_level=ConsistencyLevel.LOCAL_QUORUM, ) execution_profiles = {EXEC_PROFILE_DEFAULT: exe_profile} else: - execution_profiles = None + exe_profile = ExecutionProfile( + consistency_level=ConsistencyLevel.LOCAL_QUORUM, + ) + execution_profiles = {EXEC_PROFILE_DEFAULT: exe_profile} cluster_kwargs: Dict[str, Any] = { k: v @@ -334,7 +345,7 @@ def _get_session(self, config: RepoConfig) -> Session: **cluster_kwargs, ) self._keyspace = keyspace - self._session = self._cluster.connect(self._keyspace) + self._session = self._cluster.connect(keyspace) return self._session @staticmethod @@ -484,29 +495,38 @@ def online_write_batch( default_sim = online_store_config.vector_similarity_function vec_features = _get_vector_features(table, default_sim) - # --- regular table (vector_value populated for vector feature rows) --- + # Compute TTL in seconds from the FeatureView's ttl field (timedelta or None). + # ScyllaDB will automatically expire rows after this many seconds, covering + # both "support for ttl at retrieval" and "support for deleting expired data". 
+ ttl_seconds: Optional[int] = ( + int(table.ttl.total_seconds()) if table.ttl else None + ) + + # --- regular table --- fqtable = self._fq_table_name(self._keyspace, project, table) - insert_stmt = self._get_statement(config, "insert", fqtable) + insert_op = "insert_ttl" if ttl_seconds is not None else "insert" + insert_stmt = self._get_statement(config, insert_op, fqtable) session = self._get_session(config) - # --- main table rows (all features, no vector column) --- - def _main_rows() -> Iterable[ - Tuple[str, bytes, str, datetime, Optional[datetime]] - ]: + # --- main table rows (all features) --- + def _main_rows() -> Iterable[Tuple]: for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ).hex() for feature_name, val in values.items(): - yield ( + row: Tuple = ( feature_name, val.SerializeToString(), entity_key_bin, timestamp, created_ts, ) + if ttl_seconds is not None: + row = row + (ttl_seconds,) + yield row if progress: progress(1) From b42809e19e18721057e9b38d3115f5eba6aae7a8 Mon Sep 17 00:00:00 2001 From: Attila Toth Date: Thu, 18 Jun 2026 13:42:35 +0200 Subject: [PATCH 07/12] docs fixes Signed-off-by: Attila Toth --- docs/reference/online-stores/overview.md | 38 ++++++++++++------------ docs/reference/online-stores/scylladb.md | 4 +-- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/docs/reference/online-stores/overview.md b/docs/reference/online-stores/overview.md index 6ee076b0669..663a48836dc 100644 --- a/docs/reference/online-stores/overview.md +++ b/docs/reference/online-stores/overview.md @@ -29,26 +29,26 @@ See this [issue](https://github.com/feast-dev/feast/issues/2254) for a discussio ## Functionality Matrix There are currently five core online store implementations: `SqliteOnlineStore`, `RedisOnlineStore`, `DynamoDBOnlineStore`, `SnowflakeOnlineStore`, and `DatastoreOnlineStore`. 
-There are several additional implementations contributed by the Feast community (`PostgreSQLOnlineStore`, `HbaseOnlineStore` and `CassandraOnlineStore`), which are not guaranteed to be stable or to match the functionality of the core implementations. +There are several additional implementations contributed by the Feast community (`PostgreSQLOnlineStore`, `HbaseOnlineStore`, `CassandraOnlineStore` and `ScyllaDBOnlineStore`), which are not guaranteed to be stable or to match the functionality of the core implementations. Details for each specific online store, such as how to configure it in a `feature_store.yaml`, can be found [here](README.md). Below is a matrix indicating which online stores support what functionality. -| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | Milvus | -| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |:----| -| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| teardown infrastructure (e.g. 
tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no | -| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| readable by Java | no | yes | no | no | no | no | no | no | no | -| readable by Go | yes | yes | no | no | no | no | no | no | no | -| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes | -| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no | -| support for deleting expired data | no | yes | no | no | no | no | no | no | no | -| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no | -| collocated by feature service | no | no | no | no | no | no | no | no | no | -| collocated by entity key | no | yes | no | no | no | no | no | no | yes | +| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | Milvus | ScyllaDB | +| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |:----| :-- | +| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| teardown infrastructure (e.g. 
tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no | no | +| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| readable by Java | no | yes | no | no | no | no | no | no | no | no | +| readable by Go | yes | yes | no | no | no | no | no | no | no | no | +| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes | no | +| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no | yes | +| support for deleting expired data | no | yes | no | no | no | no | no | no | no | yes | +| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no | yes | +| collocated by feature service | no | no | no | no | no | no | no | no | no | no | +| collocated by entity key | no | yes | no | no | no | no | no | no | yes | no | diff --git a/docs/reference/online-stores/scylladb.md b/docs/reference/online-stores/scylladb.md index aa11974afb2..6c2e462877b 100644 --- a/docs/reference/online-stores/scylladb.md +++ b/docs/reference/online-stores/scylladb.md @@ -129,8 +129,8 @@ Below is a matrix indicating which functionality is supported by the ScyllaDB on | readable by Go | no | | support for entityless feature views | yes | | support for concurrent writing to the same key | no | -| support for ttl (time to live) at retrieval | no | -| support for deleting expired data | no | +| support for ttl (time to live) at retrieval | yes | +| support for deleting expired data | yes | | collocated by feature view | yes | | collocated by feature service | no | | collocated by entity key | no | From 08573c6b98ef7e992cfe82edac13916206fae7a9 Mon Sep 17 
00:00:00 2001 From: Attila Toth Date: Thu, 18 Jun 2026 13:43:21 +0200 Subject: [PATCH 08/12] add tests Signed-off-by: Attila Toth --- .../test_scylladb_online_store.py | 215 +++++++++++++++++- 1 file changed, 209 insertions(+), 6 deletions(-) diff --git a/sdk/python/tests/integration/online_store/test_scylladb_online_store.py b/sdk/python/tests/integration/online_store/test_scylladb_online_store.py index 9e44782cdb3..a5d456d86e6 100644 --- a/sdk/python/tests/integration/online_store/test_scylladb_online_store.py +++ b/sdk/python/tests/integration/online_store/test_scylladb_online_store.py @@ -14,6 +14,8 @@ """ import os +import time +from datetime import datetime, timedelta, timezone from typing import List import pytest @@ -28,7 +30,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import FloatList from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.types import Array, Float32 +from feast.types import Array, Float32, Int64 from feast.utils import _utc_now from feast.value_type import ValueType from tests.universal.feature_repos.universal.online_store.scylladb import ( @@ -85,7 +87,7 @@ def _make_entity_key(val: str) -> EntityKeyProto: ) -def _make_feature_view(name: str, with_vector: bool = False) -> FeatureView: +def _make_feature_view(name: str, with_vector: bool = False, ttl: timedelta = None) -> FeatureView: source = FileSource(path="dummy.parquet", timestamp_field="event_timestamp") schema: List[Field] = [Field(name="score", dtype=Array(Float32))] if with_vector: @@ -108,6 +110,7 @@ def _make_feature_view(name: str, with_vector: bool = False) -> FeatureView: schema=schema, online=True, source=source, + ttl=ttl, ) @@ -142,6 +145,7 @@ def test_write_and_read(docker_config): ts, feats = results[0] assert feats is not None assert "score" in feats + assert list(feats["score"].float_list_val.val) == pytest.approx([0.9]) finally: store.teardown(cfg, [fv], []) @@ 
-176,14 +180,155 @@ def test_missing_key_returns_none(docker_config): store.teardown(cfg, [fv], []) +def test_multiple_features_roundtrip(docker_config): + """Multiple features of different types all round-trip with the correct value. + """ + store = ScyllaDBOnlineStore() + cfg = docker_config + source = FileSource(path="dummy.parquet", timestamp_field="event_timestamp") + fv = FeatureView( + name="test_multi_features", + entities=[Entity(name="item_id", join_keys=["item_id"], value_type=ValueType.STRING)], + schema=[ + Field(name="score", dtype=Array(Float32)), + Field(name="priority", dtype=Int64), + ], + online=True, + source=source, + ) + + store.update(cfg, [], [fv], [], [], partial=False) + try: + ek = _make_entity_key("item_mf") + store.online_write_batch( + cfg, + fv, + [ + ( + ek, + { + "score": ValueProto(float_list_val=FloatList(val=[0.85, 0.15])), + "priority": ValueProto(int64_val=42), + }, + _utc_now(), + None, + ) + ], + None, + ) + results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts, feats = results[0] + assert feats is not None + assert list(feats["score"].float_list_val.val) == pytest.approx([0.85, 0.15]) + assert feats["priority"].int64_val == 42 + finally: + store.teardown(cfg, [fv], []) + + +def test_multiple_entities(docker_config): + """Multiple entity keys can be read in a single online_read call with correct values. + + Mirrors the multi-entity behaviour verified by the universal online store + suite (``test_online_retrieval_with_event_timestamps``) which Cassandra runs. 
+ """ + store = ScyllaDBOnlineStore() + cfg = docker_config + fv = _make_feature_view("test_multi_entities") + + store.update(cfg, [], [fv], [], [], partial=False) + try: + keys = [_make_entity_key(f"entity_{i}") for i in range(3)] + batch = [ + ( + ek, + {"score": ValueProto(float_list_val=FloatList(val=[float(i) * 0.1 + 0.1]))}, + _utc_now(), + None, + ) + for i, ek in enumerate(keys) + ] + store.online_write_batch(cfg, fv, batch, None) + results = store.online_read(cfg, fv, keys) + assert len(results) == 3 + for i, (ts, feats) in enumerate(results): + assert feats is not None, f"Entity {i} returned None features" + assert list(feats["score"].float_list_val.val) == pytest.approx( + [float(i) * 0.1 + 0.1] + ) + finally: + store.teardown(cfg, [fv], []) + + +def test_overwrite_uses_latest_value(docker_config): + """Writing the same entity key twice keeps the most-recently-written value.""" + store = ScyllaDBOnlineStore() + cfg = docker_config + fv = _make_feature_view("test_overwrite") + + store.update(cfg, [], [fv], [], [], partial=False) + try: + ek = _make_entity_key("overwrite_item") + store.online_write_batch( + cfg, + fv, + [(ek, {"score": ValueProto(float_list_val=FloatList(val=[0.1]))}, _utc_now(), None)], + None, + ) + store.online_write_batch( + cfg, + fv, + [(ek, {"score": ValueProto(float_list_val=FloatList(val=[0.9]))}, _utc_now(), None)], + None, + ) + results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts, feats = results[0] + assert feats is not None + assert list(feats["score"].float_list_val.val) == pytest.approx([0.9]) + finally: + store.teardown(cfg, [fv], []) + + +def test_event_timestamp_returned(docker_config): + """The event timestamp written with a row is returned correctly by online_read. + + Mirrors ``test_online_retrieval_with_event_timestamps`` from the universal + suite which verifies per-entity timestamps for all online store types. 
+ """ + store = ScyllaDBOnlineStore() + cfg = docker_config + fv = _make_feature_view("test_event_ts") + + store.update(cfg, [], [fv], [], [], partial=False) + try: + ek = _make_entity_key("ts_item") + # Use a second-boundary timestamp — CQL ``timestamp`` has ms precision. + write_ts = datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc) + store.online_write_batch( + cfg, + fv, + [(ek, {"score": ValueProto(float_list_val=FloatList(val=[0.5]))}, write_ts, None)], + None, + ) + results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts, feats = results[0] + assert ts is not None + assert isinstance(ts, datetime) + # Normalise both to UTC before comparing. + ts_utc = ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc) + assert ts_utc == write_ts + finally: + store.teardown(cfg, [fv], []) + + # --------------------------------------------------------------------------- # Tests — vector search (local Docker stack via docker_config) # --------------------------------------------------------------------------- def test_vector_search(docker_config): - import time - store = ScyllaDBOnlineStore() cfg = docker_config fv = _make_feature_view("test_vector_search", with_vector=True) @@ -236,14 +381,28 @@ def test_vector_search(docker_config): assert len(results) == 2 # Extract entity IDs from the returned entity key protos. + # Also verify the shape and types of every field in each result tuple. + # The expected embeddings keyed by entity ID for value verification. 
+ expected_embeddings = { + "vec_a": [1.0, 0.0, 0.0, 0.0], + "vec_b": [0.0, 1.0, 0.0, 0.0], + "vec_c": [1.0, 0.1, 0.0, 0.0], + } returned_ids = [] for ts, ek_proto, feats in results: - assert ts is not None + assert isinstance(ts, datetime), f"Expected datetime, got {type(ts)}" assert feats is not None assert "score" in feats assert "embedding" in feats + # score field: single-element float list written as 0.5 + assert list(feats["score"].float_list_val.val) == pytest.approx([0.5]) + # embedding field: values must match what was written + entity_id = ek_proto.entity_values[0].string_val + assert list(feats["embedding"].float_list_val.val) == pytest.approx( + expected_embeddings[entity_id] + ) assert ek_proto is not None - returned_ids.append(ek_proto.entity_values[0].string_val) + returned_ids.append(entity_id) # Query is [1,0,0,0]; vec_a=[1,0,0,0] (exact match) and # vec_c=[1,0.1,0,0] are the two nearest neighbours by cosine similarity. @@ -257,3 +416,47 @@ def test_vector_search(docker_config): ) finally: store.teardown(cfg, [fv], []) + + +def test_ttl_expiry(docker_config): + """Rows written with a TTL should be gone after ScyllaDB expires them.""" + store = ScyllaDBOnlineStore() + cfg = docker_config + # TTL of 2 seconds, short enough for a test, long enough to write first. + fv = _make_feature_view("test_ttl_expiry", ttl=timedelta(seconds=5)) + + store.update(cfg, [], [fv], [], [], partial=False) + try: + ek = _make_entity_key("ttl_item") + store.online_write_batch( + cfg, + fv, + [ + ( + ek, + {"score": ValueProto(float_list_val=FloatList(val=[0.7]))}, + _utc_now(), + None, + ) + ], + None, + ) + + # Confirm the row is readable immediately after writing. + results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts, feats = results[0] + assert feats is not None, "Row should be present right after write" + + # Wait for ScyllaDB to expire the row (TTL = 5s, wait 8s to be safe). 
+ time.sleep(8) + + results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts_after, feats_after = results[0] + assert feats_after is None, ( + "Row should have expired and return None, " + f"but got features: {feats_after}" + ) + finally: + store.teardown(cfg, [fv], []) From de1a61b22ffa7fda661549910f427dde1c7e1faf Mon Sep 17 00:00:00 2001 From: Attila Toth Date: Thu, 18 Jun 2026 14:08:46 +0200 Subject: [PATCH 09/12] lint fix Signed-off-by: Attila Toth --- .../scylladb_online_store/scylladb.py | 23 +++++---- .../test_scylladb_online_store.py | 47 +++++++++++++++---- 2 files changed, 52 insertions(+), 18 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py index 4f63990c689..12c4f25c474 100644 --- a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py +++ b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py @@ -93,8 +93,7 @@ DROP_VECTOR_TABLE_CQL = "DROP TABLE IF EXISTS {fqvectable};" INSERT_VEC_CQL = ( - "INSERT INTO {fqvectable} (entity_key, vector_value, event_ts)" - " VALUES (?, ?, ?);" + "INSERT INTO {fqvectable} (entity_key, vector_value, event_ts) VALUES (?, ?, ?);" ) CREATE_VECTOR_INDEX_CQL = ( @@ -391,7 +390,7 @@ def _create_table( logger.info("Creating table %s.", fqtable) session.execute(CREATE_TABLE_CQL.format(fqtable=fqtable)) - for feat_name, dim, sim_func in (vec_features or []): + for feat_name, dim, sim_func in vec_features or []: fqvectable = self._fq_vec_table_name( self._keyspace, project, table, feat_name ) @@ -417,7 +416,7 @@ def _drop_table( vec_features: Optional[List[Tuple[str, int, str]]] = None, ) -> None: session = self._get_session(config) - for feat_name, _dim, _sim in (vec_features or []): + for feat_name, _dim, _sim in vec_features or []: fqvectable = self._fq_vec_table_name( self._keyspace, project, table, feat_name ) @@ -466,7 +465,9 @@ def teardown( assert 
isinstance(online_store_config, ScyllaDBOnlineStoreConfig) for table in tables: - vec_features = _get_vector_features(table, online_store_config.vector_similarity_function) + vec_features = _get_vector_features( + table, online_store_config.vector_similarity_function + ) self._drop_table(config, project, table, vec_features=vec_features) # ------------------------------------------------------------------ @@ -542,10 +543,14 @@ def _main_rows() -> Iterable[Tuple]: # --- vector table rows (one per entity per vector feature) --- for feat_name, _dim, _sim in vec_features: - fqvectable = self._fq_vec_table_name(self._keyspace, project, table, feat_name) + fqvectable = self._fq_vec_table_name( + self._keyspace, project, table, feat_name + ) vec_insert_cql = INSERT_VEC_CQL.format(fqvectable=fqvectable) if vec_insert_cql not in self._prepared_statements: - self._prepared_statements[vec_insert_cql] = session.prepare(vec_insert_cql) + self._prepared_statements[vec_insert_cql] = session.prepare( + vec_insert_cql + ) vec_insert_stmt = self._prepared_statements[vec_insert_cql] def _vec_rows( @@ -715,7 +720,9 @@ def retrieve_online_documents_v2( ) sim_expr = sim_expr_template - fqvectable = self._fq_vec_table_name(self._keyspace, project, table, vec_feature) + fqvectable = self._fq_vec_table_name( + self._keyspace, project, table, vec_feature + ) ann_cql = ANN_SELECT_CQL.format( sim_func_call=sim_expr, fqvectable=fqvectable, diff --git a/sdk/python/tests/integration/online_store/test_scylladb_online_store.py b/sdk/python/tests/integration/online_store/test_scylladb_online_store.py index a5d456d86e6..4c2385d1e50 100644 --- a/sdk/python/tests/integration/online_store/test_scylladb_online_store.py +++ b/sdk/python/tests/integration/online_store/test_scylladb_online_store.py @@ -87,7 +87,9 @@ def _make_entity_key(val: str) -> EntityKeyProto: ) -def _make_feature_view(name: str, with_vector: bool = False, ttl: timedelta = None) -> FeatureView: +def _make_feature_view( + name: str, 
with_vector: bool = False, ttl: timedelta = None +) -> FeatureView: source = FileSource(path="dummy.parquet", timestamp_field="event_timestamp") schema: List[Field] = [Field(name="score", dtype=Array(Float32))] if with_vector: @@ -181,14 +183,15 @@ def test_missing_key_returns_none(docker_config): def test_multiple_features_roundtrip(docker_config): - """Multiple features of different types all round-trip with the correct value. - """ + """Multiple features of different types all round-trip with the correct value.""" store = ScyllaDBOnlineStore() cfg = docker_config source = FileSource(path="dummy.parquet", timestamp_field="event_timestamp") fv = FeatureView( name="test_multi_features", - entities=[Entity(name="item_id", join_keys=["item_id"], value_type=ValueType.STRING)], + entities=[ + Entity(name="item_id", join_keys=["item_id"], value_type=ValueType.STRING) + ], schema=[ Field(name="score", dtype=Array(Float32)), Field(name="priority", dtype=Int64), @@ -242,7 +245,11 @@ def test_multiple_entities(docker_config): batch = [ ( ek, - {"score": ValueProto(float_list_val=FloatList(val=[float(i) * 0.1 + 0.1]))}, + { + "score": ValueProto( + float_list_val=FloatList(val=[float(i) * 0.1 + 0.1]) + ) + }, _utc_now(), None, ) @@ -272,13 +279,27 @@ def test_overwrite_uses_latest_value(docker_config): store.online_write_batch( cfg, fv, - [(ek, {"score": ValueProto(float_list_val=FloatList(val=[0.1]))}, _utc_now(), None)], + [ + ( + ek, + {"score": ValueProto(float_list_val=FloatList(val=[0.1]))}, + _utc_now(), + None, + ) + ], None, ) store.online_write_batch( cfg, fv, - [(ek, {"score": ValueProto(float_list_val=FloatList(val=[0.9]))}, _utc_now(), None)], + [ + ( + ek, + {"score": ValueProto(float_list_val=FloatList(val=[0.9]))}, + _utc_now(), + None, + ) + ], None, ) results = store.online_read(cfg, fv, [ek]) @@ -308,7 +329,14 @@ def test_event_timestamp_returned(docker_config): store.online_write_batch( cfg, fv, - [(ek, {"score": 
ValueProto(float_list_val=FloatList(val=[0.5]))}, write_ts, None)], + [ + ( + ek, + {"score": ValueProto(float_list_val=FloatList(val=[0.5]))}, + write_ts, + None, + ) + ], None, ) results = store.online_read(cfg, fv, [ek]) @@ -455,8 +483,7 @@ def test_ttl_expiry(docker_config): assert len(results) == 1 ts_after, feats_after = results[0] assert feats_after is None, ( - "Row should have expired and return None, " - f"but got features: {feats_after}" + f"Row should have expired and return None, but got features: {feats_after}" ) finally: store.teardown(cfg, [fv], []) From 47c4e53b215a8f35e4376d0d17cba5810b1a7a15 Mon Sep 17 00:00:00 2001 From: Attila Toth Date: Tue, 23 Jun 2026 11:14:13 +0200 Subject: [PATCH 10/12] refactor: use one table for both vector and non-vector data Signed-off-by: Attila Toth --- .../scylladb_online_store/scylladb.py | 186 ++++++++++-------- 1 file changed, 101 insertions(+), 85 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py index 12c4f25c474..bb92eb2dc98 100644 --- a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py +++ b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py @@ -52,18 +52,26 @@ # CQL templates # --------------------------------------------------------------------------- -# Main feature table: one row per (entity_key, feature_name). -# Vector data lives in its own dedicated table to match query patterns. -CREATE_TABLE_CQL = """ - CREATE TABLE IF NOT EXISTS {fqtable} ( - entity_key TEXT, - feature_name TEXT, - value BLOB, - event_ts TIMESTAMP, - created_ts TIMESTAMP, - PRIMARY KEY ((entity_key), feature_name) - ) WITH CLUSTERING ORDER BY (feature_name ASC); -""" +def _build_create_table_cql(fqtable: str, dim: Optional[int]) -> str: + """Return a CREATE TABLE CQL string, optionally including the vector column. 
+ + When *dim* is given a ``vector_value vector<float, dim>`` column is inserted + between the BLOB ``value`` column and ``event_ts``. When *dim* is ``None`` + the column is omitted entirely and the table schema is fully static. + """ + vec_col = f"vector_value vector<float, {dim}>,\n" if dim else "" + return ( + f"CREATE TABLE IF NOT EXISTS {fqtable} (\n" + f" entity_key TEXT,\n" + f" feature_name TEXT,\n" + f" value BLOB,\n" + f" {vec_col}" + f" event_ts TIMESTAMP,\n" + f" created_ts TIMESTAMP,\n" + f" PRIMARY KEY ((entity_key), feature_name)\n" + f");" + ) + DROP_TABLE_CQL = "DROP TABLE IF EXISTS {fqtable};" @@ -77,43 +85,40 @@ " VALUES (?, ?, ?, ?, ?) USING TTL ?;" ) -SELECT_CQL = "SELECT {columns} FROM {fqtable} WHERE entity_key = ?;" - -# Dedicated table to serve vector queries, -# one row per entity, no feature_name column. -# Named "{project}_{fv_name}__{feature_name}_vec". -CREATE_VECTOR_TABLE_CQL = ( - "CREATE TABLE IF NOT EXISTS {{fqvectable}} (" - "entity_key TEXT, " - "vector_value vector<float, {dim}>, " - "event_ts TIMESTAMP, " - "PRIMARY KEY (entity_key));" -) - -DROP_VECTOR_TABLE_CQL = "DROP TABLE IF EXISTS {fqvectable};" - +# INSERT for vector feature rows: populates both the BLOB value (for online_read +# compatibility) and the native vector_value column (for ANN search). +# No TTL variant, ScyllaDB ignores TTL on vector-indexed columns, +# a future release will fix this, +# combining TTL with vector features is rejected at feast apply time. INSERT_VEC_CQL = ( - "INSERT INTO {fqvectable} (entity_key, vector_value, event_ts) VALUES (?, ?, ?);" + "INSERT INTO {fqtable}" + " (feature_name, value, entity_key, event_ts, created_ts, vector_value)" + " VALUES (?, ?, ?, ?, ?, ?);" ) +SELECT_CQL = "SELECT {columns} FROM {fqtable} WHERE entity_key = ?;" + +# Global vector index on the main feature table.
CREATE_VECTOR_INDEX_CQL = ( "CREATE CUSTOM INDEX IF NOT EXISTS {index_name}" - " ON {fqvectable} (vector_value)" + " ON {fqtable} (vector_value)" " USING 'vector_index'" " WITH OPTIONS = {{'similarity_function': '{sim_func}'}};" ) -# ANN query on the dedicated vector table. +# ANN query on the main feature table. +# ALLOW FILTERING is required because feature_name is a clustering key. ANN_SELECT_CQL = ( "SELECT entity_key, {sim_func_call} AS score, event_ts" - " FROM {fqvectable}" + " FROM {fqtable}" + " WHERE feature_name = ?" " ORDER BY vector_value ANN OF ?" - " LIMIT ?;" + " LIMIT ?" + " ALLOW FILTERING;" ) # op_name -> (template, prepare?) _CQL_TEMPLATES: Dict[str, Tuple[str, bool]] = { - "create": (CREATE_TABLE_CQL, False), "drop": (DROP_TABLE_CQL, False), "insert": (INSERT_CQL, True), "insert_ttl": (INSERT_CQL_TTL, True), @@ -344,19 +349,20 @@ def _get_session(self, config: RepoConfig) -> Session: **cluster_kwargs, ) self._keyspace = keyspace - self._session = self._cluster.connect(keyspace) + # Connect without a keyspace first so we can create it if needed. 
+ session = self._cluster.connect() + session.execute( + f"CREATE KEYSPACE IF NOT EXISTS \"{keyspace}\"" + " WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': '3'};" + ) + session.set_keyspace(keyspace) + self._session = session return self._session @staticmethod def _fq_table_name(keyspace: str, project: str, table: FeatureView) -> str: return f'"{keyspace}"."{project}_{table.name}"' - @staticmethod - def _fq_vec_table_name( - keyspace: str, project: str, table: FeatureView, feature_name: str - ) -> str: - return f'"{keyspace}"."{project}_{table.name}__{feature_name}_vec"' - def _get_statement( self, config: RepoConfig, op_name: str, fqtable: str, **kwargs: Any ) -> Any: @@ -387,23 +393,17 @@ def _create_table( ) -> None: session = self._get_session(config) fqtable = self._fq_table_name(self._keyspace, project, table) + dim = vec_features[0][1] if vec_features else None logger.info("Creating table %s.", fqtable) - session.execute(CREATE_TABLE_CQL.format(fqtable=fqtable)) + session.execute(_build_create_table_cql(fqtable, dim)) - for feat_name, dim, sim_func in vec_features or []: - fqvectable = self._fq_vec_table_name( - self._keyspace, project, table, feat_name - ) - logger.info("Creating vector table %s.", fqvectable) - session.execute( - CREATE_VECTOR_TABLE_CQL.format(dim=dim).format(fqvectable=fqvectable) - ) - index_name = f"{project}_{table.name}__{feat_name}_vec_idx" - logger.info("Creating vector index %s on %s.", index_name, fqvectable) + for _feat_name, _dim, sim_func in vec_features or []: + index_name = f"{project}_{table.name}_vec_idx" + logger.info("Creating vector index %s on %s.", index_name, fqtable) session.execute( CREATE_VECTOR_INDEX_CQL.format( index_name=index_name, - fqvectable=fqvectable, + fqtable=fqtable, sim_func=sim_func, ) ) @@ -416,12 +416,6 @@ def _drop_table( vec_features: Optional[List[Tuple[str, int, str]]] = None, ) -> None: session = self._get_session(config) - for feat_name, _dim, _sim in 
vec_features or []: - fqvectable = self._fq_vec_table_name( - self._keyspace, project, table, feat_name - ) - logger.info("Dropping vector table %s.", fqvectable) - session.execute(DROP_VECTOR_TABLE_CQL.format(fqvectable=fqvectable)) fqtable = self._fq_table_name(self._keyspace, project, table) logger.info("Dropping table %s.", fqtable) session.execute(DROP_TABLE_CQL.format(fqtable=fqtable)) @@ -447,6 +441,12 @@ def update( for table in tables_to_keep: vec_features = _get_vector_features(table, default_sim) + if vec_features and table.ttl: + raise ValueError( + f"FeatureView '{table.name}' has both a TTL and vector features. " + "ScyllaDB does not support TTL on vector-indexed columns. " + "Remove the TTL or the vector_index tag." + ) self._create_table(config, project, table, vec_features=vec_features) for table in tables_to_delete: @@ -503,14 +503,14 @@ def online_write_batch( int(table.ttl.total_seconds()) if table.ttl else None ) - # --- regular table --- + vec_feature_set = {fn for fn, _, _ in vec_features} fqtable = self._fq_table_name(self._keyspace, project, table) insert_op = "insert_ttl" if ttl_seconds is not None else "insert" insert_stmt = self._get_statement(config, insert_op, fqtable) session = self._get_session(config) - # --- main table rows (all features) --- + # --- non-vector rows (all features except vector features) --- def _main_rows() -> Iterable[Tuple]: for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( @@ -518,6 +518,8 @@ def _main_rows() -> Iterable[Tuple]: entity_key_serialization_version=config.entity_key_serialization_version, ).hex() for feature_name, val in values.items(): + if feature_name in vec_feature_set: + continue # written separately via INSERT_VEC_CQL row: Tuple = ( feature_name, val.SerializeToString(), @@ -541,33 +543,35 @@ def _main_rows() -> Iterable[Tuple]: if progress: progress(1) - # --- vector table rows (one per entity per vector feature) --- - for feat_name, _dim, _sim in 
vec_features: - fqvectable = self._fq_vec_table_name( - self._keyspace, project, table, feat_name - ) - vec_insert_cql = INSERT_VEC_CQL.format(fqvectable=fqvectable) + # --- vector rows --- + # TTL is not used here: TTL + vector features is rejected at feast apply time, + # so this path is only reached for TTL-free feature views. + if vec_features: + vec_insert_cql = INSERT_VEC_CQL.format(fqtable=fqtable) if vec_insert_cql not in self._prepared_statements: self._prepared_statements[vec_insert_cql] = session.prepare( vec_insert_cql ) vec_insert_stmt = self._prepared_statements[vec_insert_cql] - def _vec_rows( - fn: str = feat_name, - ) -> Iterable[Tuple[str, Any, datetime]]: - for entity_key, values, timestamp, _created_ts in data: - if fn not in values: - continue + def _vec_rows() -> Iterable[Tuple]: + for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ).hex() - yield ( - entity_key_bin, - list(values[fn].float_list_val.val), - timestamp, - ) + for feat_name in vec_feature_set: + if feat_name not in values: + continue + val = values[feat_name] + yield ( + feat_name, + val.SerializeToString(), + entity_key_bin, + timestamp, + created_ts, + list(val.float_list_val.val), + ) execute_concurrent_with_args( session, @@ -710,7 +714,17 @@ def retrieve_online_documents_v2( vec_feature, _dim, sim_func = vec_features[0] if distance_metric: - sim_func = distance_metric.upper() + # Normalize aliases: Feast core uses "L2" for Euclidean distance; + # "cosine" (lowercase) is the default in feature_store.py; + # "inner_product" / "dot" are aliases for dot-product similarity. 
+ _METRIC_ALIASES = { + "L2": "EUCLIDEAN", + "cosine": "COSINE", + "inner_product": "DOT_PRODUCT", + "dot": "DOT_PRODUCT", + } + candidate = distance_metric.upper() + sim_func = _METRIC_ALIASES.get(distance_metric, _METRIC_ALIASES.get(candidate, candidate)) sim_expr_template = _SIM_FUNC_EXPR.get(sim_func) if sim_expr_template is None: @@ -720,19 +734,22 @@ def retrieve_online_documents_v2( ) sim_expr = sim_expr_template - fqvectable = self._fq_vec_table_name( - self._keyspace, project, table, vec_feature - ) + fqtable = self._fq_table_name(self._keyspace, project, table) ann_cql = ANN_SELECT_CQL.format( sim_func_call=sim_expr, - fqvectable=fqvectable, + fqtable=fqtable, ) session = self._get_session(config) if ann_cql not in self._prepared_statements: self._prepared_statements[ann_cql] = session.prepare(ann_cql) ann_stmt = self._prepared_statements[ann_cql] - ann_rows = list(session.execute(ann_stmt, (embedding, embedding, top_k))) + # Parameter order matches the placeholders in ANN_SELECT_CQL: + # 1. similarity_(vector_value, ?) → embedding + # 2. WHERE feature_name = ? → vec_feature + # 3. ORDER BY vector_value ANN OF ? → embedding + # 4. LIMIT ? 
→ top_k + ann_rows = list(session.execute(ann_stmt, (embedding, vec_feature, embedding, top_k))) if not ann_rows: return [] @@ -743,8 +760,7 @@ def retrieve_online_documents_v2( row.entity_key: row.event_ts for row in ann_rows } - # Batch-fetch full feature values from the main table - fqtable = self._fq_table_name(self._keyspace, project, table) + # Batch-fetch full feature values from the same main table select_stmt = self._get_statement( config, "select", fqtable, columns="feature_name, value, event_ts" ) From 6d7db1e76d472711dc292340b1456d201a29f74e Mon Sep 17 00:00:00 2001 From: Attila Toth Date: Tue, 23 Jun 2026 11:18:23 +0200 Subject: [PATCH 11/12] fix lint Signed-off-by: Attila Toth --- .../feast/infra/online_stores/scylladb_online_store/scylladb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py index bb92eb2dc98..1c3cfb3483b 100644 --- a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py +++ b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py @@ -87,7 +87,7 @@ def _build_create_table_cql(fqtable: str, dim: Optional[int]) -> str: # INSERT for vector feature rows: populates both the BLOB value (for online_read # compatibility) and the native vector_value column (for ANN search). -# No TTL variant, ScyllaDB ignores TTL on vector-indexed columns, +# No TTL variant, ScyllaDB ignores TTL on vector-indexed columns, # a future release will fix this, # combining TTL with vector features is rejected at feast apply time. 
INSERT_VEC_CQL = ( From 496c174ffaf921399b9735ac5809cc78a4a6a30b Mon Sep 17 00:00:00 2001 From: Attila Toth Date: Tue, 23 Jun 2026 11:18:53 +0200 Subject: [PATCH 12/12] fix lint Signed-off-by: Attila Toth --- .../online_stores/scylladb_online_store/scylladb.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py index 1c3cfb3483b..51a22e07a76 100644 --- a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py +++ b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py @@ -52,6 +52,7 @@ # CQL templates # --------------------------------------------------------------------------- + def _build_create_table_cql(fqtable: str, dim: Optional[int]) -> str: """Return a CREATE TABLE CQL string, optionally including the vector column. @@ -352,7 +353,7 @@ def _get_session(self, config: RepoConfig) -> Session: # Connect without a keyspace first so we can create it if needed. session = self._cluster.connect() session.execute( - f"CREATE KEYSPACE IF NOT EXISTS \"{keyspace}\"" + f'CREATE KEYSPACE IF NOT EXISTS "{keyspace}"' " WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': '3'};" ) session.set_keyspace(keyspace) @@ -724,7 +725,9 @@ def retrieve_online_documents_v2( "dot": "DOT_PRODUCT", } candidate = distance_metric.upper() - sim_func = _METRIC_ALIASES.get(distance_metric, _METRIC_ALIASES.get(candidate, candidate)) + sim_func = _METRIC_ALIASES.get( + distance_metric, _METRIC_ALIASES.get(candidate, candidate) + ) sim_expr_template = _SIM_FUNC_EXPR.get(sim_func) if sim_expr_template is None: @@ -749,7 +752,9 @@ def retrieve_online_documents_v2( # 2. WHERE feature_name = ? → vec_feature # 3. ORDER BY vector_value ANN OF ? → embedding # 4. LIMIT ? 
→ top_k - ann_rows = list(session.execute(ann_stmt, (embedding, vec_feature, embedding, top_k))) + ann_rows = list( + session.execute(ann_stmt, (embedding, vec_feature, embedding, top_k)) + ) if not ann_rows: return []