From b8857f3af375cf9f6139f43cbdd7c9b3afdf1259 Mon Sep 17 00:00:00 2001 From: Daria Korenieva Date: Wed, 13 May 2026 08:14:01 -0700 Subject: [PATCH 1/8] feat: add Valkey vector store integration Implement ValkeyStore in application/vectorstore/ using valkey-glide-sync and the valkey-search module for HNSW vector similarity search. - Add ValkeyStore class extending BaseVectorStore with full interface: search, add_texts, delete_index, get_chunks, add_chunk, delete_chunk - Register 'valkey' in VectorCreator factory - Add VALKEY_* settings to Settings class (host, port, password, tls, index_name, prefix) - Add valkey-glide-sync==2.3.1 to requirements.txt - Add Valkey config examples to .env-template - Add 22 unit tests (mocked, no external deps) - Add 7 integration tests (requires running Valkey with search module) Signed-off-by: Daria Korenieva --- .env-template | 8 + application/core/settings.py | 10 +- application/requirements.txt | 1 + application/vectorstore/valkey.py | 388 +++++++++++++++++++ application/vectorstore/vector_creator.py | 4 +- tests/vectorstore/test_valkey.py | 268 +++++++++++++ tests/vectorstore/test_valkey_integration.py | 167 ++++++++ 7 files changed, 844 insertions(+), 2 deletions(-) create mode 100644 application/vectorstore/valkey.py create mode 100644 tests/vectorstore/test_valkey.py create mode 100644 tests/vectorstore/test_valkey_integration.py diff --git a/.env-template b/.env-template index df9855bb2..9827efeae 100644 --- a/.env-template +++ b/.env-template @@ -37,3 +37,11 @@ MICROSOFT_AUTHORITY=https://{tenantId}.ciamlogin.com/{tenantId} # POSTGRES_URI=postgresql://docsgpt:docsgpt@localhost:5432/docsgpt + +# Valkey Vector Store (set VECTOR_STORE=valkey to use) +# VALKEY_HOST=localhost +# VALKEY_PORT=6379 +# VALKEY_PASSWORD= +# VALKEY_USE_TLS=false +# VALKEY_INDEX_NAME=docsgpt +# VALKEY_PREFIX=doc: diff --git a/application/core/settings.py b/application/core/settings.py index 0dee0db6e..253784491 100644 --- a/application/core/settings.py 
+++ b/application/core/settings.py @@ -69,7 +69,7 @@ class Settings(BaseSettings): # Pages docling's threaded pipeline buffers in flight; the library # default (100) drives worker RSS to ~3 GB on a mid-size PDF. DOCLING_PIPELINE_QUEUE_MAX_SIZE: int = 2 - VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus" or "lancedb" or "pgvector" + VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus" or "lancedb" or "pgvector" or "valkey" RETRIEVERS_ENABLED: list = ["classic_rag"] AGENT_NAME: str = "classic" FALLBACK_LLM_PROVIDER: Optional[str] = None # provider for fallback llm @@ -165,6 +165,14 @@ class Settings(BaseSettings): LANCEDB_PATH: str = "./data/lancedb" # Path where LanceDB stores its local data LANCEDB_TABLE_NAME: Optional[str] = "docsgpts" # Name of the table to use for storing vectors + # Valkey vectorstore config + VALKEY_HOST: str = "localhost" + VALKEY_PORT: int = 6379 + VALKEY_PASSWORD: Optional[str] = None + VALKEY_USE_TLS: bool = False + VALKEY_INDEX_NAME: str = "docsgpt" + VALKEY_PREFIX: str = "doc:" + FLASK_DEBUG_MODE: bool = False STORAGE_TYPE: str = "local" # local or s3 diff --git a/application/requirements.txt b/application/requirements.txt index 5414df037..6931a9190 100644 --- a/application/requirements.txt +++ b/application/requirements.txt @@ -103,6 +103,7 @@ tzdata==2026.1 urllib3==2.6.3 uvicorn[standard]>=0.30,<1 uvicorn-worker>=0.4,<1 +valkey-glide-sync==2.3.1 vine==5.1.0 wcwidth==0.6.0 werkzeug>=3.1.0 diff --git a/application/vectorstore/valkey.py b/application/vectorstore/valkey.py new file mode 100644 index 000000000..769e88156 --- /dev/null +++ b/application/vectorstore/valkey.py @@ -0,0 +1,388 @@ +"""Valkey vector store implementation using valkey-glide-sync and valkey-search module.""" + +import json +import logging +import struct +import uuid +from typing import Any, Dict, List, Optional + +try: + from glide_sync import ( + DataType, + DistanceMetricType, + Field, + FtCreateOptions, + 
FtSearchLimit, + FtSearchOptions, + GlideClient, + GlideClientConfiguration, + NodeAddress, + ServerCredentials, + TagField, + TextField, + VectorAlgorithm, + VectorField, + VectorFieldAttributesHnsw, + VectorType, + ft, + ) +except ImportError: + raise ImportError( + "Could not import valkey-glide-sync. " + "Please install with `pip install valkey-glide-sync`." + ) + +from application.core.settings import settings +from application.vectorstore.base import BaseVectorStore +from application.vectorstore.document_class import Document + +logger = logging.getLogger(__name__) + + +class ValkeyStore(BaseVectorStore): + """Vector store backed by Valkey with the valkey-search module. + + Uses HASH keys to store document text, metadata, and embedding vectors. + Creates a search index with FT.CREATE for KNN vector similarity search. + + Requires a Valkey server with the valkey-search module loaded. + """ + + def __init__( + self, + source_id: str = "", + embeddings_key: str = "embeddings", + ): + """Initialize ValkeyStore. + + Args: + source_id: Identifier for the document source, used to + namespace and filter documents. + embeddings_key: Key name or API key for the embeddings provider. + """ + super().__init__() + self._source_id = str(source_id).replace("application/indexes/", "").rstrip("/") + self._embedding = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key) + self._index_name = settings.VALKEY_INDEX_NAME + self._prefix = settings.VALKEY_PREFIX + + self._client = self._create_client() + self._ensure_index_exists() + + def _create_client(self) -> GlideClient: + """Create and return a synchronous Valkey GLIDE client. + + Returns: + A connected GlideClient instance (synchronous). 
+ """ + addresses = [NodeAddress(host=settings.VALKEY_HOST, port=settings.VALKEY_PORT)] + + if settings.VALKEY_PASSWORD: + config = GlideClientConfiguration( + addresses=addresses, + use_tls=settings.VALKEY_USE_TLS, + credentials=ServerCredentials(password=settings.VALKEY_PASSWORD), + ) + else: + config = GlideClientConfiguration( + addresses=addresses, + use_tls=settings.VALKEY_USE_TLS, + ) + + return GlideClient.create(config) + + def _ensure_index_exists(self): + """Create the search index if it does not already exist.""" + embedding_dim = getattr(self._embedding, "dimension", 768) + + schema: List[Field] = [ + TextField("content"), + TagField("source_id"), + VectorField( + name="embedding", + algorithm=VectorAlgorithm.HNSW, + attributes=VectorFieldAttributesHnsw( + dimensions=embedding_dim, + distance_metric=DistanceMetricType.COSINE, + type=VectorType.FLOAT32, + ), + ), + ] + + options = FtCreateOptions(data_type=DataType.HASH, prefixes=[self._prefix]) + + try: + ft.create(self._client, self._index_name, schema, options) + logger.info(f"Created Valkey search index '{self._index_name}'") + except Exception as e: + if "already exists" in str(e).lower(): + logger.debug(f"Valkey index '{self._index_name}' already exists") + else: + logger.error(f"Error creating Valkey index: {e}") + raise + + def _doc_key(self, doc_id: str) -> str: + """Generate a hash key for a document. + + Args: + doc_id: The unique document identifier. + + Returns: + The full key including the prefix. + """ + return f"{self._prefix}{doc_id}" + + def search(self, question: str, k: int = 2, *args, **kwargs) -> List[Document]: + """Search for similar documents using vector similarity. + + Args: + question: The query text to search for. + k: Number of results to return. + + Returns: + A list of Document objects sorted by similarity. 
+ """ + query_vector = self._embedding.embed_query(question) + vector_bytes = struct.pack(f"{len(query_vector)}f", *query_vector) + + # KNN search with source_id filter + query = f"@source_id:{{{self._source_id}}} =>[KNN {k} @embedding $BLOB AS score]" + + try: + results = ft.search( + self._client, + self._index_name, + query, + FtSearchOptions(params={"BLOB": vector_bytes}), + ) + + return self._parse_search_results(results) + + except Exception as e: + logger.error(f"Error searching Valkey: {e}", exc_info=True) + return [] + + def _parse_search_results(self, results: list) -> List[Document]: + """Parse ft.search response into Document objects. + + The response format is: [total_count, {key: {field: value}}, ...] + + Args: + results: Raw response from ft.search. + + Returns: + List of Document objects. + """ + documents = [] + if not results or len(results) < 2: + return documents + + # results[0] is the total count, results[1:] are mappings of {key: {fields}} + for entry in results[1:]: + if isinstance(entry, dict): + for _key, fields in entry.items(): + field_dict = self._decode_fields(fields) + content = field_dict.get("content", "") + metadata = self._parse_metadata(field_dict) + documents.append(Document(page_content=content, metadata=metadata)) + + return documents + + def _decode_fields(self, fields) -> Dict[str, Any]: + """Decode bytes in field dict to strings, skipping binary fields. + + Args: + fields: Dict with potentially bytes keys/values. + + Returns: + A dictionary with string keys and values (binary fields excluded). 
+ """ + result = {} + if isinstance(fields, dict): + for k, v in fields.items(): + key = k.decode("utf-8") if isinstance(k, bytes) else str(k) + # Skip binary fields (like embedding vectors) that can't be decoded + if isinstance(v, bytes): + try: + value = v.decode("utf-8") + except UnicodeDecodeError: + continue + else: + value = str(v) if not isinstance(v, str) else v + result[key] = value + return result + + def _parse_metadata(self, field_dict: Dict[str, Any]) -> Dict[str, Any]: + """Extract metadata from field dictionary. + + Args: + field_dict: Parsed fields from a document hash. + + Returns: + Metadata dictionary. + """ + metadata_str = field_dict.get("metadata", "{}") + try: + return json.loads(metadata_str) + except (json.JSONDecodeError, TypeError): + return {} + + def add_texts( + self, + texts: List[str], + metadatas: Optional[List[Dict[str, Any]]] = None, + *args, + **kwargs, + ) -> List[str]: + """Add texts with embeddings to the vector store. + + Args: + texts: List of text strings to add. + metadatas: Optional list of metadata dicts for each text. + + Returns: + List of document IDs that were added. + """ + if not texts: + return [] + + embeddings = self._embedding.embed_documents(texts) + metadatas = metadatas or [{}] * len(texts) + doc_ids = [] + + for text, embedding, metadata in zip(texts, embeddings, metadatas): + doc_id = str(uuid.uuid4()) + key = self._doc_key(doc_id) + vector_bytes = struct.pack(f"{len(embedding)}f", *embedding) + + fields = { + "content": text, + "source_id": self._source_id, + "metadata": json.dumps(metadata), + "embedding": vector_bytes, + } + + try: + self._client.hset(key, fields) + doc_ids.append(doc_id) + except Exception as e: + logger.error(f"Error adding document to Valkey: {e}") + raise + + return doc_ids + + def delete_index(self, *args, **kwargs): + """Delete all documents for this source_id. + + Searches for all documents with matching source_id and deletes them. 
+ """ + try: + query = f"@source_id:{{{self._source_id}}}" + results = ft.search( + self._client, + self._index_name, + query, + FtSearchOptions(limit=FtSearchLimit(0, 10000)), + ) + + if results and len(results) > 1: + for entry in results[1:]: + if isinstance(entry, dict): + for key in entry.keys(): + key_str = key.decode("utf-8") if isinstance(key, bytes) else str(key) + self._client.delete([key_str]) + + except Exception as e: + logger.error(f"Error deleting index from Valkey: {e}", exc_info=True) + + def save_local(self, *args, **kwargs): + """No-op for Valkey — data is already persisted.""" + pass + + def get_chunks(self) -> List[Dict[str, Any]]: + """Get all chunks for this source_id. + + Returns: + List of chunk dicts with doc_id, text, and metadata. + """ + try: + query = f"@source_id:{{{self._source_id}}}" + results = ft.search( + self._client, + self._index_name, + query, + FtSearchOptions(limit=FtSearchLimit(0, 10000)), + ) + + chunks = [] + if results and len(results) > 1: + for entry in results[1:]: + if isinstance(entry, dict): + for key, fields in entry.items(): + key_str = key.decode("utf-8") if isinstance(key, bytes) else str(key) + doc_id = key_str.replace(self._prefix, "", 1) + field_dict = self._decode_fields(fields) + chunks.append({ + "doc_id": doc_id, + "text": field_dict.get("content", ""), + "metadata": self._parse_metadata(field_dict), + }) + + return chunks + + except Exception as e: + logger.error(f"Error getting chunks from Valkey: {e}", exc_info=True) + return [] + + def add_chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> str: + """Add a single chunk to the vector store. + + Args: + text: The text content of the chunk. + metadata: Optional metadata dictionary. + + Returns: + The generated document ID. 
+ """ + metadata = metadata or {} + final_metadata = metadata.copy() + final_metadata["source_id"] = self._source_id + + embeddings = self._embedding.embed_documents([text]) + if not embeddings: + raise ValueError("Could not generate embedding for chunk") + + doc_id = str(uuid.uuid4()) + key = self._doc_key(doc_id) + vector_bytes = struct.pack(f"{len(embeddings[0])}f", *embeddings[0]) + + fields = { + "content": text, + "source_id": self._source_id, + "metadata": json.dumps(final_metadata), + "embedding": vector_bytes, + } + + try: + self._client.hset(key, fields) + return doc_id + except Exception as e: + logger.error(f"Error adding chunk to Valkey: {e}") + raise + + def delete_chunk(self, chunk_id: str) -> bool: + """Delete a specific chunk by its ID. + + Args: + chunk_id: The document ID to delete. + + Returns: + True if the chunk was deleted, False otherwise. + """ + try: + key = self._doc_key(chunk_id) + result = self._client.delete([key]) + return result > 0 + except Exception as e: + logger.error(f"Error deleting chunk from Valkey: {e}", exc_info=True) + return False diff --git a/application/vectorstore/vector_creator.py b/application/vectorstore/vector_creator.py index 7d307f659..e1eab1bcb 100644 --- a/application/vectorstore/vector_creator.py +++ b/application/vectorstore/vector_creator.py @@ -4,6 +4,7 @@ from application.vectorstore.mongodb import MongoDBVectorStore from application.vectorstore.qdrant import QdrantStore from application.vectorstore.pgvector import PGVectorStore +from application.vectorstore.valkey import ValkeyStore class VectorCreator: @@ -13,7 +14,8 @@ class VectorCreator: "mongodb": MongoDBVectorStore, "qdrant": QdrantStore, "milvus": MilvusStore, - "pgvector": PGVectorStore + "pgvector": PGVectorStore, + "valkey": ValkeyStore, } @classmethod diff --git a/tests/vectorstore/test_valkey.py b/tests/vectorstore/test_valkey.py new file mode 100644 index 000000000..a65f2a0bf --- /dev/null +++ b/tests/vectorstore/test_valkey.py @@ -0,0 +1,268 
@@ +"""Unit tests for the Valkey vector store implementation.""" + +import json +from unittest.mock import MagicMock, Mock, patch + +import pytest + + +def _make_store(source_id="test-source", embeddings_key="key"): + """Helper to create a ValkeyStore with all external deps mocked.""" + with patch( + "application.vectorstore.base.BaseVectorStore._get_embeddings" + ) as mock_get_emb, patch( + "application.vectorstore.valkey.settings" + ) as mock_settings, patch( + "application.vectorstore.valkey.ValkeyStore._create_client" + ) as mock_create_client, patch( + "application.vectorstore.valkey.ValkeyStore._ensure_index_exists" + ): + mock_emb = Mock() + mock_emb.embed_query = Mock(return_value=[0.1, 0.2, 0.3]) + mock_emb.embed_documents = Mock(return_value=[[0.1, 0.2, 0.3]]) + mock_emb.dimension = 768 + mock_get_emb.return_value = mock_emb + + mock_settings.EMBEDDINGS_NAME = "test_model" + mock_settings.VALKEY_HOST = "localhost" + mock_settings.VALKEY_PORT = 6379 + mock_settings.VALKEY_PASSWORD = None + mock_settings.VALKEY_USE_TLS = False + mock_settings.VALKEY_INDEX_NAME = "docsgpt" + mock_settings.VALKEY_PREFIX = "doc:" + + mock_client = MagicMock() + mock_create_client.return_value = mock_client + + from application.vectorstore.valkey import ValkeyStore + + store = ValkeyStore(source_id=source_id, embeddings_key=embeddings_key) + + return store, mock_client, mock_emb + + +@pytest.mark.unit +class TestValkeyStoreInit: + def test_source_id_cleaned(self): + store, _, _ = _make_store(source_id="application/indexes/abc123/") + assert store._source_id == "abc123" + + def test_source_id_simple(self): + store, _, _ = _make_store(source_id="my-source") + assert store._source_id == "my-source" + + +@pytest.mark.unit +class TestValkeyStoreSearch: + def test_search_returns_documents(self): + store, mock_client, mock_emb = _make_store() + + # ft.search returns [total_count, {key: {field: value}}, ...] 
+ mock_response = [ + 2, + {b"doc:id1": {b"content": b"hello world", b"source_id": b"test-source", b"metadata": b'{"source": "test.txt"}'}}, + {b"doc:id2": {b"content": b"foo bar", b"source_id": b"test-source", b"metadata": b'{"source": "test2.txt"}'}}, + ] + + with patch("application.vectorstore.valkey.ft.search", return_value=mock_response): + results = store.search("query", k=2) + + mock_emb.embed_query.assert_called_once_with("query") + assert len(results) == 2 + assert results[0].page_content == "hello world" + assert results[0].metadata == {"source": "test.txt"} + assert results[1].page_content == "foo bar" + + def test_search_returns_empty_on_error(self): + store, mock_client, _ = _make_store() + + with patch("application.vectorstore.valkey.ft.search", side_effect=Exception("connection lost")): + results = store.search("query") + assert results == [] + + def test_search_returns_empty_on_no_results(self): + store, mock_client, _ = _make_store() + + with patch("application.vectorstore.valkey.ft.search", return_value=[0]): + results = store.search("query") + assert results == [] + + def test_search_handles_string_fields(self): + store, mock_client, _ = _make_store() + + mock_response = [ + 1, + {"doc:id1": {"content": "hello", "source_id": "test-source", "metadata": "{}"}}, + ] + + with patch("application.vectorstore.valkey.ft.search", return_value=mock_response): + results = store.search("query", k=1) + + assert len(results) == 1 + assert results[0].page_content == "hello" + + +@pytest.mark.unit +class TestValkeyStoreAddTexts: + def test_add_texts_returns_ids(self): + store, mock_client, mock_emb = _make_store() + mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]] + mock_client.hset = Mock(return_value=3) + + ids = store.add_texts(["text1", "text2"], [{"a": 1}, {"b": 2}]) + + assert len(ids) == 2 + assert mock_client.hset.call_count == 2 + + def test_add_texts_empty_returns_empty(self): + store, _, _ = _make_store() + assert 
store.add_texts([]) == [] + + def test_add_texts_default_metadatas(self): + store, mock_client, mock_emb = _make_store() + mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3]] + mock_client.hset = Mock(return_value=3) + + ids = store.add_texts(["text1"]) + assert len(ids) == 1 + + def test_add_texts_raises_on_error(self): + store, mock_client, mock_emb = _make_store() + mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3]] + mock_client.hset = Mock(side_effect=Exception("write failed")) + + with pytest.raises(Exception, match="write failed"): + store.add_texts(["text1"]) + + def test_add_texts_stores_correct_fields(self): + store, mock_client, mock_emb = _make_store(source_id="src1") + mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3]] + mock_client.hset = Mock(return_value=3) + + store.add_texts(["hello"], [{"key": "val"}]) + + call_args = mock_client.hset.call_args + key = call_args[0][0] + fields = call_args[0][1] + + assert key.startswith("doc:") + assert fields["content"] == "hello" + assert fields["source_id"] == "src1" + assert json.loads(fields["metadata"]) == {"key": "val"} + assert isinstance(fields["embedding"], bytes) + + +@pytest.mark.unit +class TestValkeyStoreDeleteIndex: + def test_delete_index_deletes_matching_docs(self): + store, mock_client, _ = _make_store(source_id="src123") + + mock_response = [ + 2, + {b"doc:id1": {b"content": b"text1"}}, + {b"doc:id2": {b"content": b"text2"}}, + ] + + with patch("application.vectorstore.valkey.ft.search", return_value=mock_response): + mock_client.delete = Mock(return_value=1) + store.delete_index() + + assert mock_client.delete.call_count == 2 + + def test_delete_index_handles_error(self): + store, mock_client, _ = _make_store() + + with patch("application.vectorstore.valkey.ft.search", side_effect=Exception("fail")): + # Should not raise + store.delete_index() + + +@pytest.mark.unit +class TestValkeyStoreSaveLocal: + def test_save_local_is_noop(self): + store, _, _ = _make_store() + assert 
store.save_local() is None + + +@pytest.mark.unit +class TestValkeyStoreGetChunks: + def test_get_chunks(self): + store, mock_client, _ = _make_store() + + mock_response = [ + 2, + {b"doc:uuid1": {b"content": b"text1", b"source_id": b"test-source", b"metadata": b'{"key": "val"}'}}, + {b"doc:uuid2": {b"content": b"text2", b"source_id": b"test-source", b"metadata": b"{}"}}, + ] + + with patch("application.vectorstore.valkey.ft.search", return_value=mock_response): + chunks = store.get_chunks() + + assert len(chunks) == 2 + assert chunks[0] == {"doc_id": "uuid1", "text": "text1", "metadata": {"key": "val"}} + assert chunks[1] == {"doc_id": "uuid2", "text": "text2", "metadata": {}} + + def test_get_chunks_returns_empty_on_error(self): + store, mock_client, _ = _make_store() + + with patch("application.vectorstore.valkey.ft.search", side_effect=Exception("fail")): + assert store.get_chunks() == [] + + +@pytest.mark.unit +class TestValkeyStoreAddChunk: + def test_add_chunk(self): + store, mock_client, mock_emb = _make_store(source_id="src1") + mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3]] + mock_client.hset = Mock(return_value=3) + + chunk_id = store.add_chunk("hello", metadata={"key": "val"}) + + assert isinstance(chunk_id, str) + assert len(chunk_id) > 0 + mock_client.hset.assert_called_once() + + def test_add_chunk_raises_on_empty_embedding(self): + store, _, mock_emb = _make_store() + mock_emb.embed_documents.return_value = [] + + with pytest.raises(ValueError, match="Could not generate embedding"): + store.add_chunk("text") + + def test_add_chunk_includes_source_id_in_metadata(self): + store, mock_client, mock_emb = _make_store(source_id="src1") + mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3]] + mock_client.hset = Mock(return_value=3) + + store.add_chunk("hello", metadata={"key": "val"}) + + call_args = mock_client.hset.call_args + fields = call_args[0][1] + metadata = json.loads(fields["metadata"]) + assert metadata["source_id"] == "src1" + 
assert metadata["key"] == "val" + + +@pytest.mark.unit +class TestValkeyStoreDeleteChunk: + def test_delete_chunk_success(self): + store, mock_client, _ = _make_store() + mock_client.delete = Mock(return_value=1) + + result = store.delete_chunk("uuid-123") + assert result is True + mock_client.delete.assert_called_once_with(["doc:uuid-123"]) + + def test_delete_chunk_not_found(self): + store, mock_client, _ = _make_store() + mock_client.delete = Mock(return_value=0) + + result = store.delete_chunk("nonexistent") + assert result is False + + def test_delete_chunk_returns_false_on_error(self): + store, mock_client, _ = _make_store() + mock_client.delete = Mock(side_effect=Exception("fail")) + + result = store.delete_chunk("uuid-123") + assert result is False diff --git a/tests/vectorstore/test_valkey_integration.py b/tests/vectorstore/test_valkey_integration.py new file mode 100644 index 000000000..dd82a9ab0 --- /dev/null +++ b/tests/vectorstore/test_valkey_integration.py @@ -0,0 +1,167 @@ +"""Integration tests for Valkey vector store. + +These tests require a running Valkey instance with the valkey-search module loaded. 
+Run with: podman run -d --name valkey-test -p 6379:6379 valkey/valkey:8.1 --loadmodule /usr/lib/valkey/modules/valkeysearch.so + +Skip these tests when Valkey is not available by running: + pytest -m "not integration" +""" + +import os +import uuid + +import pytest + +VALKEY_HOST = os.environ.get("VALKEY_HOST", "localhost") +VALKEY_PORT = int(os.environ.get("VALKEY_PORT", "6379")) + + +def _valkey_available() -> bool: + """Check if a Valkey instance is reachable.""" + try: + from glide_sync import GlideClient, GlideClientConfiguration, NodeAddress + + config = GlideClientConfiguration( + addresses=[NodeAddress(host=VALKEY_HOST, port=VALKEY_PORT)] + ) + client = GlideClient.create(config) + client.ping() + return True + except Exception: + return False + + +pytestmark = [ + pytest.mark.integration, + pytest.mark.skipif( + not _valkey_available(), + reason=f"Valkey not available at {VALKEY_HOST}:{VALKEY_PORT}", + ), +] + + +class FakeEmbeddings: + """Deterministic fake embeddings for integration testing.""" + + dimension = 4 + + def embed_query(self, text: str) -> list: + """Return a simple hash-based vector.""" + h = hash(text) % 1000 + return [h / 1000.0, (h + 1) / 1000.0, (h + 2) / 1000.0, (h + 3) / 1000.0] + + def embed_documents(self, texts: list) -> list: + """Embed multiple documents.""" + return [self.embed_query(t) for t in texts] + + +@pytest.fixture +def valkey_store(): + """Create a ValkeyStore with a unique source_id for test isolation.""" + from unittest.mock import patch + + source_id = f"test-{uuid.uuid4().hex[:8]}" + index_name = f"test_idx_{uuid.uuid4().hex[:8]}" + + with patch( + "application.vectorstore.base.BaseVectorStore._get_embeddings" + ) as mock_get_emb, patch( + "application.vectorstore.valkey.settings" + ) as mock_settings: + mock_get_emb.return_value = FakeEmbeddings() + mock_settings.EMBEDDINGS_NAME = "test_model" + mock_settings.VALKEY_HOST = VALKEY_HOST + mock_settings.VALKEY_PORT = VALKEY_PORT + mock_settings.VALKEY_PASSWORD = None 
+ mock_settings.VALKEY_USE_TLS = False + mock_settings.VALKEY_INDEX_NAME = index_name + mock_settings.VALKEY_PREFIX = f"test:{source_id}:" + + from application.vectorstore.valkey import ValkeyStore + + store = ValkeyStore(source_id=source_id, embeddings_key="test") + + yield store + + # Cleanup: delete all test documents and drop the index + try: + store.delete_index() + from glide_sync import ft + + ft.dropindex(store._client, index_name) + except Exception: + pass + + +class TestValkeyIntegrationAddAndSearch: + def test_add_texts_and_search(self, valkey_store): + """Test basic add and search flow.""" + texts = [ + "Python is a programming language", + "Valkey is an in-memory data store", + "Machine learning uses neural networks", + ] + metadatas = [ + {"source": "python.txt"}, + {"source": "valkey.txt"}, + {"source": "ml.txt"}, + ] + + ids = valkey_store.add_texts(texts, metadatas) + + assert len(ids) == 3 + assert all(isinstance(id_, str) for id_ in ids) + + # Search should return results + results = valkey_store.search("programming language", k=2) + assert len(results) > 0 + assert all(hasattr(r, "page_content") for r in results) + assert all(hasattr(r, "metadata") for r in results) + + def test_add_chunk_and_get_chunks(self, valkey_store): + """Test single chunk add and retrieval.""" + chunk_id = valkey_store.add_chunk( + "Test document content", + metadata={"author": "test", "page": 1}, + ) + + assert isinstance(chunk_id, str) + + chunks = valkey_store.get_chunks() + assert len(chunks) == 1 + assert chunks[0]["text"] == "Test document content" + assert chunks[0]["metadata"]["author"] == "test" + + def test_delete_chunk(self, valkey_store): + """Test deleting a specific chunk.""" + chunk_id = valkey_store.add_chunk("to be deleted") + + result = valkey_store.delete_chunk(chunk_id) + assert result is True + + # Verify it's gone + chunks = valkey_store.get_chunks() + assert len(chunks) == 0 + + def test_delete_nonexistent_chunk(self, valkey_store): + """Test 
deleting a chunk that doesn't exist.""" + result = valkey_store.delete_chunk("nonexistent-id") + assert result is False + + def test_delete_index(self, valkey_store): + """Test deleting all documents for a source.""" + valkey_store.add_texts(["doc1", "doc2", "doc3"]) + + valkey_store.delete_index() + + chunks = valkey_store.get_chunks() + assert len(chunks) == 0 + + def test_save_local_is_noop(self, valkey_store): + """Test that save_local doesn't raise.""" + assert valkey_store.save_local() is None + + def test_empty_search(self, valkey_store): + """Test search with no documents returns empty.""" + results = valkey_store.search("anything", k=5) + assert results == [] From 2e117e535394645b200f1279755e357f45f72b80 Mon Sep 17 00:00:00 2001 From: Daria Korenieva Date: Wed, 13 May 2026 09:35:52 -0700 Subject: [PATCH 2/8] fix: address review issues in Valkey vector store - Escape special characters in source_id for tag queries (prevents malformed FT.SEARCH queries with dots, hyphens, slashes, etc.) 
- Use explicit password check (is not None and != '') instead of truthiness to handle empty-string env vars correctly - Add request_timeout=5000ms to GlideClientConfiguration - Use ReturnField to avoid fetching embedding blobs in search/get_chunks - Paginate delete_index and get_chunks to handle >10k documents - Batch DELETE calls (100 keys per call) for efficiency - Improve error message in add_texts to report partial write count - Add unit tests for tag escaping, batch delete, pagination, and password handling edge cases Signed-off-by: Daria Korenieva --- application/vectorstore/valkey.py | 167 +++++++++++++++++----- tests/vectorstore/test_valkey.py | 222 +++++++++++++++++++++++++++++- 2 files changed, 353 insertions(+), 36 deletions(-) diff --git a/application/vectorstore/valkey.py b/application/vectorstore/valkey.py index 769e88156..bc50d0eb8 100644 --- a/application/vectorstore/valkey.py +++ b/application/vectorstore/valkey.py @@ -17,6 +17,7 @@ GlideClient, GlideClientConfiguration, NodeAddress, + ReturnField, ServerCredentials, TagField, TextField, @@ -38,6 +39,15 @@ logger = logging.getLogger(__name__) +# Characters that must be escaped in Valkey tag field query values. +_TAG_SPECIAL_CHARS = set(r".,<>{}[]\"':;!@#$%^&*()-+=~ /") + +# Batch size for DELETE operations in delete_index. +_DELETE_BATCH_SIZE = 100 + +# Page size for paginated scan in delete_index / get_chunks. +_SCAN_PAGE_SIZE = 10000 + class ValkeyStore(BaseVectorStore): """Vector store backed by Valkey with the valkey-search module. 
@@ -77,16 +87,18 @@ def _create_client(self) -> GlideClient: """ addresses = [NodeAddress(host=settings.VALKEY_HOST, port=settings.VALKEY_PORT)] - if settings.VALKEY_PASSWORD: + if settings.VALKEY_PASSWORD is not None and settings.VALKEY_PASSWORD != "": config = GlideClientConfiguration( addresses=addresses, use_tls=settings.VALKEY_USE_TLS, credentials=ServerCredentials(password=settings.VALKEY_PASSWORD), + request_timeout=5000, ) else: config = GlideClientConfiguration( addresses=addresses, use_tls=settings.VALKEY_USE_TLS, + request_timeout=5000, ) return GlideClient.create(config) @@ -121,6 +133,24 @@ def _ensure_index_exists(self): logger.error(f"Error creating Valkey index: {e}") raise + @staticmethod + def _escape_tag_value(value: str) -> str: + """Escape special characters for Valkey tag field queries. + + Args: + value: The raw tag value to escape. + + Returns: + The escaped string safe for use in @field:{...} queries. + """ + escaped = [] + for ch in value: + if ch in _TAG_SPECIAL_CHARS: + escaped.append(f"\\{ch}") + else: + escaped.append(ch) + return "".join(escaped) + def _doc_key(self, doc_id: str) -> str: """Generate a hash key for a document. 
@@ -145,15 +175,22 @@ def search(self, question: str, k: int = 2, *args, **kwargs) -> List[Document]: query_vector = self._embedding.embed_query(question) vector_bytes = struct.pack(f"{len(query_vector)}f", *query_vector) - # KNN search with source_id filter - query = f"@source_id:{{{self._source_id}}} =>[KNN {k} @embedding $BLOB AS score]" + escaped_source = self._escape_tag_value(self._source_id) + query = f"@source_id:{{{escaped_source}}} =>[KNN {k} @embedding $BLOB AS score]" try: results = ft.search( self._client, self._index_name, query, - FtSearchOptions(params={"BLOB": vector_bytes}), + FtSearchOptions( + params={"BLOB": vector_bytes}, + return_fields=[ + ReturnField("content"), + ReturnField("source_id"), + ReturnField("metadata"), + ], + ), ) return self._parse_search_results(results) @@ -242,13 +279,17 @@ def add_texts( Returns: List of document IDs that were added. + + Raises: + Exception: If any write fails. Successfully written documents + prior to the failure are not rolled back. """ if not texts: return [] embeddings = self._embedding.embed_documents(texts) metadatas = metadatas or [{}] * len(texts) - doc_ids = [] + doc_ids: List[str] = [] for text, embedding, metadata in zip(texts, embeddings, metadatas): doc_id = str(uuid.uuid4()) @@ -266,31 +307,65 @@ def add_texts( self._client.hset(key, fields) doc_ids.append(doc_id) except Exception as e: - logger.error(f"Error adding document to Valkey: {e}") + logger.error( + f"Error adding document to Valkey (wrote {len(doc_ids)}/{len(texts)} " + f"before failure): {e}" + ) raise return doc_ids - def delete_index(self, *args, **kwargs): - """Delete all documents for this source_id. + def _paginated_source_scan(self) -> List[str]: + """Scan all keys matching this source_id, handling pagination. - Searches for all documents with matching source_id and deletes them. + Returns: + List of key strings for all documents with this source_id. 
""" - try: - query = f"@source_id:{{{self._source_id}}}" + all_keys: List[str] = [] + offset = 0 + escaped_source = self._escape_tag_value(self._source_id) + query = f"@source_id:{{{escaped_source}}}" + + while True: results = ft.search( self._client, self._index_name, query, - FtSearchOptions(limit=FtSearchLimit(0, 10000)), + FtSearchOptions(limit=FtSearchLimit(offset, _SCAN_PAGE_SIZE)), ) - if results and len(results) > 1: - for entry in results[1:]: - if isinstance(entry, dict): - for key in entry.keys(): - key_str = key.decode("utf-8") if isinstance(key, bytes) else str(key) - self._client.delete([key_str]) + if not results or len(results) < 2: + break + + page_keys: List[str] = [] + for entry in results[1:]: + if isinstance(entry, dict): + for key in entry.keys(): + key_str = key.decode("utf-8") if isinstance(key, bytes) else str(key) + page_keys.append(key_str) + + all_keys.extend(page_keys) + + # If we got fewer results than page size, we've reached the end + if len(page_keys) < _SCAN_PAGE_SIZE: + break + offset += _SCAN_PAGE_SIZE + + return all_keys + + def delete_index(self, *args, **kwargs): + """Delete all documents for this source_id. + + Searches for all documents with matching source_id and deletes them + in batches. Handles sources with more than 10,000 documents via pagination. + """ + try: + keys = self._paginated_source_scan() + + # Batch deletes for efficiency + for i in range(0, len(keys), _DELETE_BATCH_SIZE): + batch = keys[i : i + _DELETE_BATCH_SIZE] + self._client.delete(batch) except Exception as e: logger.error(f"Error deleting index from Valkey: {e}", exc_info=True) @@ -306,27 +381,51 @@ def get_chunks(self) -> List[Dict[str, Any]]: List of chunk dicts with doc_id, text, and metadata. 
""" try: - query = f"@source_id:{{{self._source_id}}}" - results = ft.search( - self._client, - self._index_name, - query, - FtSearchOptions(limit=FtSearchLimit(0, 10000)), - ) - - chunks = [] - if results and len(results) > 1: + escaped_source = self._escape_tag_value(self._source_id) + query = f"@source_id:{{{escaped_source}}}" + + chunks: List[Dict[str, Any]] = [] + offset = 0 + + while True: + results = ft.search( + self._client, + self._index_name, + query, + FtSearchOptions( + limit=FtSearchLimit(offset, _SCAN_PAGE_SIZE), + return_fields=[ + ReturnField("content"), + ReturnField("source_id"), + ReturnField("metadata"), + ], + ), + ) + + if not results or len(results) < 2: + break + + page_count = 0 for entry in results[1:]: if isinstance(entry, dict): for key, fields in entry.items(): - key_str = key.decode("utf-8") if isinstance(key, bytes) else str(key) + key_str = ( + key.decode("utf-8") if isinstance(key, bytes) else str(key) + ) doc_id = key_str.replace(self._prefix, "", 1) field_dict = self._decode_fields(fields) - chunks.append({ - "doc_id": doc_id, - "text": field_dict.get("content", ""), - "metadata": self._parse_metadata(field_dict), - }) + chunks.append( + { + "doc_id": doc_id, + "text": field_dict.get("content", ""), + "metadata": self._parse_metadata(field_dict), + } + ) + page_count += 1 + + if page_count < _SCAN_PAGE_SIZE: + break + offset += _SCAN_PAGE_SIZE return chunks diff --git a/tests/vectorstore/test_valkey.py b/tests/vectorstore/test_valkey.py index a65f2a0bf..b5fdf2d72 100644 --- a/tests/vectorstore/test_valkey.py +++ b/tests/vectorstore/test_valkey.py @@ -52,6 +52,61 @@ def test_source_id_simple(self): assert store._source_id == "my-source" +@pytest.mark.unit +class TestValkeyStoreTagEscaping: + """Tests for _escape_tag_value to ensure source_ids with special chars are safe.""" + + def test_escape_dots(self): + from application.vectorstore.valkey import ValkeyStore + + assert ValkeyStore._escape_tag_value("my.source.v2") == 
r"my\.source\.v2" + + def test_escape_hyphens(self): + from application.vectorstore.valkey import ValkeyStore + + assert ValkeyStore._escape_tag_value("my-source") == r"my\-source" + + def test_escape_slashes(self): + from application.vectorstore.valkey import ValkeyStore + + assert ValkeyStore._escape_tag_value("user/docs") == r"user\/docs" + + def test_escape_colons(self): + from application.vectorstore.valkey import ValkeyStore + + assert ValkeyStore._escape_tag_value("ns:value") == r"ns\:value" + + def test_escape_spaces(self): + from application.vectorstore.valkey import ValkeyStore + + assert ValkeyStore._escape_tag_value("my source") == r"my\ source" + + def test_no_escape_needed(self): + from application.vectorstore.valkey import ValkeyStore + + assert ValkeyStore._escape_tag_value("simplesource123") == "simplesource123" + + def test_escape_multiple_special_chars(self): + from application.vectorstore.valkey import ValkeyStore + + result = ValkeyStore._escape_tag_value("a.b-c/d:e") + assert result == r"a\.b\-c\/d\:e" + + def test_search_uses_escaped_source_id(self): + """Verify search query contains escaped source_id.""" + store, mock_client, mock_emb = _make_store(source_id="my.source-v2") + + mock_response = [0] + + with patch("application.vectorstore.valkey.ft.search", return_value=mock_response) as mock_ft: + store.search("query", k=1) + + call_args = mock_ft.call_args + query_str = call_args[0][2] + # Should contain escaped version + assert r"my\.source\-v2" in query_str + + @pytest.mark.unit class TestValkeyStoreSearch: def test_search_returns_documents(self): @@ -101,6 +156,17 @@ def test_search_handles_string_fields(self): assert len(results) == 1 assert results[0].page_content == "hello" + def test_search_passes_return_fields(self): + """Verify search specifies return_fields to avoid fetching embeddings.""" + store, mock_client, _ = _make_store() + + with patch("application.vectorstore.valkey.ft.search", return_value=[0]) as mock_ft: + 
store.search("query", k=1) + + call_args = mock_ft.call_args + options = call_args[0][3] + assert hasattr(options, "return_fields") + @pytest.mark.unit class TestValkeyStoreAddTexts: @@ -154,7 +220,8 @@ def test_add_texts_stores_correct_fields(self): @pytest.mark.unit class TestValkeyStoreDeleteIndex: - def test_delete_index_deletes_matching_docs(self): + def test_delete_index_deletes_matching_docs_in_batch(self): + """Verify delete_index uses batched deletes.""" store, mock_client, _ = _make_store(source_id="src123") mock_response = [ @@ -164,10 +231,43 @@ def test_delete_index_deletes_matching_docs(self): ] with patch("application.vectorstore.valkey.ft.search", return_value=mock_response): + mock_client.delete = Mock(return_value=2) + store.delete_index() + + # Should batch both keys into one delete call + mock_client.delete.assert_called_once() + deleted_keys = mock_client.delete.call_args[0][0] + assert len(deleted_keys) == 2 + + def test_delete_index_paginates_large_sets(self): + """Verify delete_index paginates when there are more docs than page size.""" + store, mock_client, _ = _make_store(source_id="src123") + + # Simulate two pages: first returns full page, second returns partial + from application.vectorstore.valkey import _SCAN_PAGE_SIZE + + page1_entries = [ + {f"doc:id{i}".encode(): {b"content": b"text"}} for i in range(_SCAN_PAGE_SIZE) + ] + page1_response = [_SCAN_PAGE_SIZE] + page1_entries + + page2_entries = [ + {b"doc:extra1": {b"content": b"text"}}, + {b"doc:extra2": {b"content": b"text"}}, + ] + page2_response = [2] + page2_entries + + with patch( + "application.vectorstore.valkey.ft.search", side_effect=[page1_response, page2_response] + ): mock_client.delete = Mock(return_value=1) store.delete_index() - assert mock_client.delete.call_count == 2 + # Should have multiple delete calls due to batching + total_deleted = sum( + len(call[0][0]) for call in mock_client.delete.call_args_list + ) + assert total_deleted == _SCAN_PAGE_SIZE + 2 def 
test_delete_index_handles_error(self): store, mock_client, _ = _make_store() @@ -208,6 +308,17 @@ def test_get_chunks_returns_empty_on_error(self): with patch("application.vectorstore.valkey.ft.search", side_effect=Exception("fail")): assert store.get_chunks() == [] + def test_get_chunks_uses_return_fields(self): + """Verify get_chunks specifies return_fields to skip embedding blobs.""" + store, mock_client, _ = _make_store() + + with patch("application.vectorstore.valkey.ft.search", return_value=[0]) as mock_ft: + store.get_chunks() + + call_args = mock_ft.call_args + options = call_args[0][3] + assert hasattr(options, "return_fields") + @pytest.mark.unit class TestValkeyStoreAddChunk: @@ -266,3 +377,110 @@ def test_delete_chunk_returns_false_on_error(self): result = store.delete_chunk("uuid-123") assert result is False + + +@pytest.mark.unit +class TestValkeyStoreCreateClient: + """Tests for password handling in _create_client.""" + + def test_password_none_skips_credentials(self): + """When VALKEY_PASSWORD is None, no credentials should be passed.""" + with patch( + "application.vectorstore.base.BaseVectorStore._get_embeddings" + ) as mock_get_emb, patch( + "application.vectorstore.valkey.settings" + ) as mock_settings, patch( + "application.vectorstore.valkey.GlideClientConfiguration" + ) as mock_config_cls, patch( + "application.vectorstore.valkey.GlideClient" + ) as mock_glide_cls, patch( + "application.vectorstore.valkey.ValkeyStore._ensure_index_exists" + ): + mock_emb = Mock() + mock_emb.dimension = 768 + mock_get_emb.return_value = mock_emb + mock_settings.EMBEDDINGS_NAME = "test" + mock_settings.VALKEY_HOST = "localhost" + mock_settings.VALKEY_PORT = 6379 + mock_settings.VALKEY_PASSWORD = None + mock_settings.VALKEY_USE_TLS = False + mock_settings.VALKEY_INDEX_NAME = "docsgpt" + mock_settings.VALKEY_PREFIX = "doc:" + mock_glide_cls.create = Mock(return_value=MagicMock()) + + from application.vectorstore.valkey import ValkeyStore + + 
ValkeyStore(source_id="test", embeddings_key="key") + + # Verify no credentials kwarg + config_call_kwargs = mock_config_cls.call_args[1] + assert "credentials" not in config_call_kwargs + + def test_empty_string_password_skips_credentials(self): + """When VALKEY_PASSWORD is empty string, no credentials should be passed.""" + with patch( + "application.vectorstore.base.BaseVectorStore._get_embeddings" + ) as mock_get_emb, patch( + "application.vectorstore.valkey.settings" + ) as mock_settings, patch( + "application.vectorstore.valkey.GlideClientConfiguration" + ) as mock_config_cls, patch( + "application.vectorstore.valkey.GlideClient" + ) as mock_glide_cls, patch( + "application.vectorstore.valkey.ValkeyStore._ensure_index_exists" + ): + mock_emb = Mock() + mock_emb.dimension = 768 + mock_get_emb.return_value = mock_emb + mock_settings.EMBEDDINGS_NAME = "test" + mock_settings.VALKEY_HOST = "localhost" + mock_settings.VALKEY_PORT = 6379 + mock_settings.VALKEY_PASSWORD = "" + mock_settings.VALKEY_USE_TLS = False + mock_settings.VALKEY_INDEX_NAME = "docsgpt" + mock_settings.VALKEY_PREFIX = "doc:" + mock_glide_cls.create = Mock(return_value=MagicMock()) + + from application.vectorstore.valkey import ValkeyStore + + ValkeyStore(source_id="test", embeddings_key="key") + + # Verify no credentials kwarg + config_call_kwargs = mock_config_cls.call_args[1] + assert "credentials" not in config_call_kwargs + + def test_non_empty_password_sets_credentials(self): + """When VALKEY_PASSWORD has a value, credentials should be passed.""" + with patch( + "application.vectorstore.base.BaseVectorStore._get_embeddings" + ) as mock_get_emb, patch( + "application.vectorstore.valkey.settings" + ) as mock_settings, patch( + "application.vectorstore.valkey.GlideClientConfiguration" + ) as mock_config_cls, patch( + "application.vectorstore.valkey.GlideClient" + ) as mock_glide_cls, patch( + "application.vectorstore.valkey.ValkeyStore._ensure_index_exists" + ), patch( + 
"application.vectorstore.valkey.ServerCredentials" + ) as mock_creds: + mock_emb = Mock() + mock_emb.dimension = 768 + mock_get_emb.return_value = mock_emb + mock_settings.EMBEDDINGS_NAME = "test" + mock_settings.VALKEY_HOST = "localhost" + mock_settings.VALKEY_PORT = 6379 + mock_settings.VALKEY_PASSWORD = "secret123" + mock_settings.VALKEY_USE_TLS = False + mock_settings.VALKEY_INDEX_NAME = "docsgpt" + mock_settings.VALKEY_PREFIX = "doc:" + mock_glide_cls.create = Mock(return_value=MagicMock()) + + from application.vectorstore.valkey import ValkeyStore + + ValkeyStore(source_id="test", embeddings_key="key") + + # Verify credentials kwarg was passed + config_call_kwargs = mock_config_cls.call_args[1] + assert "credentials" in config_call_kwargs + mock_creds.assert_called_once_with(password="secret123") From c74637d7bcb460a36440f84d1cc7ce176c466395 Mon Sep 17 00:00:00 2001 From: Daria Korenieva Date: Wed, 13 May 2026 09:48:00 -0700 Subject: [PATCH 3/8] feat: make Valkey schema configurable and escape pipe in tags - Add VALKEY_DISTANCE_METRIC (cosine/l2/ip), VALKEY_VECTOR_TYPE (float32), and VALKEY_VECTOR_ALGORITHM (hnsw/flat) settings with safe defaults - Log chosen config at index creation time - Fall back to defaults with a warning if unrecognized values are provided - Add '|' to tag escape character set (was missing) - Support FLAT vector algorithm as alternative to HNSW - Add unit tests for all resolver methods and pipe escaping - Update .env-template and integration test fixture Signed-off-by: Daria Korenieva --- .env-template | 3 + application/core/settings.py | 3 + application/vectorstore/valkey.py | 115 +++++++++++++++++-- tests/vectorstore/test_valkey.py | 81 +++++++++++++ tests/vectorstore/test_valkey_integration.py | 3 + 5 files changed, 196 insertions(+), 9 deletions(-) diff --git a/.env-template b/.env-template index 9827efeae..68142336b 100644 --- a/.env-template +++ b/.env-template @@ -45,3 +45,6 @@ 
MICROSOFT_AUTHORITY=https://{tenantId}.ciamlogin.com/{tenantId} # VALKEY_USE_TLS=false # VALKEY_INDEX_NAME=docsgpt # VALKEY_PREFIX=doc: +# VALKEY_DISTANCE_METRIC=cosine # cosine, l2, or ip +# VALKEY_VECTOR_TYPE=float32 # float32 (only option in valkey-glide-sync 2.x) +# VALKEY_VECTOR_ALGORITHM=hnsw # hnsw or flat diff --git a/application/core/settings.py b/application/core/settings.py index 253784491..ac10c5f41 100644 --- a/application/core/settings.py +++ b/application/core/settings.py @@ -172,6 +172,9 @@ class Settings(BaseSettings): VALKEY_USE_TLS: bool = False VALKEY_INDEX_NAME: str = "docsgpt" VALKEY_PREFIX: str = "doc:" + VALKEY_DISTANCE_METRIC: str = "cosine" # "cosine", "l2", or "ip" + VALKEY_VECTOR_TYPE: str = "float32" # "float32" (only option in valkey-glide-sync 2.x) + VALKEY_VECTOR_ALGORITHM: str = "hnsw" # "hnsw" or "flat" FLASK_DEBUG_MODE: bool = False STORAGE_TYPE: str = "local" # local or s3 diff --git a/application/vectorstore/valkey.py b/application/vectorstore/valkey.py index bc50d0eb8..2a87a847c 100644 --- a/application/vectorstore/valkey.py +++ b/application/vectorstore/valkey.py @@ -23,6 +23,7 @@ TextField, VectorAlgorithm, VectorField, + VectorFieldAttributesFlat, VectorFieldAttributesHnsw, VectorType, ft, @@ -40,7 +41,7 @@ logger = logging.getLogger(__name__) # Characters that must be escaped in Valkey tag field query values. -_TAG_SPECIAL_CHARS = set(r".,<>{}[]\"':;!@#$%^&*()-+=~ /") +_TAG_SPECIAL_CHARS = set(r".,<>{}[]\"':;!@#$%^&*()-+=~ /|") # Batch size for DELETE operations in delete_index. _DELETE_BATCH_SIZE = 100 @@ -104,21 +105,48 @@ def _create_client(self) -> GlideClient: return GlideClient.create(config) def _ensure_index_exists(self): - """Create the search index if it does not already exist.""" + """Create the search index if it does not already exist. + + Uses VALKEY_DISTANCE_METRIC, VALKEY_VECTOR_TYPE, and VALKEY_VECTOR_ALGORITHM + from settings. Falls back to cosine/float32/hnsw if values are unrecognized. 
+ """ embedding_dim = getattr(self._embedding, "dimension", 768) - schema: List[Field] = [ - TextField("content"), - TagField("source_id"), - VectorField( + distance_metric = self._resolve_distance_metric(settings.VALKEY_DISTANCE_METRIC) + vector_type = self._resolve_vector_type(settings.VALKEY_VECTOR_TYPE) + algorithm = self._resolve_vector_algorithm(settings.VALKEY_VECTOR_ALGORITHM) + + logger.info( + f"Valkey index config: algorithm={algorithm.name}, " + f"distance_metric={distance_metric.name}, vector_type={vector_type.name}, " + f"dimensions={embedding_dim}" + ) + + if algorithm == VectorAlgorithm.HNSW: + vector_field = VectorField( name="embedding", algorithm=VectorAlgorithm.HNSW, attributes=VectorFieldAttributesHnsw( dimensions=embedding_dim, - distance_metric=DistanceMetricType.COSINE, - type=VectorType.FLOAT32, + distance_metric=distance_metric, + type=vector_type, + ), + ) + else: + vector_field = VectorField( + name="embedding", + algorithm=VectorAlgorithm.FLAT, + attributes=VectorFieldAttributesFlat( + dimensions=embedding_dim, + distance_metric=distance_metric, + type=vector_type, ), - ), + ) + + schema: List[Field] = [ + TextField("content"), + TagField("source_id"), + vector_field, ] options = FtCreateOptions(data_type=DataType.HASH, prefixes=[self._prefix]) @@ -133,6 +161,75 @@ def _ensure_index_exists(self): logger.error(f"Error creating Valkey index: {e}") raise + @staticmethod + def _resolve_distance_metric(value: str) -> DistanceMetricType: + """Resolve distance metric string to enum, defaulting to COSINE. + + Args: + value: One of "cosine", "l2", or "ip". + + Returns: + The corresponding DistanceMetricType enum value. + """ + mapping = { + "cosine": DistanceMetricType.COSINE, + "l2": DistanceMetricType.L2, + "ip": DistanceMetricType.IP, + } + result = mapping.get(value.lower().strip()) + if result is None: + logger.warning( + f"Unrecognized VALKEY_DISTANCE_METRIC='{value}', " + f"falling back to 'cosine'. 
Valid options: cosine, l2, ip" + ) + return DistanceMetricType.COSINE + return result + + @staticmethod + def _resolve_vector_type(value: str) -> VectorType: + """Resolve vector type string to enum, defaulting to FLOAT32. + + Args: + value: Currently only "float32" is supported by valkey-glide-sync. + + Returns: + The corresponding VectorType enum value. + """ + mapping = { + "float32": VectorType.FLOAT32, + } + result = mapping.get(value.lower().strip()) + if result is None: + logger.warning( + f"Unrecognized VALKEY_VECTOR_TYPE='{value}', " + f"falling back to 'float32'. Valid options: float32" + ) + return VectorType.FLOAT32 + return result + + @staticmethod + def _resolve_vector_algorithm(value: str) -> VectorAlgorithm: + """Resolve vector algorithm string to enum, defaulting to HNSW. + + Args: + value: One of "hnsw" or "flat". + + Returns: + The corresponding VectorAlgorithm enum value. + """ + mapping = { + "hnsw": VectorAlgorithm.HNSW, + "flat": VectorAlgorithm.FLAT, + } + result = mapping.get(value.lower().strip()) + if result is None: + logger.warning( + f"Unrecognized VALKEY_VECTOR_ALGORITHM='{value}', " + f"falling back to 'hnsw'. Valid options: hnsw, flat" + ) + return VectorAlgorithm.HNSW + return result + @staticmethod def _escape_tag_value(value: str) -> str: """Escape special characters for Valkey tag field queries. 
diff --git a/tests/vectorstore/test_valkey.py b/tests/vectorstore/test_valkey.py index b5fdf2d72..605029ebe 100644 --- a/tests/vectorstore/test_valkey.py +++ b/tests/vectorstore/test_valkey.py @@ -81,6 +81,11 @@ def test_escape_spaces(self): assert ValkeyStore._escape_tag_value("my source") == r"my\ source" + def test_escape_pipe(self): + from application.vectorstore.valkey import ValkeyStore + + assert ValkeyStore._escape_tag_value("a|b") == r"a\|b" + def test_no_escape_needed(self): from application.vectorstore.valkey import ValkeyStore @@ -484,3 +489,79 @@ def test_non_empty_password_sets_credentials(self): config_call_kwargs = mock_config_cls.call_args[1] assert "credentials" in config_call_kwargs mock_creds.assert_called_once_with(password="secret123") + + +@pytest.mark.unit +class TestValkeyStoreSchemaConfig: + """Tests for configurable distance metric, vector type, and algorithm.""" + + def test_resolve_distance_metric_cosine(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import DistanceMetricType + + assert ValkeyStore._resolve_distance_metric("cosine") == DistanceMetricType.COSINE + + def test_resolve_distance_metric_l2(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import DistanceMetricType + + assert ValkeyStore._resolve_distance_metric("l2") == DistanceMetricType.L2 + + def test_resolve_distance_metric_ip(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import DistanceMetricType + + assert ValkeyStore._resolve_distance_metric("ip") == DistanceMetricType.IP + + def test_resolve_distance_metric_invalid_falls_back(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import DistanceMetricType + + assert ValkeyStore._resolve_distance_metric("invalid") == DistanceMetricType.COSINE + + def test_resolve_distance_metric_case_insensitive(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync 
import DistanceMetricType + + assert ValkeyStore._resolve_distance_metric("COSINE") == DistanceMetricType.COSINE + assert ValkeyStore._resolve_distance_metric("L2") == DistanceMetricType.L2 + + def test_resolve_vector_type_float32(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import VectorType + + assert ValkeyStore._resolve_vector_type("float32") == VectorType.FLOAT32 + + def test_resolve_vector_type_invalid_falls_back(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import VectorType + + assert ValkeyStore._resolve_vector_type("float64") == VectorType.FLOAT32 + + def test_resolve_vector_algorithm_hnsw(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import VectorAlgorithm + + assert ValkeyStore._resolve_vector_algorithm("hnsw") == VectorAlgorithm.HNSW + + def test_resolve_vector_algorithm_flat(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import VectorAlgorithm + + assert ValkeyStore._resolve_vector_algorithm("flat") == VectorAlgorithm.FLAT + + def test_resolve_vector_algorithm_invalid_falls_back(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import VectorAlgorithm + + assert ValkeyStore._resolve_vector_algorithm("annoy") == VectorAlgorithm.HNSW diff --git a/tests/vectorstore/test_valkey_integration.py b/tests/vectorstore/test_valkey_integration.py index dd82a9ab0..0faceb6c7 100644 --- a/tests/vectorstore/test_valkey_integration.py +++ b/tests/vectorstore/test_valkey_integration.py @@ -76,6 +76,9 @@ def valkey_store(): mock_settings.VALKEY_USE_TLS = False mock_settings.VALKEY_INDEX_NAME = index_name mock_settings.VALKEY_PREFIX = f"test:{source_id}:" + mock_settings.VALKEY_DISTANCE_METRIC = "cosine" + mock_settings.VALKEY_VECTOR_TYPE = "float32" + mock_settings.VALKEY_VECTOR_ALGORITHM = "hnsw" from application.vectorstore.valkey import ValkeyStore From 
f78ae3c58d3eade371dc3710ce6683fcb8aac16a Mon Sep 17 00:00:00 2001 From: Daria Korenieva Date: Wed, 13 May 2026 10:09:07 -0700 Subject: [PATCH 4/8] =?UTF-8?q?fix:=20add=20close(),=20minimize=20scan=20p?= =?UTF-8?q?ayload,=20harden=20index=20check,=20add=20FLAT=20tests=20-=20Ad?= =?UTF-8?q?d=20close()=20method=20and=20=5F=5Fdel=5F=5F=20to=20release=20G?= =?UTF-8?q?lideClient=20TCP=20connection=20-=20=5Fpaginated=5Fsource=5Fsca?= =?UTF-8?q?n=20now=20uses=20ReturnField('source=5Fid')=20to=20avoid=20fetc?= =?UTF-8?q?hing=20full=20document=20content=20(only=20key=20names=20needed?= =?UTF-8?q?=20for=20deletion)=20-=20=5Fensure=5Findex=5Fexists=20widens=20?= =?UTF-8?q?error=20matching=20to=20also=20catch=20'index=20already'=20phra?= =?UTF-8?q?sing,=20reducing=20brittleness=20on=20different=20Valkey=20vers?= =?UTF-8?q?ions=20-=20Add=206=20new=20unit=20tests:=20close()=20lifecycle?= =?UTF-8?q?=20(3),=20FLAT=20algorithm=20path=20(1),=20already-exists=20han?= =?UTF-8?q?dling=20(1),=20unknown=20error=20re-raise=20(1)=20-=20Skipped?= =?UTF-8?q?=20lazy-import=20refactor=20=E2=80=94=20imports=20remain=20at?= =?UTF-8?q?=20module=20top=20per=20project=20style?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daria Korenieva --- application/vectorstore/valkey.py | 28 +++++- tests/vectorstore/test_valkey.py | 145 ++++++++++++++++++++++++++++++ 2 files changed, 171 insertions(+), 2 deletions(-) diff --git a/application/vectorstore/valkey.py b/application/vectorstore/valkey.py index 2a87a847c..8dd2b0532 100644 --- a/application/vectorstore/valkey.py +++ b/application/vectorstore/valkey.py @@ -80,6 +80,23 @@ def __init__( self._client = self._create_client() self._ensure_index_exists() + def close(self): + """Close the underlying Valkey client connection. + + Should be called when the store is no longer needed to release + the TCP connection held by the GLIDE client. 
+ """ + if self._client is not None: + try: + self._client.close() + except Exception as e: + logger.debug(f"Error closing Valkey client: {e}") + self._client = None + + def __del__(self): + """Best-effort cleanup on garbage collection.""" + self.close() + def _create_client(self) -> GlideClient: """Create and return a synchronous Valkey GLIDE client. @@ -155,7 +172,8 @@ def _ensure_index_exists(self): ft.create(self._client, self._index_name, schema, options) logger.info(f"Created Valkey search index '{self._index_name}'") except Exception as e: - if "already exists" in str(e).lower(): + error_msg = str(e).lower() + if "already exists" in error_msg or "index already" in error_msg: logger.debug(f"Valkey index '{self._index_name}' already exists") else: logger.error(f"Error creating Valkey index: {e}") @@ -415,6 +433,9 @@ def add_texts( def _paginated_source_scan(self) -> List[str]: """Scan all keys matching this source_id, handling pagination. + Uses a minimal return field to avoid fetching full document content — + only the key names are needed for deletion. + Returns: List of key strings for all documents with this source_id. 
""" @@ -428,7 +449,10 @@ def _paginated_source_scan(self) -> List[str]: self._client, self._index_name, query, - FtSearchOptions(limit=FtSearchLimit(offset, _SCAN_PAGE_SIZE)), + FtSearchOptions( + limit=FtSearchLimit(offset, _SCAN_PAGE_SIZE), + return_fields=[ReturnField("source_id")], + ), ) if not results or len(results) < 2: diff --git a/tests/vectorstore/test_valkey.py b/tests/vectorstore/test_valkey.py index 605029ebe..53d6ccdfd 100644 --- a/tests/vectorstore/test_valkey.py +++ b/tests/vectorstore/test_valkey.py @@ -565,3 +565,148 @@ def test_resolve_vector_algorithm_invalid_falls_back(self): from glide_sync import VectorAlgorithm assert ValkeyStore._resolve_vector_algorithm("annoy") == VectorAlgorithm.HNSW + + +@pytest.mark.unit +class TestValkeyStoreClose: + """Tests for client resource cleanup.""" + + def test_close_calls_client_close(self): + store, mock_client, _ = _make_store() + + store.close() + + mock_client.close.assert_called_once() + assert store._client is None + + def test_close_is_idempotent(self): + store, mock_client, _ = _make_store() + + store.close() + store.close() # second call should not raise + + mock_client.close.assert_called_once() + + def test_close_handles_exception(self): + store, mock_client, _ = _make_store() + mock_client.close = Mock(side_effect=Exception("already closed")) + + # Should not raise + store.close() + assert store._client is None + + +@pytest.mark.unit +class TestValkeyStoreEnsureIndex: + """Tests for _ensure_index_exists with FLAT algorithm path.""" + + def test_ensure_index_flat_algorithm(self): + """Verify FLAT algorithm path creates index without error.""" + with patch( + "application.vectorstore.base.BaseVectorStore._get_embeddings" + ) as mock_get_emb, patch( + "application.vectorstore.valkey.settings" + ) as mock_settings, patch( + "application.vectorstore.valkey.ValkeyStore._create_client" + ) as mock_create_client, patch( + "application.vectorstore.valkey.ft" + ) as mock_ft: + mock_emb = Mock() + 
mock_emb.dimension = 128 + mock_get_emb.return_value = mock_emb + + mock_settings.EMBEDDINGS_NAME = "test" + mock_settings.VALKEY_HOST = "localhost" + mock_settings.VALKEY_PORT = 6379 + mock_settings.VALKEY_PASSWORD = None + mock_settings.VALKEY_USE_TLS = False + mock_settings.VALKEY_INDEX_NAME = "test_idx" + mock_settings.VALKEY_PREFIX = "doc:" + mock_settings.VALKEY_DISTANCE_METRIC = "l2" + mock_settings.VALKEY_VECTOR_TYPE = "float32" + mock_settings.VALKEY_VECTOR_ALGORITHM = "flat" + + mock_client = MagicMock() + mock_create_client.return_value = mock_client + + from application.vectorstore.valkey import ValkeyStore + + # This should not raise — exercises the FLAT branch + ValkeyStore(source_id="test", embeddings_key="key") + + # Verify ft.create was called with the schema + mock_ft.create.assert_called_once() + + def test_ensure_index_already_exists_is_silent(self): + """Verify 'already exists' error is handled gracefully.""" + with patch( + "application.vectorstore.base.BaseVectorStore._get_embeddings" + ) as mock_get_emb, patch( + "application.vectorstore.valkey.settings" + ) as mock_settings, patch( + "application.vectorstore.valkey.ValkeyStore._create_client" + ) as mock_create_client, patch( + "application.vectorstore.valkey.ft" + ) as mock_ft: + mock_emb = Mock() + mock_emb.dimension = 768 + mock_get_emb.return_value = mock_emb + + mock_settings.EMBEDDINGS_NAME = "test" + mock_settings.VALKEY_HOST = "localhost" + mock_settings.VALKEY_PORT = 6379 + mock_settings.VALKEY_PASSWORD = None + mock_settings.VALKEY_USE_TLS = False + mock_settings.VALKEY_INDEX_NAME = "test_idx" + mock_settings.VALKEY_PREFIX = "doc:" + mock_settings.VALKEY_DISTANCE_METRIC = "cosine" + mock_settings.VALKEY_VECTOR_TYPE = "float32" + mock_settings.VALKEY_VECTOR_ALGORITHM = "hnsw" + + mock_client = MagicMock() + mock_create_client.return_value = mock_client + + # Simulate "Index already exists" error + mock_ft.create.side_effect = Exception("Index already exists") + + from 
application.vectorstore.valkey import ValkeyStore + + # Should not raise + store = ValkeyStore(source_id="test", embeddings_key="key") + assert store is not None + + def test_ensure_index_unknown_error_raises(self): + """Verify non-'already exists' errors are re-raised.""" + with patch( + "application.vectorstore.base.BaseVectorStore._get_embeddings" + ) as mock_get_emb, patch( + "application.vectorstore.valkey.settings" + ) as mock_settings, patch( + "application.vectorstore.valkey.ValkeyStore._create_client" + ) as mock_create_client, patch( + "application.vectorstore.valkey.ft" + ) as mock_ft: + mock_emb = Mock() + mock_emb.dimension = 768 + mock_get_emb.return_value = mock_emb + + mock_settings.EMBEDDINGS_NAME = "test" + mock_settings.VALKEY_HOST = "localhost" + mock_settings.VALKEY_PORT = 6379 + mock_settings.VALKEY_PASSWORD = None + mock_settings.VALKEY_USE_TLS = False + mock_settings.VALKEY_INDEX_NAME = "test_idx" + mock_settings.VALKEY_PREFIX = "doc:" + mock_settings.VALKEY_DISTANCE_METRIC = "cosine" + mock_settings.VALKEY_VECTOR_TYPE = "float32" + mock_settings.VALKEY_VECTOR_ALGORITHM = "hnsw" + + mock_client = MagicMock() + mock_create_client.return_value = mock_client + + mock_ft.create.side_effect = Exception("Connection refused") + + from application.vectorstore.valkey import ValkeyStore + + with pytest.raises(Exception, match="Connection refused"): + ValkeyStore(source_id="test", embeddings_key="key") From ae6ca4ccfaea66556937c4678b4fe14a7bfa13c8 Mon Sep 17 00:00:00 2001 From: Daria Korenieva Date: Wed, 13 May 2026 11:27:52 -0700 Subject: [PATCH 5/8] docs: add Valkey to supported vector stores in docs Update Architecture, Settings, and Postgres Migration pages to list Valkey alongside the other supported vector store backends. 
Signed-off-by: Daria Korenieva --- docs/content/Deploying/DocsGPT-Settings.mdx | 2 +- docs/content/Deploying/Postgres-Migration.mdx | 2 +- docs/content/Guides/Architecture.mdx | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/content/Deploying/DocsGPT-Settings.mdx b/docs/content/Deploying/DocsGPT-Settings.mdx index 373e56cbe..c60e4d4bc 100644 --- a/docs/content/Deploying/DocsGPT-Settings.mdx +++ b/docs/content/Deploying/DocsGPT-Settings.mdx @@ -362,7 +362,7 @@ gated by CI/CD). These are just the basic settings to get you started. The `settings.py` file contains many more advanced options that you can explore to further customize DocsGPT, such as: -- Vector store configuration (`VECTOR_STORE`, Qdrant, Milvus, LanceDB settings) If you're looking for an easy way to set up a vector store with pgvector, try [Neon](https://get.neon.com/docsgpt). +- Vector store configuration (`VECTOR_STORE`, Qdrant, Milvus, LanceDB, Valkey settings) If you're looking for an easy way to set up a vector store with pgvector, try [Neon](https://get.neon.com/docsgpt). - Retriever settings (`RETRIEVERS_ENABLED`) - Cache settings (`CACHE_REDIS_URL`) - And many more! diff --git a/docs/content/Deploying/Postgres-Migration.mdx b/docs/content/Deploying/Postgres-Migration.mdx index f8a8b1e88..2bbb0072c 100644 --- a/docs/content/Deploying/Postgres-Migration.mdx +++ b/docs/content/Deploying/Postgres-Migration.mdx @@ -13,7 +13,7 @@ required. Vector stores are independent — `VECTOR_STORE` can still be `pgvector`, - `faiss`, `qdrant`, `milvus`, `elasticsearch`, or `mongodb`. + `faiss`, `qdrant`, `milvus`, `elasticsearch`, `mongodb`, or `valkey`. ## Quickstart diff --git a/docs/content/Guides/Architecture.mdx b/docs/content/Guides/Architecture.mdx index c858812c6..44d756c88 100644 --- a/docs/content/Guides/Architecture.mdx +++ b/docs/content/Guides/Architecture.mdx @@ -64,7 +64,7 @@ flowchart LR * **Technology:** Supports multiple vector databases. 
* **Responsibility:** Vector Stores are used to store and retrieve vector embeddings of document chunks. This enables semantic search and retrieval of relevant document snippets in response to user queries. * **Key Features:** - * Supports vector databases including FAISS, Elasticsearch, Qdrant, Milvus, MongoDB Atlas Vector Search, and pgvector. + * Supports vector databases including FAISS, Elasticsearch, Qdrant, Milvus, MongoDB Atlas Vector Search, pgvector, and Valkey. * Provides storage and indexing of high-dimensional vector embeddings. * Enables editing and updating of vector indexes including specific chunks. From 7c1796d42c6098af7bea1a004922119606e79e89 Mon Sep 17 00:00:00 2001 From: Daria Korenieva Date: Wed, 13 May 2026 12:15:04 -0700 Subject: [PATCH 6/8] Address review feedback: lazy import, dimension probe, batch pipeline, tag escaping, typed exceptions Signed-off-by: Daria Korenieva --- application/vectorstore/valkey.py | 66 ++++++++++++++++++++----------- tests/vectorstore/test_valkey.py | 48 +++++++++++++--------- 2 files changed, 72 insertions(+), 42 deletions(-) diff --git a/application/vectorstore/valkey.py b/application/vectorstore/valkey.py index 8dd2b0532..322eb1769 100644 --- a/application/vectorstore/valkey.py +++ b/application/vectorstore/valkey.py @@ -6,8 +6,11 @@ import uuid from typing import Any, Dict, List, Optional +_GLIDE_AVAILABLE = False try: from glide_sync import ( + Batch, + ConnectionError as GlideConnectionError, DataType, DistanceMetricType, Field, @@ -17,10 +20,12 @@ GlideClient, GlideClientConfiguration, NodeAddress, + RequestError, ReturnField, ServerCredentials, TagField, TextField, + TimeoutError as GlideTimeoutError, VectorAlgorithm, VectorField, VectorFieldAttributesFlat, @@ -28,20 +33,20 @@ VectorType, ft, ) + + _GLIDE_AVAILABLE = True except ImportError: - raise ImportError( - "Could not import valkey-glide-sync. " - "Please install with `pip install valkey-glide-sync`." 
- ) + pass -from application.core.settings import settings -from application.vectorstore.base import BaseVectorStore -from application.vectorstore.document_class import Document +from application.core.settings import settings # noqa: E402 +from application.vectorstore.base import BaseVectorStore # noqa: E402 +from application.vectorstore.document_class import Document # noqa: E402 logger = logging.getLogger(__name__) # Characters that must be escaped in Valkey tag field query values. -_TAG_SPECIAL_CHARS = set(r".,<>{}[]\"':;!@#$%^&*()-+=~ /|") +# Includes '?' which is a single-character wildcard in valkey-search TAG queries. +_TAG_SPECIAL_CHARS = set(r".,<>{}[]\"':;!@#$%^&*()-+=~ /|?") # Batch size for DELETE operations in delete_index. _DELETE_BATCH_SIZE = 100 @@ -70,7 +75,15 @@ def __init__( source_id: Identifier for the document source, used to namespace and filter documents. embeddings_key: Key name or API key for the embeddings provider. + + Raises: + ImportError: If valkey-glide-sync is not installed. """ + if not _GLIDE_AVAILABLE: + raise ImportError( + "Could not import valkey-glide-sync. " + "Please install with `pip install valkey-glide-sync`." + ) super().__init__() self._source_id = str(source_id).replace("application/indexes/", "").rstrip("/") self._embedding = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key) @@ -127,7 +140,10 @@ def _ensure_index_exists(self): Uses VALKEY_DISTANCE_METRIC, VALKEY_VECTOR_TYPE, and VALKEY_VECTOR_ALGORITHM from settings. Falls back to cosine/float32/hnsw if values are unrecognized. 
""" - embedding_dim = getattr(self._embedding, "dimension", 768) + embedding_dim = getattr(self._embedding, "dimension", None) + if embedding_dim is None: + probe = self._embedding.embed_query("dimension probe") + embedding_dim = len(probe) distance_metric = self._resolve_distance_metric(settings.VALKEY_DISTANCE_METRIC) vector_type = self._resolve_vector_type(settings.VALKEY_VECTOR_TYPE) @@ -310,7 +326,7 @@ def search(self, question: str, k: int = 2, *args, **kwargs) -> List[Document]: return self._parse_search_results(results) - except Exception as e: + except (RequestError, GlideConnectionError, GlideTimeoutError) as e: logger.error(f"Error searching Valkey: {e}", exc_info=True) return [] @@ -406,6 +422,8 @@ def add_texts( metadatas = metadatas or [{}] * len(texts) doc_ids: List[str] = [] + # Use non-atomic Batch (pipeline) to reduce network round trips. + batch = Batch(False) for text, embedding, metadata in zip(texts, embeddings, metadatas): doc_id = str(uuid.uuid4()) key = self._doc_key(doc_id) @@ -418,15 +436,17 @@ def add_texts( "embedding": vector_bytes, } - try: - self._client.hset(key, fields) - doc_ids.append(doc_id) - except Exception as e: - logger.error( - f"Error adding document to Valkey (wrote {len(doc_ids)}/{len(texts)} " - f"before failure): {e}" - ) - raise + batch.hset(key, fields) + doc_ids.append(doc_id) + + try: + self._client.exec(batch, raise_on_error=True) + except (RequestError, GlideConnectionError, GlideTimeoutError) as e: + logger.error( + f"Error adding documents to Valkey via pipeline " + f"({len(doc_ids)} documents): {e}" + ) + raise return doc_ids @@ -488,7 +508,7 @@ def delete_index(self, *args, **kwargs): batch = keys[i : i + _DELETE_BATCH_SIZE] self._client.delete(batch) - except Exception as e: + except (RequestError, GlideConnectionError, GlideTimeoutError) as e: logger.error(f"Error deleting index from Valkey: {e}", exc_info=True) def save_local(self, *args, **kwargs): @@ -550,7 +570,7 @@ def get_chunks(self) -> 
List[Dict[str, Any]]: return chunks - except Exception as e: + except (RequestError, GlideConnectionError, GlideTimeoutError) as e: logger.error(f"Error getting chunks from Valkey: {e}", exc_info=True) return [] @@ -586,7 +606,7 @@ def add_chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> str try: self._client.hset(key, fields) return doc_id - except Exception as e: + except (RequestError, GlideConnectionError, GlideTimeoutError) as e: logger.error(f"Error adding chunk to Valkey: {e}") raise @@ -603,6 +623,6 @@ def delete_chunk(self, chunk_id: str) -> bool: key = self._doc_key(chunk_id) result = self._client.delete([key]) return result > 0 - except Exception as e: + except (RequestError, GlideConnectionError, GlideTimeoutError) as e: logger.error(f"Error deleting chunk from Valkey: {e}", exc_info=True) return False diff --git a/tests/vectorstore/test_valkey.py b/tests/vectorstore/test_valkey.py index 53d6ccdfd..3241bf51d 100644 --- a/tests/vectorstore/test_valkey.py +++ b/tests/vectorstore/test_valkey.py @@ -5,6 +5,8 @@ import pytest +from glide_sync import RequestError + def _make_store(source_id="test-source", embeddings_key="key"): """Helper to create a ValkeyStore with all external deps mocked.""" @@ -136,7 +138,7 @@ def test_search_returns_documents(self): def test_search_returns_empty_on_error(self): store, mock_client, _ = _make_store() - with patch("application.vectorstore.valkey.ft.search", side_effect=Exception("connection lost")): + with patch("application.vectorstore.valkey.ft.search", side_effect=RequestError("connection lost")): results = store.search("query") assert results == [] @@ -178,12 +180,12 @@ class TestValkeyStoreAddTexts: def test_add_texts_returns_ids(self): store, mock_client, mock_emb = _make_store() mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]] - mock_client.hset = Mock(return_value=3) + mock_client.exec = Mock(return_value=["OK", "OK"]) ids = store.add_texts(["text1", "text2"], [{"a": 1}, 
{"b": 2}]) assert len(ids) == 2 - assert mock_client.hset.call_count == 2 + mock_client.exec.assert_called_once() def test_add_texts_empty_returns_empty(self): store, _, _ = _make_store() @@ -192,7 +194,7 @@ def test_add_texts_empty_returns_empty(self): def test_add_texts_default_metadatas(self): store, mock_client, mock_emb = _make_store() mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3]] - mock_client.hset = Mock(return_value=3) + mock_client.exec = Mock(return_value=["OK"]) ids = store.add_texts(["text1"]) assert len(ids) == 1 @@ -200,27 +202,35 @@ def test_add_texts_default_metadatas(self): def test_add_texts_raises_on_error(self): store, mock_client, mock_emb = _make_store() mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3]] - mock_client.hset = Mock(side_effect=Exception("write failed")) + mock_client.exec = Mock(side_effect=RequestError("write failed")) - with pytest.raises(Exception, match="write failed"): + with pytest.raises(RequestError, match="write failed"): store.add_texts(["text1"]) def test_add_texts_stores_correct_fields(self): store, mock_client, mock_emb = _make_store(source_id="src1") mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3]] - mock_client.hset = Mock(return_value=3) + mock_client.exec = Mock(return_value=["OK"]) - store.add_texts(["hello"], [{"key": "val"}]) + with patch("application.vectorstore.valkey.Batch") as MockBatch: + mock_batch_instance = MagicMock() + MockBatch.return_value = mock_batch_instance - call_args = mock_client.hset.call_args - key = call_args[0][0] - fields = call_args[0][1] + store.add_texts(["hello"], [{"key": "val"}]) + + # Verify the batch was created as non-atomic (pipeline) + MockBatch.assert_called_once_with(False) + + # Verify hset was called on the batch with correct fields + call_args = mock_batch_instance.hset.call_args + key = call_args[0][0] + fields = call_args[0][1] - assert key.startswith("doc:") - assert fields["content"] == "hello" - assert fields["source_id"] == "src1" - 
assert json.loads(fields["metadata"]) == {"key": "val"} - assert isinstance(fields["embedding"], bytes) + assert key.startswith("doc:") + assert fields["content"] == "hello" + assert fields["source_id"] == "src1" + assert json.loads(fields["metadata"]) == {"key": "val"} + assert isinstance(fields["embedding"], bytes) @pytest.mark.unit @@ -277,7 +287,7 @@ def test_delete_index_paginates_large_sets(self): def test_delete_index_handles_error(self): store, mock_client, _ = _make_store() - with patch("application.vectorstore.valkey.ft.search", side_effect=Exception("fail")): + with patch("application.vectorstore.valkey.ft.search", side_effect=RequestError("fail")): # Should not raise store.delete_index() @@ -310,7 +320,7 @@ def test_get_chunks(self): def test_get_chunks_returns_empty_on_error(self): store, mock_client, _ = _make_store() - with patch("application.vectorstore.valkey.ft.search", side_effect=Exception("fail")): + with patch("application.vectorstore.valkey.ft.search", side_effect=RequestError("fail")): assert store.get_chunks() == [] def test_get_chunks_uses_return_fields(self): @@ -378,7 +388,7 @@ def test_delete_chunk_not_found(self): def test_delete_chunk_returns_false_on_error(self): store, mock_client, _ = _make_store() - mock_client.delete = Mock(side_effect=Exception("fail")) + mock_client.delete = Mock(side_effect=RequestError("fail")) result = store.delete_chunk("uuid-123") assert result is False From 6b6a76ff8a53deb5003c92968a424e55aff3e586 Mon Sep 17 00:00:00 2001 From: Daria Korenieva Date: Fri, 15 May 2026 11:41:57 -0700 Subject: [PATCH 7/8] Address review feedback: harden valkey store - Document eager import guard in vector_creator.py (fragility concern) - Catch RequestError instead of bare Exception in _ensure_index_exists - Re-raise exceptions in delete_index so callers can handle failures - Extract shared _paginated_search generator with max-iterations guard - Add __enter__/__exit__ for deterministic connection cleanup - Cap search k to [1, 
100] to prevent memory exhaustion - Make request_timeout configurable via VALKEY_REQUEST_TIMEOUT setting - Refactor integration test fixture to use real Settings instance - Update tests to match new behavior, add context manager + k bounds tests All 63 tests pass (56 unit + 7 integration). Signed-off-by: Daria Korenieva --- application/core/settings.py | 1 + application/vectorstore/valkey.py | 215 ++++++++++--------- application/vectorstore/vector_creator.py | 4 + tests/vectorstore/test_valkey.py | 41 +++- tests/vectorstore/test_valkey_integration.py | 44 ++-- 5 files changed, 181 insertions(+), 124 deletions(-) diff --git a/application/core/settings.py b/application/core/settings.py index ac10c5f41..a719e9aa8 100644 --- a/application/core/settings.py +++ b/application/core/settings.py @@ -172,6 +172,7 @@ class Settings(BaseSettings): VALKEY_USE_TLS: bool = False VALKEY_INDEX_NAME: str = "docsgpt" VALKEY_PREFIX: str = "doc:" + VALKEY_REQUEST_TIMEOUT: int = 5000 # milliseconds VALKEY_DISTANCE_METRIC: str = "cosine" # "cosine", "l2", or "ip" VALKEY_VECTOR_TYPE: str = "float32" # "float32" (only option in valkey-glide-sync 2.x) VALKEY_VECTOR_ALGORITHM: str = "hnsw" # "hnsw" or "flat" diff --git a/application/vectorstore/valkey.py b/application/vectorstore/valkey.py index 322eb1769..55e7382fa 100644 --- a/application/vectorstore/valkey.py +++ b/application/vectorstore/valkey.py @@ -1,10 +1,16 @@ -"""Valkey vector store implementation using valkey-glide-sync and valkey-search module.""" +"""Valkey vector store implementation using valkey-glide-sync and valkey-search module. + +NOTE: The try/except ImportError guard around the glide_sync import below is +**required** by ``application/vectorstore/vector_creator.py`` which eagerly +imports all vector store modules at module level. Without this guard, a missing +``valkey-glide-sync`` package would break VectorCreator for *all* backends. 
+""" import json import logging import struct import uuid -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Generator, List, Optional, Tuple _GLIDE_AVAILABLE = False try: @@ -54,6 +60,12 @@ # Page size for paginated scan in delete_index / get_chunks. _SCAN_PAGE_SIZE = 10000 +# Safety limit to prevent infinite pagination loops (supports ~10M documents). +_MAX_SCAN_PAGES = 1000 + +# Maximum allowed k for vector search to prevent memory exhaustion. +_MAX_SEARCH_K = 100 + class ValkeyStore(BaseVectorStore): """Vector store backed by Valkey with the valkey-search module. @@ -62,6 +74,11 @@ class ValkeyStore(BaseVectorStore): Creates a search index with FT.CREATE for KNN vector similarity search. Requires a Valkey server with the valkey-search module loaded. + + Supports use as a context manager for deterministic connection cleanup:: + + with ValkeyStore(source_id="my-source") as store: + store.search("query") """ def __init__( @@ -93,6 +110,19 @@ def __init__( self._client = self._create_client() self._ensure_index_exists() + # --- Context manager support --- + + def __enter__(self): + """Enter the context manager.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit the context manager and close the connection.""" + self.close() + return False + + # --- Connection lifecycle --- + def close(self): """Close the underlying Valkey client connection. @@ -117,19 +147,20 @@ def _create_client(self) -> GlideClient: A connected GlideClient instance (synchronous). 
""" addresses = [NodeAddress(host=settings.VALKEY_HOST, port=settings.VALKEY_PORT)] + timeout = settings.VALKEY_REQUEST_TIMEOUT if settings.VALKEY_PASSWORD is not None and settings.VALKEY_PASSWORD != "": config = GlideClientConfiguration( addresses=addresses, use_tls=settings.VALKEY_USE_TLS, credentials=ServerCredentials(password=settings.VALKEY_PASSWORD), - request_timeout=5000, + request_timeout=timeout, ) else: config = GlideClientConfiguration( addresses=addresses, use_tls=settings.VALKEY_USE_TLS, - request_timeout=5000, + request_timeout=timeout, ) return GlideClient.create(config) @@ -187,12 +218,11 @@ def _ensure_index_exists(self): try: ft.create(self._client, self._index_name, schema, options) logger.info(f"Created Valkey search index '{self._index_name}'") - except Exception as e: + except RequestError as e: error_msg = str(e).lower() if "already exists" in error_msg or "index already" in error_msg: logger.debug(f"Valkey index '{self._index_name}' already exists") else: - logger.error(f"Error creating Valkey index: {e}") raise @staticmethod @@ -293,16 +323,63 @@ def _doc_key(self, doc_id: str) -> str: """ return f"{self._prefix}{doc_id}" + # --- Shared pagination helper --- + + def _paginated_search( + self, query: str, return_fields: List[ReturnField] + ) -> Generator[Tuple[str, Dict[str, Any]], None, None]: + """Yield (key_str, field_dict) tuples across all pages. + + Handles pagination with a safety limit of _MAX_SCAN_PAGES iterations + to prevent infinite loops from concurrent inserts. + + Args: + query: The ft.search query string. + return_fields: Fields to return from each document. + + Yields: + Tuples of (key_string, field_dictionary) for each matched document. 
+ """ + offset = 0 + for _ in range(_MAX_SCAN_PAGES): + results = ft.search( + self._client, + self._index_name, + query, + FtSearchOptions( + limit=FtSearchLimit(offset, _SCAN_PAGE_SIZE), + return_fields=return_fields, + ), + ) + + if not results or len(results) < 2: + break + + page_count = 0 + for entry in results[1:]: + if isinstance(entry, dict): + for key, fields in entry.items(): + key_str = key.decode("utf-8") if isinstance(key, bytes) else str(key) + yield key_str, fields + page_count += 1 + + if page_count < _SCAN_PAGE_SIZE: + break + offset += _SCAN_PAGE_SIZE + + # --- Public interface --- + def search(self, question: str, k: int = 2, *args, **kwargs) -> List[Document]: """Search for similar documents using vector similarity. Args: question: The query text to search for. - k: Number of results to return. + k: Number of results to return (capped at 100). Returns: A list of Document objects sorted by similarity. """ + k = max(1, min(k, _MAX_SEARCH_K)) query_vector = self._embedding.embed_query(question) vector_bytes = struct.pack(f"{len(query_vector)}f", *query_vector) @@ -450,66 +527,29 @@ def add_texts( return doc_ids - def _paginated_source_scan(self) -> List[str]: - """Scan all keys matching this source_id, handling pagination. - - Uses a minimal return field to avoid fetching full document content — - only the key names are needed for deletion. - - Returns: - List of key strings for all documents with this source_id. 
- """ - all_keys: List[str] = [] - offset = 0 - escaped_source = self._escape_tag_value(self._source_id) - query = f"@source_id:{{{escaped_source}}}" - - while True: - results = ft.search( - self._client, - self._index_name, - query, - FtSearchOptions( - limit=FtSearchLimit(offset, _SCAN_PAGE_SIZE), - return_fields=[ReturnField("source_id")], - ), - ) - - if not results or len(results) < 2: - break - - page_keys: List[str] = [] - for entry in results[1:]: - if isinstance(entry, dict): - for key in entry.keys(): - key_str = key.decode("utf-8") if isinstance(key, bytes) else str(key) - page_keys.append(key_str) - - all_keys.extend(page_keys) - - # If we got fewer results than page size, we've reached the end - if len(page_keys) < _SCAN_PAGE_SIZE: - break - offset += _SCAN_PAGE_SIZE - - return all_keys - def delete_index(self, *args, **kwargs): """Delete all documents for this source_id. Searches for all documents with matching source_id and deletes them in batches. Handles sources with more than 10,000 documents via pagination. + + Raises: + RequestError: If the Valkey server returns an error. + ConnectionError: If the connection to Valkey is lost. + TimeoutError: If the operation exceeds the request timeout. 
""" - try: - keys = self._paginated_source_scan() + escaped_source = self._escape_tag_value(self._source_id) + query = f"@source_id:{{{escaped_source}}}" - # Batch deletes for efficiency - for i in range(0, len(keys), _DELETE_BATCH_SIZE): - batch = keys[i : i + _DELETE_BATCH_SIZE] - self._client.delete(batch) + keys = [ + key_str + for key_str, _ in self._paginated_search(query, [ReturnField("source_id")]) + ] - except (RequestError, GlideConnectionError, GlideTimeoutError) as e: - logger.error(f"Error deleting index from Valkey: {e}", exc_info=True) + # Batch deletes for efficiency + for i in range(0, len(keys), _DELETE_BATCH_SIZE): + batch = keys[i : i + _DELETE_BATCH_SIZE] + self._client.delete(batch) def save_local(self, *args, **kwargs): """No-op for Valkey — data is already persisted.""" @@ -526,47 +566,24 @@ def get_chunks(self) -> List[Dict[str, Any]]: query = f"@source_id:{{{escaped_source}}}" chunks: List[Dict[str, Any]] = [] - offset = 0 - - while True: - results = ft.search( - self._client, - self._index_name, - query, - FtSearchOptions( - limit=FtSearchLimit(offset, _SCAN_PAGE_SIZE), - return_fields=[ - ReturnField("content"), - ReturnField("source_id"), - ReturnField("metadata"), - ], - ), - ) - if not results or len(results) < 2: - break - - page_count = 0 - for entry in results[1:]: - if isinstance(entry, dict): - for key, fields in entry.items(): - key_str = ( - key.decode("utf-8") if isinstance(key, bytes) else str(key) - ) - doc_id = key_str.replace(self._prefix, "", 1) - field_dict = self._decode_fields(fields) - chunks.append( - { - "doc_id": doc_id, - "text": field_dict.get("content", ""), - "metadata": self._parse_metadata(field_dict), - } - ) - page_count += 1 - - if page_count < _SCAN_PAGE_SIZE: - break - offset += _SCAN_PAGE_SIZE + for key_str, fields in self._paginated_search( + query, + [ + ReturnField("content"), + ReturnField("source_id"), + ReturnField("metadata"), + ], + ): + doc_id = key_str.replace(self._prefix, "", 1) + field_dict 
= self._decode_fields(fields) + chunks.append( + { + "doc_id": doc_id, + "text": field_dict.get("content", ""), + "metadata": self._parse_metadata(field_dict), + } + ) return chunks diff --git a/application/vectorstore/vector_creator.py b/application/vectorstore/vector_creator.py index e1eab1bcb..830c1f30d 100644 --- a/application/vectorstore/vector_creator.py +++ b/application/vectorstore/vector_creator.py @@ -4,6 +4,10 @@ from application.vectorstore.mongodb import MongoDBVectorStore from application.vectorstore.qdrant import QdrantStore from application.vectorstore.pgvector import PGVectorStore + +# ValkeyStore uses a try/except ImportError guard around its glide_sync +# dependency so that this eager import does NOT break VectorCreator when +# valkey-glide-sync is not installed. Do not remove that guard. from application.vectorstore.valkey import ValkeyStore diff --git a/tests/vectorstore/test_valkey.py b/tests/vectorstore/test_valkey.py index 3241bf51d..593ccdee7 100644 --- a/tests/vectorstore/test_valkey.py +++ b/tests/vectorstore/test_valkey.py @@ -174,9 +174,27 @@ def test_search_passes_return_fields(self): options = call_args[0][3] assert hasattr(options, "return_fields") + def test_search_caps_k_to_max(self): + """Verify k is capped at _MAX_SEARCH_K to prevent memory exhaustion.""" + store, mock_client, _ = _make_store() -@pytest.mark.unit -class TestValkeyStoreAddTexts: + with patch("application.vectorstore.valkey.ft.search", return_value=[0]) as mock_ft: + store.search("query", k=500) + + call_args = mock_ft.call_args + query_str = call_args[0][2] + assert "KNN 100" in query_str + + def test_search_floors_k_to_one(self): + """Verify k is at least 1.""" + store, mock_client, _ = _make_store() + + with patch("application.vectorstore.valkey.ft.search", return_value=[0]) as mock_ft: + store.search("query", k=0) + + call_args = mock_ft.call_args + query_str = call_args[0][2] + assert "KNN 1" in query_str def test_add_texts_returns_ids(self): store, 
mock_client, mock_emb = _make_store() mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]] @@ -284,12 +302,12 @@ def test_delete_index_paginates_large_sets(self): ) assert total_deleted == _SCAN_PAGE_SIZE + 2 - def test_delete_index_handles_error(self): + def test_delete_index_raises_on_error(self): store, mock_client, _ = _make_store() with patch("application.vectorstore.valkey.ft.search", side_effect=RequestError("fail")): - # Should not raise - store.delete_index() + with pytest.raises(RequestError): + store.delete_index() @pytest.mark.unit @@ -605,6 +623,15 @@ def test_close_handles_exception(self): store.close() assert store._client is None + def test_context_manager_calls_close(self): + store, mock_client, _ = _make_store() + + store.__enter__() + store.__exit__(None, None, None) + + mock_client.close.assert_called_once() + assert store._client is None + @pytest.mark.unit class TestValkeyStoreEnsureIndex: @@ -676,8 +703,8 @@ def test_ensure_index_already_exists_is_silent(self): mock_client = MagicMock() mock_create_client.return_value = mock_client - # Simulate "Index already exists" error - mock_ft.create.side_effect = Exception("Index already exists") + # Simulate "Index already exists" error (must be RequestError) + mock_ft.create.side_effect = RequestError("Index already exists") from application.vectorstore.valkey import ValkeyStore diff --git a/tests/vectorstore/test_valkey_integration.py b/tests/vectorstore/test_valkey_integration.py index 0faceb6c7..236bc7e17 100644 --- a/tests/vectorstore/test_valkey_integration.py +++ b/tests/vectorstore/test_valkey_integration.py @@ -56,30 +56,38 @@ def embed_documents(self, texts: list) -> list: @pytest.fixture -def valkey_store(): - """Create a ValkeyStore with a unique source_id for test isolation.""" +def valkey_store(monkeypatch): + """Create a ValkeyStore with a unique source_id for test isolation. 
+ + Uses environment variables to override only test-specific settings (index + name, prefix, connection target). This avoids fully mocking settings, so + new settings never silently break the fixture. + """ from unittest.mock import patch source_id = f"test-{uuid.uuid4().hex[:8]}" index_name = f"test_idx_{uuid.uuid4().hex[:8]}" - with patch( - "application.vectorstore.base.BaseVectorStore._get_embeddings" - ) as mock_get_emb, patch( - "application.vectorstore.valkey.settings" - ) as mock_settings: - mock_get_emb.return_value = FakeEmbeddings() - mock_settings.EMBEDDINGS_NAME = "test_model" - mock_settings.VALKEY_HOST = VALKEY_HOST - mock_settings.VALKEY_PORT = VALKEY_PORT - mock_settings.VALKEY_PASSWORD = None - mock_settings.VALKEY_USE_TLS = False - mock_settings.VALKEY_INDEX_NAME = index_name - mock_settings.VALKEY_PREFIX = f"test:{source_id}:" - mock_settings.VALKEY_DISTANCE_METRIC = "cosine" - mock_settings.VALKEY_VECTOR_TYPE = "float32" - mock_settings.VALKEY_VECTOR_ALGORITHM = "hnsw" + # Override settings via env vars before importing the store + monkeypatch.setenv("VALKEY_HOST", VALKEY_HOST) + monkeypatch.setenv("VALKEY_PORT", str(VALKEY_PORT)) + monkeypatch.setenv("VALKEY_PASSWORD", "") + monkeypatch.setenv("VALKEY_USE_TLS", "False") + monkeypatch.setenv("VALKEY_INDEX_NAME", index_name) + monkeypatch.setenv("VALKEY_PREFIX", f"test:{source_id}:") + + # Reload settings with test env vars + from application.core.settings import Settings + test_settings = Settings() + + with patch( + "application.vectorstore.base.BaseVectorStore._get_embeddings", + return_value=FakeEmbeddings(), + ), patch( + "application.vectorstore.valkey.settings", + test_settings, + ): from application.vectorstore.valkey import ValkeyStore store = ValkeyStore(source_id=source_id, embeddings_key="test") From 27b6f5b441fa141d28476798b1de28889de742d3 Mon Sep 17 00:00:00 2001 From: Daria Korenieva Date: Fri, 29 May 2026 10:10:52 -0700 Subject: [PATCH 8/8] chore: switch to 
valkey/valkey-bundle:latest (includes search module) Signed-off-by: Daria Korenieva --- .../docsgpt/01-getting-started.md | 124 ++++++++++++ .../docsgpt/02-ingestion-and-retrieval.md | 191 ++++++++++++++++++ tests/vectorstore/test_valkey_integration.py | 2 +- 3 files changed, 316 insertions(+), 1 deletion(-) create mode 100644 cookbooks/framework-integrations/docsgpt/01-getting-started.md create mode 100644 cookbooks/framework-integrations/docsgpt/02-ingestion-and-retrieval.md diff --git a/cookbooks/framework-integrations/docsgpt/01-getting-started.md b/cookbooks/framework-integrations/docsgpt/01-getting-started.md new file mode 100644 index 000000000..a80b4c887 --- /dev/null +++ b/cookbooks/framework-integrations/docsgpt/01-getting-started.md @@ -0,0 +1,124 @@ +# Getting Started with DocsGPT + Valkey + +**Beginner** · Python · ~15 min + +## What is DocsGPT + Valkey? + +[DocsGPT](https://github.com/arc53/DocsGPT) is an open-source AI assistant platform that lets you build intelligent agents with document retrieval (RAG). It supports pluggable vector store backends — FAISS, PostgreSQL, Elasticsearch, Qdrant, Milvus, and now **Valkey**. 
+ +Using Valkey as the vector store gives you: + +* **Sub-millisecond vector search** — HNSW indexing with cosine similarity +* **Source isolation** — each document source gets its own filtered namespace +* **Simple deployment** — single Valkey instance handles both vector search and caching +* **No additional database** — if you already run Valkey for caching or sessions, reuse it for vectors + +## Prerequisites + +* Docker or Podman installed +* Python 3.10+ +* Git + +## Step 1: Start Valkey with Search Module + +```bash +docker run -d --name valkey \ + -p 6379:6379 \ + valkey/valkey-bundle:latest +``` + +Verify the search module is loaded: + +```bash +docker exec valkey valkey-cli MODULE LIST +# Should show "search" in the output +``` + +## Step 2: Clone and Configure DocsGPT + +```bash +git clone https://github.com/arc53/DocsGPT.git +cd DocsGPT +``` + +Create your `.env` file: + +```bash +cp .env-template .env +``` + +Edit `.env` and set: + +```env +VECTOR_STORE=valkey +VALKEY_HOST=localhost +VALKEY_PORT=6379 +``` + +## Step 3: Install Dependencies + +```bash +python -m venv .venv +source .venv/bin/activate +pip install -r application/requirements.txt +``` + +This installs `valkey-glide-sync` which provides the synchronous GLIDE client for Valkey. + +## Step 4: Verify the Connection + +```python +from glide_sync import GlideClient, GlideClientConfiguration, NodeAddress + +config = GlideClientConfiguration( + addresses=[NodeAddress(host="localhost", port=6379)] +) +client = GlideClient.create(config) +print(client.ping()) # b'PONG' +``` + +## Step 5: Run DocsGPT + +Start the backend: + +```bash +flask --app application/app.py run --host=0.0.0.0 --port=7091 +``` + +DocsGPT will automatically create the Valkey search index on first use. You can now ingest documents through the UI or API — they'll be stored as vector embeddings in Valkey. + +## How It Works Under the Hood + +When you ingest a document, DocsGPT: + +1. **Chunks** the document into passages +2. 
**Embeds** each chunk using the configured embedding model +3. **Stores** each chunk as a Valkey HASH with fields: `content`, `source_id`, `metadata`, `embedding` +4. **Indexes** the embeddings with an HNSW vector index via `FT.CREATE` + +When you ask a question: + +1. The query is embedded into a vector +2. `FT.SEARCH` performs KNN search filtered by `source_id` +3. Top-k results are returned as context for the LLM + +| Operation | Valkey Command | What It Does | +|-----------|---------------|--------------| +| Create index | `FT.CREATE docsgpt ON HASH PREFIX 1 doc: SCHEMA ...` | One-time index setup | +| Store chunk | `HSET doc:{uuid} content "..." source_id "..." embedding <bytes>` | Store document with vector | +| Search | `FT.SEARCH docsgpt @source_id:{id} =>[KNN k @embedding $BLOB]` | Vector similarity search | +| Delete source | `FT.SEARCH` + `DEL` per key | Remove all chunks for a source | + +## Configuration Reference + +| Environment Variable | Default | Description | +|---------------------|---------|-------------| +| `VECTOR_STORE` | `faiss` | Set to `valkey` to use Valkey | +| `VALKEY_HOST` | `localhost` | Valkey server hostname | +| `VALKEY_PORT` | `6379` | Valkey server port | +| `VALKEY_PASSWORD` | (none) | Password for authentication | +| `VALKEY_USE_TLS` | `false` | Enable TLS connections | +| `VALKEY_INDEX_NAME` | `docsgpt` | Name of the search index | +| `VALKEY_PREFIX` | `doc:` | Key prefix for document hashes | + +[Next: 02 Document Ingestion & Retrieval →](02-ingestion-and-retrieval.md) diff --git a/cookbooks/framework-integrations/docsgpt/02-ingestion-and-retrieval.md b/cookbooks/framework-integrations/docsgpt/02-ingestion-and-retrieval.md new file mode 100644 index 000000000..3efab4cc0 --- /dev/null +++ b/cookbooks/framework-integrations/docsgpt/02-ingestion-and-retrieval.md @@ -0,0 +1,191 @@ +# Document Ingestion & Retrieval + +**Intermediate** · Python · ~20 min + +## Overview + +This cookbook walks through the DocsGPT + Valkey vector store 
internals — how documents are chunked, embedded, stored in Valkey as HASH objects with HNSW-indexed vectors, and retrieved via filtered KNN search. You'll use the `ValkeyStore` class directly to understand each step. + +## Prerequisites + +* Completed [01 - Getting Started](01-getting-started.md) +* Valkey running with the search module loaded +* DocsGPT dependencies installed + +## Step 1: Create a ValkeyStore Instance + +```python +import os +os.environ["VECTOR_STORE"] = "valkey" +os.environ["VALKEY_HOST"] = "localhost" +os.environ["VALKEY_PORT"] = "6379" + +from application.vectorstore.valkey import ValkeyStore + +# Each source_id isolates a set of documents +store = ValkeyStore(source_id="my-docs", embeddings_key="embeddings") +``` + +On creation, `ValkeyStore`: +1. Connects to Valkey using the synchronous GLIDE client +2. Creates an HNSW index (if it doesn't exist) with schema: + - `content` — TEXT field (full-text searchable) + - `source_id` — TAG field (exact match filtering) + - `embedding` — VECTOR field (HNSW, cosine distance, FLOAT32) + +## Step 2: Ingest Documents + +### Bulk ingestion with `add_texts` + +```python +texts = [ + "Valkey is a high-performance in-memory data store.", + "Vector search uses HNSW algorithm for approximate nearest neighbors.", + "DocsGPT supports multiple vector store backends including Valkey.", + "The GLIDE client provides both async and sync interfaces for Valkey.", +] + +metadatas = [ + {"source": "valkey-docs.pdf", "page": 1}, + {"source": "valkey-docs.pdf", "page": 5}, + {"source": "docsgpt-readme.md", "page": 1}, + {"source": "glide-docs.md", "page": 1}, +] + +doc_ids = store.add_texts(texts, metadatas) +print(f"Ingested {len(doc_ids)} documents: {doc_ids}") +``` + +Each document is stored as a Valkey HASH: + +``` +HSET doc:<uuid> content "Valkey is a high..." 
source_id "my-docs" metadata '{"source":"valkey-docs.pdf","page":1}' embedding <768 floats as bytes> +``` + +### Single chunk with `add_chunk` + +```python +chunk_id = store.add_chunk( + text="Valkey Search supports KNN queries with pre-filtering.", + metadata={"source": "search-guide.md", "section": "queries"}, +) +print(f"Added chunk: {chunk_id}") +``` + +## Step 3: Search by Semantic Similarity + +```python +results = store.search("How does vector search work?", k=3) + +for doc in results: + print(f"Content: {doc.page_content[:80]}...") + print(f"Metadata: {doc.metadata}") + print() +``` + +Under the hood, this: +1. Embeds the query text into a vector +2. Executes: `FT.SEARCH docsgpt "@source_id:{my-docs} =>[KNN 3 @embedding $BLOB AS score]"` +3. Returns documents ranked by cosine similarity + +### Source Isolation + +Each `source_id` is a TAG field filter. Multiple document sources share the same index but never mix in search results: + +```python +# Each store searches only within its own source_id +store_a = ValkeyStore(source_id="project-a") +store_b = ValkeyStore(source_id="project-b") + +# These searches are completely isolated +results_a = store_a.search("deployment guide") +results_b = store_b.search("deployment guide") +``` + +## Step 4: Manage Chunks + +### List all chunks for a source + +```python +chunks = store.get_chunks() +print(f"Total chunks: {len(chunks)}") +for chunk in chunks: + print(f" ID: {chunk['doc_id']}, Text: {chunk['text'][:50]}...") +``` + +### Delete a specific chunk + +```python +success = store.delete_chunk(doc_ids[0]) +print(f"Deleted: {success}") # True +``` + +### Delete all chunks for a source + +```python +store.delete_index() +# All documents with source_id="my-docs" are removed +``` + +## Step 5: Production Deployment with Docker Compose + +```yaml +services: + valkey: + image: valkey/valkey-bundle:latest + ports: + - "6379:6379" + volumes: + - valkey_data:/data + + docsgpt-backend: + build: ./application + environment: + - 
VECTOR_STORE=valkey + - VALKEY_HOST=valkey + - VALKEY_PORT=6379 + depends_on: + - valkey + +volumes: + valkey_data: +``` + +## Performance Characteristics + +| Operation | Complexity | Typical Latency | +|-----------|-----------|----------------| +| `add_texts` (per doc) | O(log n) HNSW insert | ~1-2ms | +| `search` (KNN) | O(log n) HNSW search | <1ms | +| `delete_chunk` | O(1) key delete | <0.1ms | +| `get_chunks` | O(n) for source | ~1ms per 100 docs | +| `delete_index` | O(n) search + delete | Depends on doc count | + +## Index Configuration + +The default HNSW parameters work well for most use cases. The index is created once and backfilled automatically: + +```python +VectorField( + name="embedding", + algorithm=VectorAlgorithm.HNSW, + attributes=VectorFieldAttributesHnsw( + dimensions=768, # Matches embedding model output + distance_metric=DistanceMetricType.COSINE, + type=VectorType.FLOAT32, + ), +) +``` + +For larger datasets (>1M vectors), consider tuning HNSW parameters like `number_of_edges` and `vectors_examined_on_construction` for your recall/latency tradeoff. + +## Troubleshooting + +| Issue | Cause | Fix | +|-------|-------|-----| +| `ImportError: valkey-glide-sync` | Package not installed | `pip install valkey-glide-sync` | +| `Connection refused` | Valkey not running | Start Valkey container | +| `unknown command FT.CREATE` | Search module not loaded | Use the `valkey/valkey-bundle` image, or start Valkey with `--loadmodule` pointing at the search module | +| Empty search results | Wrong `source_id` | Verify source_id matches what was used during ingestion | + +[← Back: 01 Getting Started](01-getting-started.md) diff --git a/tests/vectorstore/test_valkey_integration.py b/tests/vectorstore/test_valkey_integration.py index 236bc7e17..736605ec0 100644 --- a/tests/vectorstore/test_valkey_integration.py +++ b/tests/vectorstore/test_valkey_integration.py @@ -1,7 +1,7 @@ """Integration tests for Valkey vector store. These tests require a running Valkey instance with the valkey-search module loaded. 
-Run with: podman run -d --name valkey-test -p 6379:6379 valkey/valkey:8.1 --loadmodule /usr/lib/valkey/modules/valkeysearch.so +Run with: podman run -d --name valkey-test -p 6379:6379 valkey/valkey-bundle:latest Skip these tests when Valkey is not available by running: pytest -m "not integration"