diff --git a/docs/getting-started/genai.md b/docs/getting-started/genai.md index f65aeac85e2..dfd9c41954a 100644 --- a/docs/getting-started/genai.md +++ b/docs/getting-started/genai.md @@ -15,6 +15,7 @@ Feast integrates with popular vector databases to store and retrieve embedding v * **Elasticsearch**: Scalable vector search capabilities * **Postgres with PGVector**: SQL-based vector operations * **Qdrant**: Purpose-built vector database integration +* **ScyllaDB**: Native `vector` type with HNSW ANN index, full `retrieve_online_documents_v2` support These integrations allow you to: - Store embeddings as features diff --git a/docs/reference/alpha-vector-database.md b/docs/reference/alpha-vector-database.md index 861c3fcb114..28d0bf0098e 100644 --- a/docs/reference/alpha-vector-database.md +++ b/docs/reference/alpha-vector-database.md @@ -15,6 +15,7 @@ Below are supported vector databases and implemented features: | Faiss | [ ] | [ ] | [] | [] | | SQLite | [x] | [ ] | [x] | [x] | | Qdrant | [x] | [x] | [] | [] | +| ScyllaDB | [x] | [x] | [x] | [x] | *Note: V2 Support means the SDK supports retrieval of features along with vector embeddings from vector similarity search. @@ -30,7 +31,7 @@ Beyond that, we will then have `retrieve_online_documents` and `retrieve_online_ backwards compatibility and the adopt industry standard naming conventions. {% endhint %} -**Note**: Milvus and SQLite implement the v2 `retrieve_online_documents_v2` method in the SDK. This will be the longer-term solution so that Data Scientists can easily enable vector similarity search by just flipping a flag. +**Note**: Milvus, SQLite, and ScyllaDB implement the v2 `retrieve_online_documents_v2` method in the SDK. This will be the longer-term solution so that Data Scientists can easily enable vector similarity search by just flipping a flag. 
## Examples diff --git a/docs/reference/online-stores/overview.md b/docs/reference/online-stores/overview.md index 6ee076b0669..663a48836dc 100644 --- a/docs/reference/online-stores/overview.md +++ b/docs/reference/online-stores/overview.md @@ -29,26 +29,26 @@ See this [issue](https://github.com/feast-dev/feast/issues/2254) for a discussio ## Functionality Matrix There are currently five core online store implementations: `SqliteOnlineStore`, `RedisOnlineStore`, `DynamoDBOnlineStore`, `SnowflakeOnlineStore`, and `DatastoreOnlineStore`. -There are several additional implementations contributed by the Feast community (`PostgreSQLOnlineStore`, `HbaseOnlineStore` and `CassandraOnlineStore`), which are not guaranteed to be stable or to match the functionality of the core implementations. +There are several additional implementations contributed by the Feast community (`PostgreSQLOnlineStore`, `HbaseOnlineStore`, `CassandraOnlineStore` and `ScyllaDBOnlineStore`), which are not guaranteed to be stable or to match the functionality of the core implementations. Details for each specific online store, such as how to configure it in a `feature_store.yaml`, can be found [here](README.md). Below is a matrix indicating which online stores support what functionality. -| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | Milvus | -| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |:----| -| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| teardown infrastructure (e.g. 
tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no | -| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| readable by Java | no | yes | no | no | no | no | no | no | no | -| readable by Go | yes | yes | no | no | no | no | no | no | no | -| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes | -| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes | -| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no | -| support for deleting expired data | no | yes | no | no | no | no | no | no | no | -| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no | -| collocated by feature service | no | no | no | no | no | no | no | no | no | -| collocated by entity key | no | yes | no | no | no | no | no | no | yes | +| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | Milvus | ScyllaDB | +| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |:----| :-- | +| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| teardown infrastructure (e.g. 
tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no | no | +| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| readable by Java | no | yes | no | no | no | no | no | no | no | no | +| readable by Go | yes | yes | no | no | no | no | no | no | no | no | +| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes | +| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes | no | +| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no | yes | +| support for deleting expired data | no | yes | no | no | no | no | no | no | no | yes | +| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no | yes | +| collocated by feature service | no | no | no | no | no | no | no | no | no | no | +| collocated by entity key | no | yes | no | no | no | no | no | no | yes | no | diff --git a/docs/reference/online-stores/scylladb.md b/docs/reference/online-stores/scylladb.md index c8583ac101a..6c2e462877b 100644 --- a/docs/reference/online-stores/scylladb.md +++ b/docs/reference/online-stores/scylladb.md @@ -2,20 +2,15 @@ ## Description -ScyllaDB is a low-latency and high-performance Cassandra-compatible (uses CQL) database. You can use the existing Cassandra connector to use ScyllaDB as an online store in Feast. - -The [ScyllaDB](https://www.scylladb.com/) online store provides support for materializing feature values into a ScyllaDB or [ScyllaDB Cloud](https://www.scylladb.com/product/scylla-cloud/) cluster for serving online features real-time. +[ScyllaDB](https://www.scylladb.com/) is a distributed real-time NoSQL database with vector search support. 
+This integration uses the native **`scylla-driver`** Python driver for optimised performance and supports materializing feature values into a [ScyllaDB Cloud](https://www.scylladb.com/product/scylla-cloud/) cluster for real-time online feature serving. ## Getting started -Install Feast with Cassandra support: -```bash -pip install "feast[cassandra]" -``` +Install Feast with the `scylladb` extra, which pulls in `scylla-driver` automatically: -Create a new Feast project: ```bash -feast init REPO_NAME -t cassandra +pip install feast[scylladb] ``` ### Example (ScyllaDB) @@ -26,7 +21,7 @@ project: scylla_feature_repo registry: data/registry.db provider: local online_store: - type: cassandra + type: scylladb hosts: - 172.17.0.2 keyspace: feast @@ -43,44 +38,99 @@ project: scylla_feature_repo registry: data/registry.db provider: local online_store: - type: cassandra + type: scylladb hosts: - node-0.aws_us_east_1.xxxxxxxx.clusters.scylla.cloud - node-1.aws_us_east_1.xxxxxxxx.clusters.scylla.cloud - node-2.aws_us_east_1.xxxxxxxx.clusters.scylla.cloud keyspace: feast username: scylla - password: password + password: xxxxxx + local_dc: AWS_US_EAST_1 ``` {% endcode %} - -The full set of configuration options is available in [CassandraOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.cassandra_online_store.cassandra_online_store.CassandraOnlineStoreConfig). -For a full explanation of configuration options please look at file -`sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md`. +## Configuration options + +| Parameter | Type | Default | Description | +| :--- | :--- | :--- | :--- | +| `hosts` | list[str] | *(required)* | Contact-point host addresses. | +| `port` | int | `9042` | CQL port. | +| `keyspace` | str | `feast_keyspace` | Target ScyllaDB keyspace. | +| `username` | str | `None` | Auth username. | +| `password` | str | `None` | Auth password. 
| +| `local_dc` | str | `None` | Local datacenter name for DC-aware load balancing. | +| `request_timeout` | float | `None` | Driver request timeout in seconds. | +| `read_concurrency` | int | `100` | `concurrency` argument passed to the driver's `execute_concurrent_with_args` for reads. Controls how many CQL statements are in-flight at once. | +| `write_concurrency` | int | `100` | `concurrency` argument passed to the driver's `execute_concurrent_with_args` for writes. Controls how many CQL statements are in-flight at once. | +| `vector_similarity_function` | str | `COSINE` | Default similarity function for vector indexes. Supported: `COSINE`, `DOT_PRODUCT`, `EUCLIDEAN`. Can be overridden per-feature via the `similarity_function` Field tag. | Storage specifications can be found at `docs/specs/online_store_format.md`. +## Vector Search + +ScyllaDB Cloud supports approximate nearest-neighbour (ANN) vector search. +To enable it for a feature view, tag the embedding `Field` with `vector_index=true` and specify the number of dimensions: + +{% code title="feature_definitions.py" %} +```python +from feast import FeatureView, Field +from feast.types import Array, Float32, String + +documents_fv = FeatureView( + name="documents", + entities=[item], + schema=[ + Field(name="text", dtype=String), + Field( + name="embedding", + dtype=Array(Float32), + tags={ + "vector_index": "true", + "dimensions": "768", + "similarity_function": "COSINE", # COSINE | DOT_PRODUCT | EUCLIDEAN + }, + ), + ], + online=True, + source=push_source, +) +``` +{% endcode %} + +When `feast apply` runs, the store automatically creates the necessary tables and HNSW ANN index for any feature view with vector-tagged fields. 
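As a rough illustration of what this step issues, the sketch below builds the two CQL statements, modeled on the templates in this change's `scylladb.py`. The helper names `sketch_create_table_cql` and `sketch_create_index_cql` are hypothetical, and the `vector<float, N>` column type is an assumption based on ScyllaDB's CQL vector syntax; treat it as a reading aid, not the store's exact output.

```python
from typing import Optional


def sketch_create_table_cql(fqtable: str, dim: Optional[int]) -> str:
    """Build a CREATE TABLE statement; add a vector column only when dim is set."""
    # Assumed column type: vector<float, N>, where N is the "dimensions" tag.
    vec_col = f"    vector_value vector<float, {dim}>,\n" if dim else ""
    return (
        f"CREATE TABLE IF NOT EXISTS {fqtable} (\n"
        "    entity_key TEXT,\n"
        "    feature_name TEXT,\n"
        "    value BLOB,\n"
        f"{vec_col}"
        "    event_ts TIMESTAMP,\n"
        "    created_ts TIMESTAMP,\n"
        "    PRIMARY KEY ((entity_key), feature_name)\n"
        ");"
    )


def sketch_create_index_cql(index_name: str, fqtable: str, sim_func: str) -> str:
    """Build the CREATE CUSTOM INDEX statement for the vector_value column."""
    return (
        f"CREATE CUSTOM INDEX IF NOT EXISTS {index_name}"
        f" ON {fqtable} (vector_value)"
        " USING 'vector_index'"
        f" WITH OPTIONS = {{'similarity_function': '{sim_func}'}};"
    )


if __name__ == "__main__":
    # Hypothetical names: keyspace "feast", project "scylla_feature_repo".
    table = '"feast"."scylla_feature_repo_documents"'
    print(sketch_create_table_cql(table, 768))
    print(sketch_create_index_cql("scylla_feature_repo_documents_vec_idx", table, "COSINE"))
```

Feature views without vector-tagged fields would get the same table minus the `vector_value` column and no index.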
+ +To query the top-k most similar documents: + +```python +result = store.retrieve_online_documents_v2( + features=["documents:text", "documents:embedding"], + query=[0.1, 0.2, ...], # your query embedding + top_k=10, + distance_metric="COSINE", +) +``` + ## Functionality Matrix The set of functionality supported by online stores is described in detail [here](overview.md#functionality). -Below is a matrix indicating which functionality is supported by the Cassandra plugin. +Below is a matrix indicating which functionality is supported by the ScyllaDB online store. -| | Cassandra | +| | ScyllaDB | | :-------------------------------------------------------- | :-------- | | write feature values to the online store | yes | | read feature values from the online store | yes | | update infrastructure (e.g. tables) in the online store | yes | | teardown infrastructure (e.g. tables) in the online store | yes | -| generate a plan of infrastructure changes | yes | +| generate a plan of infrastructure changes | no | | support for on-demand transforms | yes | | readable by Python SDK | yes | | readable by Java | no | | readable by Go | no | | support for entityless feature views | yes | | support for concurrent writing to the same key | no | -| support for ttl (time to live) at retrieval | no | -| support for deleting expired data | no | +| support for ttl (time to live) at retrieval | yes | +| support for deleting expired data | yes | | collocated by feature view | yes | | collocated by feature service | no | | collocated by entity key | no | @@ -89,6 +139,6 @@ To compare this set of functionality against other online stores, please see the ## Resources -* [Sample application with ScyllaDB](https://feature-store.scylladb.com/stable/) +* [ScyllaDB Vector Search documentation](https://cloud.docs.scylladb.com/stable/vector-search/) * [ScyllaDB website](https://www.scylladb.com/) * [ScyllaDB Cloud documentation](https://cloud.docs.scylladb.com/stable/) diff --git a/pyproject.toml 
b/pyproject.toml index 25bef64ccfa..52ef25a2017 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ azure = [ "pymssql<2.3.3" ] cassandra = ["cassandra-driver>=3.24.0,<4"] +scylladb = ["scylla-driver>=3.28.0,<4"] clickhouse = ["clickhouse-connect>=0.7.19"] couchbase = ["couchbase==4.3.2", "couchbase-columnar==1.0.0"] delta = ["deltalake<1.0.0"] @@ -158,7 +159,7 @@ test = [ ] ci = [ - "feast[test, aws, azure, cassandra, clickhouse, couchbase, delta, docling, duckdb, elasticsearch, faiss, gcp, ge, go, grpcio, hazelcast, hbase, ibis, image, k8s, mcp, milvus, mlflow, mongodb, mssql, mysql, openlineage, opentelemetry, oracle, spark, trino, postgres, pytorch, qdrant, rag, ray, redis, singlestore, snowflake, sqlite_vec]", + "feast[test, aws, azure, cassandra, clickhouse, couchbase, delta, docling, duckdb, elasticsearch, faiss, gcp, ge, go, grpcio, hazelcast, hbase, ibis, image, k8s, mcp, milvus, mlflow, mongodb, mssql, mysql, openlineage, opentelemetry, oracle, scylladb, spark, trino, postgres, pytorch, qdrant, rag, ray, redis, singlestore, snowflake, sqlite_vec]", "build", "virtualenv==20.23.0", "dbt-artifacts-parser", diff --git a/sdk/python/feast/infra/online_stores/scylladb_online_store/__init__.py b/sdk/python/feast/infra/online_stores/scylladb_online_store/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py new file mode 100644 index 00000000000..51a22e07a76 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/scylladb_online_store/scylladb.py @@ -0,0 +1,820 @@ +import logging +import warnings +from datetime import datetime +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + Literal, + Optional, + Sequence, + Tuple, +) + +from cassandra.auth import PlainTextAuthProvider +from cassandra.cluster import ( + EXEC_PROFILE_DEFAULT, + Cluster, + ExecutionProfile, + 
Session, +) +from cassandra.concurrent import execute_concurrent_with_args +from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy +from cassandra.query import ConsistencyLevel, PreparedStatement +from pydantic import StrictFloat, StrictInt, StrictStr + +from feast import Entity, FeatureView, RepoConfig +from feast.infra.key_encoding_utils import deserialize_entity_key, serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Error messages +# --------------------------------------------------------------------------- + +E_SCYLLA_UNEXPECTED_CONFIG = ( + "Unexpected configuration object (not a ScyllaDBOnlineStoreConfig instance)" +) +E_SCYLLA_NOT_CONFIGURED = ( + "Inconsistent ScyllaDB configuration: 'hosts' and 'keyspace' are required" +) +E_SCYLLA_INCONSISTENT_AUTH = ( + "ScyllaDB username and password must be provided together or not at all" +) + +# --------------------------------------------------------------------------- +# CQL templates +# --------------------------------------------------------------------------- + + +def _build_create_table_cql(fqtable: str, dim: Optional[int]) -> str: + """Return a CREATE TABLE CQL string, optionally including the vector column. + + When *dim* is given, a ``vector_value vector<float, dim>`` column is inserted + between the BLOB ``value`` column and ``event_ts``. When *dim* is ``None`` + the column is omitted entirely and the table schema is fully static.
+ """ + vec_col = f"vector_value vector<float, {dim}>,\n" if dim else "" + return ( + f"CREATE TABLE IF NOT EXISTS {fqtable} (\n" + f" entity_key TEXT,\n" + f" feature_name TEXT,\n" + f" value BLOB,\n" + f" {vec_col}" + f" event_ts TIMESTAMP,\n" + f" created_ts TIMESTAMP,\n" + f" PRIMARY KEY ((entity_key), feature_name)\n" + f");" + ) + + +DROP_TABLE_CQL = "DROP TABLE IF EXISTS {fqtable};" + +INSERT_CQL = ( + "INSERT INTO {fqtable} (feature_name, value, entity_key, event_ts, created_ts)" + " VALUES (?, ?, ?, ?, ?);" +) + +INSERT_CQL_TTL = ( + "INSERT INTO {fqtable} (feature_name, value, entity_key, event_ts, created_ts)" + " VALUES (?, ?, ?, ?, ?) USING TTL ?;" +) + +# INSERT for vector feature rows: populates both the BLOB value (for online_read +# compatibility) and the native vector_value column (for ANN search). +# No TTL variant: ScyllaDB currently ignores TTL on vector-indexed columns +# (to be fixed in a future release), so combining TTL with vector features +# is rejected at feast apply time. +INSERT_VEC_CQL = ( + "INSERT INTO {fqtable}" + " (feature_name, value, entity_key, event_ts, created_ts, vector_value)" + " VALUES (?, ?, ?, ?, ?, ?);" +) + +SELECT_CQL = "SELECT {columns} FROM {fqtable} WHERE entity_key = ?;" + +# Global vector index on the main feature table. +CREATE_VECTOR_INDEX_CQL = ( + "CREATE CUSTOM INDEX IF NOT EXISTS {index_name}" + " ON {fqtable} (vector_value)" + " USING 'vector_index'" + " WITH OPTIONS = {{'similarity_function': '{sim_func}'}};" +) + +# ANN query on the main feature table. +# ALLOW FILTERING is required because feature_name is a clustering key. +ANN_SELECT_CQL = ( + "SELECT entity_key, {sim_func_call} AS score, event_ts" + " FROM {fqtable}" + " WHERE feature_name = ?" + " ORDER BY vector_value ANN OF ?" + " LIMIT ?" + " ALLOW FILTERING;" +) + +# op_name -> (template, prepare?)
+_CQL_TEMPLATES: Dict[str, Tuple[str, bool]] = { + "drop": (DROP_TABLE_CQL, False), + "insert": (INSERT_CQL, True), + "insert_ttl": (INSERT_CQL_TTL, True), + "select": (SELECT_CQL, True), +} + +# Similarity function CQL expression helpers (vector_value is the fixed column name) +_SIM_FUNC_EXPR = { + "COSINE": "similarity_cosine(vector_value, ?)", + "DOT_PRODUCT": "similarity_dot_product(vector_value, ?)", + "EUCLIDEAN": "similarity_euclidean(vector_value, ?)", +} + + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- + + +class ScyllaDBOnlineStoreConfig(FeastConfigBaseModel): + """ + Configuration for the native ScyllaDB online store. + + Requires ``scylla-driver`` (``pip install scylla-driver``). + + Example ``feature_store.yaml``:: + + online_store: + type: scylladb + hosts: + - node-0.aws_us_east_1.xxxxxxxx.clusters.scylla.cloud + keyspace: feast + username: scylla + password: pass + local_dc: AWS_US_EAST_1 + """ + + type: Literal[ + "scylladb", + "feast.infra.online_stores.scylladb_online_store.scylladb.ScyllaDBOnlineStore", + ] = "scylladb" + + # Connection + hosts: List[StrictStr] + """Contact-point host addresses.""" + + port: Optional[StrictInt] = 9042 + """CQL port (default 9042).""" + + keyspace: StrictStr = "feast_keyspace" + """Target ScyllaDB keyspace.""" + + username: Optional[StrictStr] = None + """Auth username.""" + + password: Optional[StrictStr] = None + """Auth password.""" + + local_dc: Optional[StrictStr] = None + """ + Local datacenter name. + For ScyllaDB Cloud this is the region string, e.g. ``AWS_US_EAST_1``.
+ """ + + request_timeout: Optional[StrictFloat] = None + """Driver request timeout in seconds.""" + + read_concurrency: Optional[StrictInt] = 100 + """Concurrency level passed to ``execute_concurrent_with_args`` for reads.""" + + write_concurrency: Optional[StrictInt] = 100 + """Concurrency level passed to ``execute_concurrent_with_args`` for writes.""" + + vector_similarity_function: StrictStr = "COSINE" + """ + Default similarity function used when creating vector indexes. + Can be overridden per-feature via the ``similarity_function`` Field tag. + Supported values: ``COSINE``, ``DOT_PRODUCT``, ``EUCLIDEAN``. + """ + + +# --------------------------------------------------------------------------- +# Vector feature helpers +# --------------------------------------------------------------------------- + + +def _get_vector_features( + table: FeatureView, default_sim_func: str +) -> List[Tuple[str, int, str]]: + """ + Return ``(feature_name, dimension, similarity_function)`` for every feature + in *table* tagged as a vector feature. + + A feature is treated as a vector feature when its tags include:: + + "vector_index": "true" + "dimensions": "" + + The similarity function defaults to *default_sim_func* but can be + overridden per-feature with a ``"similarity_function"`` tag. + """ + result = [] + for field in table.schema: + tags = field.tags or {} + if tags.get("vector_index", "").lower() != "true": + continue + dim_str = tags.get("dimensions", "") + if not dim_str: + warnings.warn( + f"Feature '{field.name}' in FeatureView '{table.name}' is tagged " + "vector_index=true but is missing a 'dimensions' tag. 
" + "Skipping vector table creation for this feature.", + UserWarning, + stacklevel=2, + ) + continue + sim_func = tags.get("similarity_function", default_sim_func).upper() + result.append((field.name, int(dim_str), sim_func)) + return result + + +# --------------------------------------------------------------------------- +# Store implementation +# --------------------------------------------------------------------------- + + +class ScyllaDBInvalidConfig(Exception): + pass + + +class ScyllaDBOnlineStore(OnlineStore): + """ + Native ScyllaDB online store for Feast. + + Supports both regular feature materialisation and vector similarity search + via ScyllaDB's ANN / ``vector_index`` functionality. + + **Vector features** — tag your ``Field`` definitions like this:: + + from feast import Field + from feast.types import Array, Float32 + + Field( + name="embedding", + dtype=Array(Float32), + tags={ + "vector_index": "true", + "dimensions": "768", + "similarity_function": "COSINE", # optional, default COSINE + }, + ) + + Then call ``FeatureStore.retrieve_online_documents_v2(...)`` to perform an + approximate nearest-neighbour search. 
+ """ + + _cluster: Optional[Cluster] = None + _session: Optional[Session] = None + _keyspace: str = "feast_keyspace" + _prepared_statements: Dict[str, PreparedStatement] = {} + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _get_session(self, config: RepoConfig) -> Session: + """Return the active session, creating the cluster connection if needed.""" + online_store_config = config.online_store + if not isinstance(online_store_config, ScyllaDBOnlineStoreConfig): + raise ScyllaDBInvalidConfig(E_SCYLLA_UNEXPECTED_CONFIG) + + if self._session: + return self._session + + hosts = online_store_config.hosts + port = online_store_config.port or 9042 + keyspace = online_store_config.keyspace + username = online_store_config.username + password = online_store_config.password + local_dc = online_store_config.local_dc + + if not hosts or not keyspace: + raise ScyllaDBInvalidConfig(E_SCYLLA_NOT_CONFIGURED) + if (username is None) ^ (password is None): + raise ScyllaDBInvalidConfig(E_SCYLLA_INCONSISTENT_AUTH) + + auth_provider = ( + PlainTextAuthProvider(username=username, password=password) + if username is not None + else None + ) + + # Build execution profile + if local_dc: + lb_policy: Any = TokenAwarePolicy( + DCAwareRoundRobinPolicy(local_dc=local_dc) + ) + exe_profile = ExecutionProfile( + request_timeout=online_store_config.request_timeout, + load_balancing_policy=lb_policy, + consistency_level=ConsistencyLevel.LOCAL_QUORUM, + ) + execution_profiles: Optional[Dict] = {EXEC_PROFILE_DEFAULT: exe_profile} + elif online_store_config.request_timeout is not None: + exe_profile = ExecutionProfile( + request_timeout=online_store_config.request_timeout, + consistency_level=ConsistencyLevel.LOCAL_QUORUM, + ) + execution_profiles = {EXEC_PROFILE_DEFAULT: exe_profile} + else: + exe_profile = ExecutionProfile( + consistency_level=ConsistencyLevel.LOCAL_QUORUM, + 
) + execution_profiles = {EXEC_PROFILE_DEFAULT: exe_profile} + + cluster_kwargs: Dict[str, Any] = { + k: v + for k, v in { + "execution_profiles": execution_profiles, + }.items() + if v is not None + } + + self._cluster = Cluster( + hosts, + port=port, + auth_provider=auth_provider, + **cluster_kwargs, + ) + self._keyspace = keyspace + # Connect without a keyspace first so we can create it if needed. + session = self._cluster.connect() + session.execute( + f'CREATE KEYSPACE IF NOT EXISTS "{keyspace}"' + " WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': '3'};" + ) + session.set_keyspace(keyspace) + self._session = session + return self._session + + @staticmethod + def _fq_table_name(keyspace: str, project: str, table: FeatureView) -> str: + return f'"{keyspace}"."{project}_{table.name}"' + + def _get_statement( + self, config: RepoConfig, op_name: str, fqtable: str, **kwargs: Any + ) -> Any: + """ + Resolve *op_name* to a CQL statement, preparing and caching it when the + template is marked as prepareable. 
+ """ + session = self._get_session(config) + template, do_prepare = _CQL_TEMPLATES[op_name] + cql = template.format(fqtable=fqtable, **kwargs) + if do_prepare: + if cql not in self._prepared_statements: + logger.info("Preparing %s statement on %s.", op_name, fqtable) + self._prepared_statements[cql] = session.prepare(cql) + return self._prepared_statements[cql] + return cql + + # ------------------------------------------------------------------ + # Table lifecycle helpers + # ------------------------------------------------------------------ + + def _create_table( + self, + config: RepoConfig, + project: str, + table: FeatureView, + vec_features: Optional[List[Tuple[str, int, str]]] = None, + ) -> None: + session = self._get_session(config) + fqtable = self._fq_table_name(self._keyspace, project, table) + dim = vec_features[0][1] if vec_features else None + logger.info("Creating table %s.", fqtable) + session.execute(_build_create_table_cql(fqtable, dim)) + + for _feat_name, _dim, sim_func in vec_features or []: + index_name = f"{project}_{table.name}_vec_idx" + logger.info("Creating vector index %s on %s.", index_name, fqtable) + session.execute( + CREATE_VECTOR_INDEX_CQL.format( + index_name=index_name, + fqtable=fqtable, + sim_func=sim_func, + ) + ) + + def _drop_table( + self, + config: RepoConfig, + project: str, + table: FeatureView, + vec_features: Optional[List[Tuple[str, int, str]]] = None, + ) -> None: + session = self._get_session(config) + fqtable = self._fq_table_name(self._keyspace, project, table) + logger.info("Dropping table %s.", fqtable) + session.execute(DROP_TABLE_CQL.format(fqtable=fqtable)) + + # ------------------------------------------------------------------ + # OnlineStore interface — infrastructure + # ------------------------------------------------------------------ + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], 
+ entities_to_keep: Sequence[Entity], + partial: bool, + ) -> None: + """Create and drop feature-store tables as required by the registry.""" + project = config.project + online_store_config = config.online_store + assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) + default_sim = online_store_config.vector_similarity_function + + for table in tables_to_keep: + vec_features = _get_vector_features(table, default_sim) + if vec_features and table.ttl: + raise ValueError( + f"FeatureView '{table.name}' has both a TTL and vector features. " + "ScyllaDB does not support TTL on vector-indexed columns. " + "Remove the TTL or the vector_index tag." + ) + self._create_table(config, project, table, vec_features=vec_features) + + for table in tables_to_delete: + vec_features = _get_vector_features(table, default_sim) + self._drop_table(config, project, table, vec_features=vec_features) + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ) -> None: + """Drop all tables for the given feature views.""" + project = config.project + online_store_config = config.online_store + assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) + + for table in tables: + vec_features = _get_vector_features( + table, online_store_config.vector_similarity_function + ) + self._drop_table(config, project, table, vec_features=vec_features) + + # ------------------------------------------------------------------ + # OnlineStore interface — write + # ------------------------------------------------------------------ + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """ + Write a batch of feature rows to the online store. + + All features are written to the main table. 
For vector features, + the float-list value is additionally written to the native + ``vector_value`` column of the same table to support ANN search. + """ + project = config.project + online_store_config = config.online_store + assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) + default_sim = online_store_config.vector_similarity_function + vec_features = _get_vector_features(table, default_sim) + + # Compute TTL in seconds from the FeatureView's ttl field (timedelta or None). + # ScyllaDB will automatically expire rows after this many seconds, covering + # both "support for ttl at retrieval" and "support for deleting expired data". + ttl_seconds: Optional[int] = ( + int(table.ttl.total_seconds()) if table.ttl else None + ) + + vec_feature_set = {fn for fn, _, _ in vec_features} + fqtable = self._fq_table_name(self._keyspace, project, table) + insert_op = "insert_ttl" if ttl_seconds is not None else "insert" + insert_stmt = self._get_statement(config, insert_op, fqtable) + + session = self._get_session(config) + + # --- non-vector rows (all features except vector features) --- + def _main_rows() -> Iterable[Tuple]: + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + for feature_name, val in values.items(): + if feature_name in vec_feature_set: + continue # written separately via INSERT_VEC_CQL + row: Tuple = ( + feature_name, + val.SerializeToString(), + entity_key_bin, + timestamp, + created_ts, + ) + if ttl_seconds is not None: + row = row + (ttl_seconds,) + yield row + if progress: + progress(1) + + execute_concurrent_with_args( + session, + insert_stmt, + _main_rows(), + concurrency=online_store_config.write_concurrency, + ) + # correction for the last missing call to `progress`: + if progress: + progress(1) + + # --- vector rows --- + # TTL is not used here: TTL + vector features is rejected at feast apply time, + # so this
path is only reached for TTL-free feature views. + if vec_features: + vec_insert_cql = INSERT_VEC_CQL.format(fqtable=fqtable) + if vec_insert_cql not in self._prepared_statements: + self._prepared_statements[vec_insert_cql] = session.prepare( + vec_insert_cql + ) + vec_insert_stmt = self._prepared_statements[vec_insert_cql] + + def _vec_rows() -> Iterable[Tuple]: + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + for feat_name in vec_feature_set: + if feat_name not in values: + continue + val = values[feat_name] + yield ( + feat_name, + val.SerializeToString(), + entity_key_bin, + timestamp, + created_ts, + list(val.float_list_val.val), + ) + + execute_concurrent_with_args( + session, + vec_insert_stmt, + _vec_rows(), + concurrency=online_store_config.write_concurrency, + ) + + # ------------------------------------------------------------------ + # OnlineStore interface — read + # ------------------------------------------------------------------ + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """Read feature values for the given entity keys from the regular table.""" + project = config.project + online_store_config = config.online_store + assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) + + entity_key_bins = [ + serialize_entity_key( + ek, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + for ek in entity_keys + ] + + fqtable = self._fq_table_name(self._keyspace, project, table) + select_stmt = self._get_statement( + config, "select", fqtable, columns="feature_name, value, event_ts" + ) + + session = self._get_session(config) + retrieval = execute_concurrent_with_args( + session, 
+ select_stmt, + ((ek_bin,) for ek_bin in entity_key_bins), + concurrency=online_store_config.read_concurrency, + ) + + results: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + for success, rows_or_exc in retrieval: + if not success: + logger.error( + "ScyllaDB read error during concurrent fetch: %s", rows_or_exc + ) + results.append((None, None)) + continue + res: Dict[str, ValueProto] = {} + res_ts: Optional[datetime] = None + for row in rows_or_exc: + if requested_features is None or row.feature_name in requested_features: + val = ValueProto() + val.ParseFromString(row.value) + res[row.feature_name] = val + res_ts = row.event_ts + results.append((res_ts, res) if res else (None, None)) + + return results + + # ------------------------------------------------------------------ + # Vector store interface + # ------------------------------------------------------------------ + + def retrieve_online_documents_v2( + self, + config: RepoConfig, + table: FeatureView, + requested_features: List[str], + embedding: Optional[List[float]], + top_k: int, + distance_metric: Optional[str] = None, + query_string: Optional[str] = None, + include_feature_view_version_metadata: bool = False, + ) -> List[ + Tuple[ + Optional[datetime], + Optional[EntityKeyProto], + Optional[Dict[str, ValueProto]], + ] + ]: + """ + Approximate nearest-neighbour search using ScyllaDB's vector_index. + + The feature view must have exactly one feature tagged with + ``vector_index=true``. The query *embedding* is compared against all + stored vectors; the top *top_k* results are returned together with + their feature values fetched from the regular feature table. + + Args: + config: RepoConfig for the current FeatureStore. + table: FeatureView to search. + requested_features: Feature names to include in the returned dicts. + embedding: Query vector. Must match the indexed column's dimension. + top_k: Number of results to return. 
+ distance_metric: Override similarity function + (``COSINE`` / ``DOT_PRODUCT`` / ``EUCLIDEAN``). Defaults to the + store-level ``vector_similarity_function`` config value or the + per-feature tag. + query_string: Unused (reserved for future hybrid text+vector search). + include_feature_view_version_metadata: Unused. + + Returns: + List of ``(event_timestamp, entity_key_proto, feature_values)`` + tuples ordered from most to least similar. + """ + if embedding is None: + raise ValueError( + "retrieve_online_documents_v2 requires a non-None 'embedding' " + "for ScyllaDB ANN search." + ) + + online_store_config = config.online_store + assert isinstance(online_store_config, ScyllaDBOnlineStoreConfig) + + default_sim = ( + distance_metric.upper() + if distance_metric + else online_store_config.vector_similarity_function + ) + project = config.project + vec_features = _get_vector_features(table, default_sim) + + if not vec_features: + raise NotImplementedError( + f"FeatureView '{table.name}' has no features tagged with " + "'vector_index=true'. Cannot perform vector search." + ) + if len(vec_features) > 1: + raise ValueError( + f"FeatureView '{table.name}' has {len(vec_features)} vector features. " + "retrieve_online_documents_v2 supports exactly one vector feature " + "per feature view." + ) + + vec_feature, _dim, sim_func = vec_features[0] + if distance_metric: + # Normalize aliases: Feast core uses "L2" for Euclidean distance; + # "cosine" (lowercase) is the default in feature_store.py; + # "inner_product" / "dot" are aliases for dot-product similarity. + _METRIC_ALIASES = { + "L2": "EUCLIDEAN", + "cosine": "COSINE", + "inner_product": "DOT_PRODUCT", + "dot": "DOT_PRODUCT", + } + candidate = distance_metric.upper() + sim_func = _METRIC_ALIASES.get( + distance_metric, _METRIC_ALIASES.get(candidate, candidate) + ) + + sim_expr_template = _SIM_FUNC_EXPR.get(sim_func) + if sim_expr_template is None: + raise ValueError( + f"Unsupported similarity function '{sim_func}'. 
" + "Choose from: COSINE, DOT_PRODUCT, EUCLIDEAN." + ) + sim_expr = sim_expr_template + + fqtable = self._fq_table_name(self._keyspace, project, table) + ann_cql = ANN_SELECT_CQL.format( + sim_func_call=sim_expr, + fqtable=fqtable, + ) + + session = self._get_session(config) + if ann_cql not in self._prepared_statements: + self._prepared_statements[ann_cql] = session.prepare(ann_cql) + ann_stmt = self._prepared_statements[ann_cql] + # Parameter order matches the placeholders in ANN_SELECT_CQL: + # 1. similarity_(vector_value, ?) → embedding + # 2. WHERE feature_name = ? → vec_feature + # 3. ORDER BY vector_value ANN OF ? → embedding + # 4. LIMIT ? → top_k + ann_rows = list( + session.execute(ann_stmt, (embedding, vec_feature, embedding, top_k)) + ) + + if not ann_rows: + return [] + + # Ordered list of entity key bins from ANN results + entity_key_bins: List[str] = [row.entity_key for row in ann_rows] + timestamps: Dict[str, Optional[datetime]] = { + row.entity_key: row.event_ts for row in ann_rows + } + + # Batch-fetch full feature values from the same main table + select_stmt = self._get_statement( + config, "select", fqtable, columns="feature_name, value, event_ts" + ) + + retrieval = execute_concurrent_with_args( + session, + select_stmt, + ((ek_bin,) for ek_bin in entity_key_bins), + concurrency=online_store_config.read_concurrency, + ) + + # Map entity_key_bin -> feature dict, preserving ANN order + feature_map: Dict[str, Dict[str, ValueProto]] = { + ek: {} for ek in entity_key_bins + } + for ek_bin, (success, rows_or_exc) in zip(entity_key_bins, retrieval): + if not success: + logger.error("ScyllaDB ANN batch-read error: %s", rows_or_exc) + continue + for row in rows_or_exc: + if requested_features and row.feature_name not in requested_features: + continue + val = ValueProto() + val.ParseFromString(row.value) + feature_map[ek_bin][row.feature_name] = val + + # Assemble output in ANN rank order + output: List[ + Tuple[ + Optional[datetime], + 
Optional[EntityKeyProto], + Optional[Dict[str, ValueProto]], + ] + ] = [] + for ek_bin in entity_key_bins: + try: + ek_proto: Optional[EntityKeyProto] = deserialize_entity_key( + bytes.fromhex(ek_bin), + entity_key_serialization_version=config.entity_key_serialization_version, + ) + except Exception: + ek_proto = None + + output.append( + ( + timestamps.get(ek_bin), + ek_proto, + feature_map.get(ek_bin, {}), + ) + ) + + return output diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 06529cea0f2..6c417542bcd 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -75,6 +75,7 @@ "postgres": "feast.infra.online_stores.postgres_online_store.postgres.PostgreSQLOnlineStore", "hbase": "feast.infra.online_stores.hbase_online_store.hbase.HbaseOnlineStore", "cassandra": "feast.infra.online_stores.cassandra_online_store.cassandra_online_store.CassandraOnlineStore", + "scylladb": "feast.infra.online_stores.scylladb_online_store.scylladb.ScyllaDBOnlineStore", "mysql": "feast.infra.online_stores.mysql_online_store.mysql.MySQLOnlineStore", "hazelcast": "feast.infra.online_stores.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore", "elasticsearch": "feast.infra.online_stores.elasticsearch_online_store.elasticsearch.ElasticSearchOnlineStore", diff --git a/sdk/python/tests/integration/online_store/test_scylladb_online_store.py b/sdk/python/tests/integration/online_store/test_scylladb_online_store.py new file mode 100644 index 00000000000..4c2385d1e50 --- /dev/null +++ b/sdk/python/tests/integration/online_store/test_scylladb_online_store.py @@ -0,0 +1,489 @@ +"""Integration tests for ScyllaDBOnlineStore, including vector search. + +All tests (regular read/write and vector search) run against a local +ScyllaDB + Vector Store stack started via ``ScyllaDBOnlineStoreCreator``. +No external cloud cluster is required. 
+ +To run against a ScyllaDB Cloud cluster instead, set: + + SCYLLA_HOSTS="host1,host2" contact points + SCYLLA_KEYSPACE="feast_test" keyspace (default: feast_test) + SCYLLA_USERNAME="scylla" username (optional) + SCYLLA_PASSWORD="..." password (optional) + SCYLLA_LOCAL_DC="..." DC name, e.g. AWS_US_EAST_1 (required) +""" + +import os +import time +from datetime import datetime, timedelta, timezone +from typing import List + +import pytest + +from feast import Entity, FeatureView, RepoConfig +from feast.field import Field +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.online_stores.scylladb_online_store.scylladb import ( + ScyllaDBOnlineStore, + ScyllaDBOnlineStoreConfig, +) +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import FloatList +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.types import Array, Float32, Int64 +from feast.utils import _utc_now +from feast.value_type import ValueType +from tests.universal.feature_repos.universal.online_store.scylladb import ( + ScyllaDBOnlineStoreCreator, +) + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def docker_config() -> RepoConfig: + """Start a local ScyllaDB container and return a RepoConfig for it.""" + creator = ScyllaDBOnlineStoreCreator(project_name="integ_test") + store_dict = creator.create_online_store() + cfg = RepoConfig( + project="integ_test", + provider="local", + online_store=ScyllaDBOnlineStoreConfig(**store_dict), + registry="memory://", + entity_key_serialization_version=3, + ) + yield cfg + creator.teardown() + + +@pytest.fixture(scope="module") +def cloud_config(): + """RepoConfig pointing at a ScyllaDB Cloud cluster (skipped if env vars absent).""" + if not os.environ.get("SCYLLA_HOSTS") or not 
os.environ.get("SCYLLA_LOCAL_DC"): + pytest.skip("Set SCYLLA_HOSTS and SCYLLA_LOCAL_DC to run vector search tests") + hosts = os.environ["SCYLLA_HOSTS"].split(",") + cfg = RepoConfig( + project="integ_test", + provider="local", + online_store=ScyllaDBOnlineStoreConfig( + hosts=hosts, + keyspace=os.environ.get("SCYLLA_KEYSPACE", "feast_test"), + username=os.environ.get("SCYLLA_USERNAME"), + password=os.environ.get("SCYLLA_PASSWORD"), + local_dc=os.environ["SCYLLA_LOCAL_DC"], + ), + registry="memory://", + entity_key_serialization_version=3, + ) + return cfg + + +def _make_entity_key(val: str) -> EntityKeyProto: + return EntityKeyProto( + join_keys=["item_id"], + entity_values=[ValueProto(string_val=val)], + ) + + +def _make_feature_view( + name: str, with_vector: bool = False, ttl: timedelta = None +) -> FeatureView: + source = FileSource(path="dummy.parquet", timestamp_field="event_timestamp") + schema: List[Field] = [Field(name="score", dtype=Array(Float32))] + if with_vector: + schema.append( + Field( + name="embedding", + dtype=Array(Float32), + tags={ + "vector_index": "true", + "dimensions": "4", + "similarity_function": "COSINE", + }, + ) + ) + return FeatureView( + name=name, + entities=[ + Entity(name="item_id", join_keys=["item_id"], value_type=ValueType.STRING) + ], + schema=schema, + online=True, + source=source, + ttl=ttl, + ) + + +# --------------------------------------------------------------------------- +# Tests — regular online store (uses Docker via docker_config fixture) +# --------------------------------------------------------------------------- + + +def test_write_and_read(docker_config): + store = ScyllaDBOnlineStore() + cfg = docker_config + fv = _make_feature_view("test_write_read") + + store.update(cfg, [], [fv], [], [], partial=False) + try: + ek = _make_entity_key("item_1") + store.online_write_batch( + cfg, + fv, + [ + ( + ek, + {"score": ValueProto(float_list_val=FloatList(val=[0.9]))}, + _utc_now(), + None, + ) + ], + None, + ) + 
results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts, feats = results[0] + assert feats is not None + assert "score" in feats + assert list(feats["score"].float_list_val.val) == pytest.approx([0.9]) + finally: + store.teardown(cfg, [fv], []) + + +def test_missing_key_returns_none(docker_config): + store = ScyllaDBOnlineStore() + cfg = docker_config + fv = _make_feature_view("test_missing_key") + + store.update(cfg, [], [fv], [], [], partial=False) + try: + existing = _make_entity_key("present") + missing = _make_entity_key("does_not_exist") + store.online_write_batch( + cfg, + fv, + [ + ( + existing, + {"score": ValueProto(float_list_val=FloatList(val=[0.5]))}, + _utc_now(), + None, + ) + ], + None, + ) + results = store.online_read(cfg, fv, [existing, missing]) + assert len(results) == 2 + assert results[0][1] is not None + assert results[1][1] is None + finally: + store.teardown(cfg, [fv], []) + + +def test_multiple_features_roundtrip(docker_config): + """Multiple features of different types all round-trip with the correct value.""" + store = ScyllaDBOnlineStore() + cfg = docker_config + source = FileSource(path="dummy.parquet", timestamp_field="event_timestamp") + fv = FeatureView( + name="test_multi_features", + entities=[ + Entity(name="item_id", join_keys=["item_id"], value_type=ValueType.STRING) + ], + schema=[ + Field(name="score", dtype=Array(Float32)), + Field(name="priority", dtype=Int64), + ], + online=True, + source=source, + ) + + store.update(cfg, [], [fv], [], [], partial=False) + try: + ek = _make_entity_key("item_mf") + store.online_write_batch( + cfg, + fv, + [ + ( + ek, + { + "score": ValueProto(float_list_val=FloatList(val=[0.85, 0.15])), + "priority": ValueProto(int64_val=42), + }, + _utc_now(), + None, + ) + ], + None, + ) + results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts, feats = results[0] + assert feats is not None + assert list(feats["score"].float_list_val.val) == pytest.approx([0.85, 
0.15]) + assert feats["priority"].int64_val == 42 + finally: + store.teardown(cfg, [fv], []) + + +def test_multiple_entities(docker_config): + """Multiple entity keys can be read in a single online_read call with correct values. + + Mirrors the multi-entity behaviour verified by the universal online store + suite (``test_online_retrieval_with_event_timestamps``) which Cassandra runs. + """ + store = ScyllaDBOnlineStore() + cfg = docker_config + fv = _make_feature_view("test_multi_entities") + + store.update(cfg, [], [fv], [], [], partial=False) + try: + keys = [_make_entity_key(f"entity_{i}") for i in range(3)] + batch = [ + ( + ek, + { + "score": ValueProto( + float_list_val=FloatList(val=[float(i) * 0.1 + 0.1]) + ) + }, + _utc_now(), + None, + ) + for i, ek in enumerate(keys) + ] + store.online_write_batch(cfg, fv, batch, None) + results = store.online_read(cfg, fv, keys) + assert len(results) == 3 + for i, (ts, feats) in enumerate(results): + assert feats is not None, f"Entity {i} returned None features" + assert list(feats["score"].float_list_val.val) == pytest.approx( + [float(i) * 0.1 + 0.1] + ) + finally: + store.teardown(cfg, [fv], []) + + +def test_overwrite_uses_latest_value(docker_config): + """Writing the same entity key twice keeps the most-recently-written value.""" + store = ScyllaDBOnlineStore() + cfg = docker_config + fv = _make_feature_view("test_overwrite") + + store.update(cfg, [], [fv], [], [], partial=False) + try: + ek = _make_entity_key("overwrite_item") + store.online_write_batch( + cfg, + fv, + [ + ( + ek, + {"score": ValueProto(float_list_val=FloatList(val=[0.1]))}, + _utc_now(), + None, + ) + ], + None, + ) + store.online_write_batch( + cfg, + fv, + [ + ( + ek, + {"score": ValueProto(float_list_val=FloatList(val=[0.9]))}, + _utc_now(), + None, + ) + ], + None, + ) + results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts, feats = results[0] + assert feats is not None + assert list(feats["score"].float_list_val.val) == 
pytest.approx([0.9]) + finally: + store.teardown(cfg, [fv], []) + + +def test_event_timestamp_returned(docker_config): + """The event timestamp written with a row is returned correctly by online_read. + + Mirrors ``test_online_retrieval_with_event_timestamps`` from the universal + suite which verifies per-entity timestamps for all online store types. + """ + store = ScyllaDBOnlineStore() + cfg = docker_config + fv = _make_feature_view("test_event_ts") + + store.update(cfg, [], [fv], [], [], partial=False) + try: + ek = _make_entity_key("ts_item") + # Use a second-boundary timestamp — CQL ``timestamp`` has ms precision. + write_ts = datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc) + store.online_write_batch( + cfg, + fv, + [ + ( + ek, + {"score": ValueProto(float_list_val=FloatList(val=[0.5]))}, + write_ts, + None, + ) + ], + None, + ) + results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts, feats = results[0] + assert ts is not None + assert isinstance(ts, datetime) + # Normalise both to UTC before comparing. 
+ ts_utc = ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc) + assert ts_utc == write_ts + finally: + store.teardown(cfg, [fv], []) + + +# --------------------------------------------------------------------------- +# Tests — vector search (local Docker stack via docker_config) +# --------------------------------------------------------------------------- + + +def test_vector_search(docker_config): + store = ScyllaDBOnlineStore() + cfg = docker_config + fv = _make_feature_view("test_vector_search", with_vector=True) + + store.update(cfg, [], [fv], [], [], partial=False) + try: + rows = [ + ("vec_a", [1.0, 0.0, 0.0, 0.0]), + ("vec_b", [0.0, 1.0, 0.0, 0.0]), + ("vec_c", [1.0, 0.1, 0.0, 0.0]), # close to vec_a + ] + batch = [] + for item_id, vec in rows: + ek = _make_entity_key(item_id) + batch.append( + ( + ek, + { + "score": ValueProto(float_list_val=FloatList(val=[0.5])), + "embedding": ValueProto(float_list_val=FloatList(val=vec)), + }, + _utc_now(), + None, + ) + ) + store.online_write_batch(cfg, fv, batch, None) + + # Wait for the vector index to finish building (up to 60 s). + deadline = time.time() + 60 + results = None + while time.time() < deadline: + try: + results = store.retrieve_online_documents_v2( + cfg, + fv, + requested_features=["score", "embedding"], + embedding=[1.0, 0.0, 0.0, 0.0], + top_k=2, + ) + break # index is ready + except Exception as exc: + if "still being constructed" in str(exc) or "missing index" in str(exc): + time.sleep(2) + else: + raise + else: + raise TimeoutError("Vector index was not ready within 60 s") + + assert results is not None + assert len(results) == 2 + + # Extract entity IDs from the returned entity key protos. + # Also verify the shape and types of every field in each result tuple. + # The expected embeddings keyed by entity ID for value verification. 
+ expected_embeddings = { + "vec_a": [1.0, 0.0, 0.0, 0.0], + "vec_b": [0.0, 1.0, 0.0, 0.0], + "vec_c": [1.0, 0.1, 0.0, 0.0], + } + returned_ids = [] + for ts, ek_proto, feats in results: + assert isinstance(ts, datetime), f"Expected datetime, got {type(ts)}" + assert feats is not None + assert "score" in feats + assert "embedding" in feats + # score field: single-element float list written as 0.5 + assert list(feats["score"].float_list_val.val) == pytest.approx([0.5]) + # embedding field: values must match what was written + assert ek_proto is not None + entity_id = ek_proto.entity_values[0].string_val + assert list(feats["embedding"].float_list_val.val) == pytest.approx( + expected_embeddings[entity_id] + ) + returned_ids.append(entity_id) + + # Query is [1,0,0,0]; vec_a=[1,0,0,0] (exact match) and + # vec_c=[1,0.1,0,0] are the two nearest neighbours by cosine similarity. + # vec_b=[0,1,0,0] is orthogonal and must NOT appear in top-2. + assert set(returned_ids) == {"vec_a", "vec_c"}, ( + f"Expected top-2 neighbours to be vec_a and vec_c, got {returned_ids}" + ) + # Exact match must be ranked first. + assert returned_ids[0] == "vec_a", ( + f"Expected vec_a (exact match) to be ranked first, got {returned_ids[0]}" + ) + finally: + store.teardown(cfg, [fv], []) + + +def test_ttl_expiry(docker_config): + """Rows written with a TTL should be gone after ScyllaDB expires them.""" + store = ScyllaDBOnlineStore() + cfg = docker_config + # TTL of 5 seconds: short enough for a test, long enough to write first. + fv = _make_feature_view("test_ttl_expiry", ttl=timedelta(seconds=5)) + + store.update(cfg, [], [fv], [], [], partial=False) + try: + ek = _make_entity_key("ttl_item") + store.online_write_batch( + cfg, + fv, + [ + ( + ek, + {"score": ValueProto(float_list_val=FloatList(val=[0.7]))}, + _utc_now(), + None, + ) + ], + None, + ) + + # Confirm the row is readable immediately after writing. 
+ results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts, feats = results[0] + assert feats is not None, "Row should be present right after write" + + # Wait for ScyllaDB to expire the row (TTL = 5s, wait 8s to be safe). + time.sleep(8) + + results = store.online_read(cfg, fv, [ek]) + assert len(results) == 1 + ts_after, feats_after = results[0] + assert feats_after is None, ( + f"Row should have expired and return None, but got features: {feats_after}" + ) + finally: + store.teardown(cfg, [fv], []) diff --git a/sdk/python/tests/universal/feature_repos/universal/online_store/scylladb.py b/sdk/python/tests/universal/feature_repos/universal/online_store/scylladb.py new file mode 100644 index 00000000000..59f6ae97549 --- /dev/null +++ b/sdk/python/tests/universal/feature_repos/universal/online_store/scylladb.py @@ -0,0 +1,83 @@ +import time +from typing import Dict + +from testcontainers.core.container import DockerContainer +from testcontainers.core.network import Network +from testcontainers.core.waiting_utils import wait_for_logs + +from tests.universal.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + +_SCYLLA_IMAGE = "scylladb/scylla:2026.1.4" +_VECTOR_STORE_IMAGE = "scylladb/vector-store:1.7.0" + + +class ScyllaDBOnlineStoreCreator(OnlineStoreCreator): + """ + Starts a ScyllaDB + Vector Store stack for integration tests. + + Two containers share a Docker network: + - scylladb/scylla:2026.1.4 — CQL data node (exposes port 9042) + - scylladb/vector-store:1.7.0 — ANN indexing service (port 6080) + + Both regular read/write and vector search tests run against this stack + with no external cloud dependency. 
+ """ + + def __init__(self, project_name: str, **kwargs): + super().__init__(project_name) + self.network = Network() + self.scylla = ( + DockerContainer(_SCYLLA_IMAGE) + .with_network(self.network) + .with_network_aliases("scylla") + .with_command( + "--smp 1 --memory 1G --overprovisioned 1 " + "--vector-store-primary-uri http://vector-store:6080 " + "--broadcast-rpc-address 127.0.0.1" + ) + .with_exposed_ports("9042") + ) + self.vector_store = ( + DockerContainer(_VECTOR_STORE_IMAGE) + .with_network(self.network) + .with_network_aliases("vector-store") + .with_env("VECTOR_STORE_URI", "0.0.0.0:6080") + .with_env("VECTOR_STORE_SCYLLADB_URI", "scylla:9042") + ) + + def create_online_store(self) -> Dict[str, object]: + self.network.create() + self.scylla.start() + wait_for_logs( + container=self.scylla, + predicate="Starting listening for CQL clients", + timeout=120, + ) + time.sleep(5) # allow CQL port to be fully ready + self.vector_store.start() + wait_for_logs( + container=self.vector_store, + predicate="6080", + timeout=60, + ) + keyspace = "feast_keyspace" + self.scylla.exec( + f'cqlsh -e "CREATE KEYSPACE IF NOT EXISTS \\"{keyspace}\\"' + " WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 1}" + " AND tablets = {'enabled': true};\"" + ) + exposed_port = int(self.scylla.get_exposed_port("9042")) + return { + "type": "scylladb", + "hosts": ["127.0.0.1"], + "port": exposed_port, + "keyspace": keyspace, + "local_dc": "datacenter1", + } + + def teardown(self): + self.vector_store.stop() + self.scylla.stop() + self.network.remove()
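Reviewer note: the `distance_metric` alias handling in `retrieve_online_documents_v2` is easier to reason about in isolation. The sketch below copies the alias table and lookup order from the diff into a standalone function. `normalize_metric` and `_SUPPORTED` are hypothetical names introduced here for illustration; they are not part of the PR, and the set of supported values is taken from the error message in the diff rather than from `_SIM_FUNC_EXPR`, which is not shown in this section.

```python
from typing import Optional

# Alias table as it appears in the diff.
_METRIC_ALIASES = {
    "L2": "EUCLIDEAN",
    "cosine": "COSINE",
    "inner_product": "DOT_PRODUCT",
    "dot": "DOT_PRODUCT",
}

# Assumption: mirrors the names listed in the diff's ValueError message.
_SUPPORTED = {"COSINE", "DOT_PRODUCT", "EUCLIDEAN"}


def normalize_metric(distance_metric: Optional[str], default: str) -> str:
    """Hypothetical helper: resolve a caller-supplied metric to a canonical name.

    Lookup order follows the diff: verbatim alias, then upper-cased alias,
    then the upper-cased string itself.
    """
    if distance_metric:
        candidate = distance_metric.upper()
        resolved = _METRIC_ALIASES.get(
            distance_metric, _METRIC_ALIASES.get(candidate, candidate)
        )
    else:
        # No override: fall back to the store-level or per-feature default.
        resolved = default
    if resolved not in _SUPPORTED:
        raise ValueError(
            f"Unsupported similarity function '{resolved}'. "
            "Choose from: COSINE, DOT_PRODUCT, EUCLIDEAN."
        )
    return resolved
```

Because the lowercase alias keys are only matched verbatim, a mixed-case input such as `"Dot"` or an upper-case `"INNER_PRODUCT"` falls through to the unsupported branch, while `"l2"` resolves via its upper-cased form. If that asymmetry is unintended, keying the alias table on upper-cased strings only would likely be simpler.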