diff --git a/.env-template b/.env-template index df9855bb2..68142336b 100644 --- a/.env-template +++ b/.env-template @@ -37,3 +37,14 @@ MICROSOFT_AUTHORITY=https://{tenantId}.ciamlogin.com/{tenantId} # POSTGRES_URI=postgresql://docsgpt:docsgpt@localhost:5432/docsgpt + +# Valkey Vector Store (set VECTOR_STORE=valkey to use) +# VALKEY_HOST=localhost +# VALKEY_PORT=6379 +# VALKEY_PASSWORD= +# VALKEY_USE_TLS=false +# VALKEY_INDEX_NAME=docsgpt +# VALKEY_PREFIX=doc: +# VALKEY_DISTANCE_METRIC=cosine # cosine, l2, or ip +# VALKEY_VECTOR_TYPE=float32 # float32 (only option in valkey-glide-sync 2.x) +# VALKEY_VECTOR_ALGORITHM=hnsw # hnsw or flat diff --git a/application/core/settings.py b/application/core/settings.py index 0dee0db6e..a719e9aa8 100644 --- a/application/core/settings.py +++ b/application/core/settings.py @@ -69,7 +69,7 @@ class Settings(BaseSettings): # Pages docling's threaded pipeline buffers in flight; the library # default (100) drives worker RSS to ~3 GB on a mid-size PDF. DOCLING_PIPELINE_QUEUE_MAX_SIZE: int = 2 - VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus" or "lancedb" or "pgvector" + VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus" or "lancedb" or "pgvector" or "valkey" RETRIEVERS_ENABLED: list = ["classic_rag"] AGENT_NAME: str = "classic" FALLBACK_LLM_PROVIDER: Optional[str] = None # provider for fallback llm @@ -165,6 +165,18 @@ class Settings(BaseSettings): LANCEDB_PATH: str = "./data/lancedb" # Path where LanceDB stores its local data LANCEDB_TABLE_NAME: Optional[str] = "docsgpts" # Name of the table to use for storing vectors + # Valkey vectorstore config + VALKEY_HOST: str = "localhost" + VALKEY_PORT: int = 6379 + VALKEY_PASSWORD: Optional[str] = None + VALKEY_USE_TLS: bool = False + VALKEY_INDEX_NAME: str = "docsgpt" + VALKEY_PREFIX: str = "doc:" + VALKEY_REQUEST_TIMEOUT: int = 5000 # milliseconds + VALKEY_DISTANCE_METRIC: str = "cosine" # "cosine", "l2", or "ip" + 
VALKEY_VECTOR_TYPE: str = "float32" # "float32" (only option in valkey-glide-sync 2.x) + VALKEY_VECTOR_ALGORITHM: str = "hnsw" # "hnsw" or "flat" + FLASK_DEBUG_MODE: bool = False STORAGE_TYPE: str = "local" # local or s3 diff --git a/application/requirements.txt b/application/requirements.txt index 5414df037..6931a9190 100644 --- a/application/requirements.txt +++ b/application/requirements.txt @@ -103,6 +103,7 @@ tzdata==2026.1 urllib3==2.6.3 uvicorn[standard]>=0.30,<1 uvicorn-worker>=0.4,<1 +valkey-glide-sync==2.3.1 vine==5.1.0 wcwidth==0.6.0 werkzeug>=3.1.0 diff --git a/application/vectorstore/valkey.py b/application/vectorstore/valkey.py new file mode 100644 index 000000000..55e7382fa --- /dev/null +++ b/application/vectorstore/valkey.py @@ -0,0 +1,645 @@ +"""Valkey vector store implementation using valkey-glide-sync and valkey-search module. + +NOTE: The try/except ImportError guard around the glide_sync import below is +**required** by ``application/vectorstore/vector_creator.py`` which eagerly +imports all vector store modules at module level. Without this guard, a missing +``valkey-glide-sync`` package would break VectorCreator for *all* backends. 
class ValkeyStore(BaseVectorStore):
    """Vector store backed by Valkey with the valkey-search module.

    Each chunk is stored as a HASH with the fields ``content``,
    ``source_id``, ``metadata`` (JSON) and ``embedding`` (packed float32)
    under the ``settings.VALKEY_PREFIX`` key prefix. A single search index
    (``settings.VALKEY_INDEX_NAME``) is created with FT.CREATE and shared by
    all sources; every query is restricted to one source through the
    ``source_id`` TAG field.

    Requires a Valkey server with the valkey-search module loaded.

    NOTE: annotations that name ``glide_sync`` types are written as strings
    on purpose. Signature annotations are evaluated when the class body
    runs, so bare names would raise ``NameError`` at import time whenever
    ``valkey-glide-sync`` is not installed, defeating the import guard at
    the top of this module.

    Supports use as a context manager for deterministic connection cleanup::

        with ValkeyStore(source_id="my-source") as store:
            store.search("query")
    """

    def __init__(
        self,
        source_id: str = "",
        embeddings_key: str = "embeddings",
    ):
        """Initialize ValkeyStore.

        Args:
            source_id: Identifier for the document source, used to
                namespace and filter documents.
            embeddings_key: Key name or API key for the embeddings provider.

        Raises:
            ImportError: If valkey-glide-sync is not installed.
        """
        # Assigned first so close()/__del__ stay safe even when construction
        # aborts part-way (for example on the ImportError below).
        self._client = None

        if not _GLIDE_AVAILABLE:
            raise ImportError(
                "Could not import valkey-glide-sync. "
                "Please install with `pip install valkey-glide-sync`."
            )
        super().__init__()
        self._source_id = str(source_id).replace("application/indexes/", "").rstrip("/")
        self._embedding = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
        self._index_name = settings.VALKEY_INDEX_NAME
        self._prefix = settings.VALKEY_PREFIX

        self._client = self._create_client()
        try:
            self._ensure_index_exists()
        except Exception:
            # Do not leave an open connection behind a half-built store.
            self.close()
            raise

    # --- Context manager support ---

    def __enter__(self):
        """Enter the context manager and return the store itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection on exit; exceptions are never suppressed."""
        self.close()
        return False

    # --- Connection lifecycle ---

    def close(self):
        """Close the underlying Valkey client connection.

        Safe to call repeatedly and on partially constructed instances.
        Errors raised while closing are logged at debug level and ignored,
        because this is also invoked from ``__del__``.
        """
        client = getattr(self, "_client", None)
        if client is None:
            return
        self._client = None
        try:
            client.close()
        except Exception as e:
            logger.debug(f"Error closing Valkey client: {e}")

    def __del__(self):
        """Best-effort cleanup on garbage collection."""
        self.close()

    def _create_client(self) -> "GlideClient":
        """Create and return a synchronous Valkey GLIDE client.

        Connection parameters come from the ``VALKEY_*`` settings.
        Credentials are only sent when ``VALKEY_PASSWORD`` is non-empty.

        Returns:
            A connected GlideClient instance (synchronous).
        """
        config_kwargs: Dict[str, Any] = {
            "addresses": [
                NodeAddress(host=settings.VALKEY_HOST, port=settings.VALKEY_PORT)
            ],
            "use_tls": settings.VALKEY_USE_TLS,
            "request_timeout": settings.VALKEY_REQUEST_TIMEOUT,
        }
        if settings.VALKEY_PASSWORD:
            config_kwargs["credentials"] = ServerCredentials(
                password=settings.VALKEY_PASSWORD
            )

        return GlideClient.create(GlideClientConfiguration(**config_kwargs))

    def _ensure_index_exists(self):
        """Create the search index if it does not already exist.

        Uses VALKEY_DISTANCE_METRIC, VALKEY_VECTOR_TYPE, and
        VALKEY_VECTOR_ALGORITHM from settings, falling back to
        cosine/float32/hnsw when a value is unrecognized. The vector
        dimension is read from the embedding object's ``dimension``
        attribute, or measured by embedding a probe string when absent.

        Raises:
            RequestError: If index creation fails for any reason other than
                the index already existing.
        """
        embedding_dim = getattr(self._embedding, "dimension", None)
        if embedding_dim is None:
            embedding_dim = len(self._embedding.embed_query("dimension probe"))

        distance_metric = self._resolve_distance_metric(settings.VALKEY_DISTANCE_METRIC)
        vector_type = self._resolve_vector_type(settings.VALKEY_VECTOR_TYPE)
        algorithm = self._resolve_vector_algorithm(settings.VALKEY_VECTOR_ALGORITHM)

        logger.info(
            f"Valkey index config: algorithm={algorithm.name}, "
            f"distance_metric={distance_metric.name}, vector_type={vector_type.name}, "
            f"dimensions={embedding_dim}"
        )

        # HNSW and FLAT take the same three attributes here; only the
        # attribute class differs.
        if algorithm == VectorAlgorithm.HNSW:
            attributes_cls = VectorFieldAttributesHnsw
        else:
            algorithm = VectorAlgorithm.FLAT
            attributes_cls = VectorFieldAttributesFlat

        vector_field = VectorField(
            name="embedding",
            algorithm=algorithm,
            attributes=attributes_cls(
                dimensions=embedding_dim,
                distance_metric=distance_metric,
                type=vector_type,
            ),
        )

        schema: List[Field] = [
            TextField("content"),
            TagField("source_id"),
            vector_field,
        ]

        options = FtCreateOptions(data_type=DataType.HASH, prefixes=[self._prefix])

        try:
            ft.create(self._client, self._index_name, schema, options)
            logger.info(f"Created Valkey search index '{self._index_name}'")
        except RequestError as e:
            error_msg = str(e).lower()
            if "already exists" in error_msg or "index already" in error_msg:
                logger.debug(f"Valkey index '{self._index_name}' already exists")
            else:
                raise

    @staticmethod
    def _resolve_option(value: Any, mapping: Dict[str, Any], setting_name: str, default: str):
        """Map a settings string to an enum member, warning on unknown input.

        Args:
            value: Raw settings value; matched case-insensitively after
                stripping whitespace. Non-string values are stringified.
            mapping: Accepted lowercase names mapped to enum members.
            setting_name: Settings key, used only in the warning text.
            default: Key of ``mapping`` to fall back to.

        Returns:
            The matching member of ``mapping``, or ``mapping[default]``.
        """
        resolved = mapping.get(str(value).lower().strip())
        if resolved is None:
            logger.warning(
                f"Unrecognized {setting_name}='{value}', "
                f"falling back to '{default}'. Valid options: {', '.join(mapping)}"
            )
            return mapping[default]
        return resolved

    @staticmethod
    def _resolve_distance_metric(value: str) -> "DistanceMetricType":
        """Resolve distance metric string to enum, defaulting to COSINE.

        Args:
            value: One of "cosine", "l2", or "ip".

        Returns:
            The corresponding DistanceMetricType enum value.
        """
        return ValkeyStore._resolve_option(
            value,
            {
                "cosine": DistanceMetricType.COSINE,
                "l2": DistanceMetricType.L2,
                "ip": DistanceMetricType.IP,
            },
            "VALKEY_DISTANCE_METRIC",
            "cosine",
        )

    @staticmethod
    def _resolve_vector_type(value: str) -> "VectorType":
        """Resolve vector type string to enum, defaulting to FLOAT32.

        Args:
            value: Currently only "float32" is supported by valkey-glide-sync.

        Returns:
            The corresponding VectorType enum value.
        """
        return ValkeyStore._resolve_option(
            value,
            {"float32": VectorType.FLOAT32},
            "VALKEY_VECTOR_TYPE",
            "float32",
        )

    @staticmethod
    def _resolve_vector_algorithm(value: str) -> "VectorAlgorithm":
        """Resolve vector algorithm string to enum, defaulting to HNSW.

        Args:
            value: One of "hnsw" or "flat".

        Returns:
            The corresponding VectorAlgorithm enum value.
        """
        return ValkeyStore._resolve_option(
            value,
            {
                "hnsw": VectorAlgorithm.HNSW,
                "flat": VectorAlgorithm.FLAT,
            },
            "VALKEY_VECTOR_ALGORITHM",
            "hnsw",
        )

    @staticmethod
    def _escape_tag_value(value: str) -> str:
        """Escape special characters for Valkey tag field queries.

        Args:
            value: The raw tag value to escape.

        Returns:
            The escaped string safe for use in @field:{...} queries.
        """
        return "".join(
            f"\\{ch}" if ch in _TAG_SPECIAL_CHARS else ch for ch in value
        )

    @staticmethod
    def _pack_vector(vector) -> bytes:
        """Serialize an embedding as a packed float32 blob.

        The byte order is pinned to little-endian ("<") rather than the
        host's native order, so the stored blob and the query blob have the
        same layout regardless of the machine running this code.

        Args:
            vector: Sequence of numbers making up one embedding.

        Returns:
            ``len(vector) * 4`` bytes of little-endian float32 values.
        """
        return struct.pack(f"<{len(vector)}f", *vector)

    def _doc_key(self, doc_id: str) -> str:
        """Generate a hash key for a document.

        Args:
            doc_id: The unique document identifier.

        Returns:
            The full key including the prefix.
        """
        return f"{self._prefix}{doc_id}"

    # --- Shared pagination helper ---

    def _paginated_search(
        self, query: str, return_fields: "List[ReturnField]"
    ) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
        """Yield (key_str, field_dict) tuples across all pages.

        Pages of _SCAN_PAGE_SIZE are fetched until a short page is seen.
        At most _MAX_SCAN_PAGES pages are read so that concurrent inserts
        cannot keep the loop alive forever; hitting that cap is logged
        because the result set is then incomplete.

        Args:
            query: The ft.search query string.
            return_fields: Fields to return from each document.

        Yields:
            Tuples of (key_string, field_dictionary) for each matched document.
        """
        for page in range(_MAX_SCAN_PAGES):
            results = ft.search(
                self._client,
                self._index_name,
                query,
                FtSearchOptions(
                    limit=FtSearchLimit(page * _SCAN_PAGE_SIZE, _SCAN_PAGE_SIZE),
                    return_fields=return_fields,
                ),
            )

            if not results or len(results) < 2:
                return

            page_count = 0
            # results[0] is the total count; the rest map key -> fields.
            for entry in results[1:]:
                if not isinstance(entry, dict):
                    continue
                for key, fields in entry.items():
                    key_str = key.decode("utf-8") if isinstance(key, bytes) else str(key)
                    yield key_str, fields
                    page_count += 1

            if page_count < _SCAN_PAGE_SIZE:
                return

        logger.warning(
            f"Stopped scanning Valkey index '{self._index_name}' after "
            f"{_MAX_SCAN_PAGES} pages; results may be incomplete"
        )

    # --- Public interface ---

    def search(self, question: str, k: int = 2, *args, **kwargs) -> List[Document]:
        """Search for similar documents using vector similarity.

        Args:
            question: The query text to search for.
            k: Number of results to return (clamped to 1.._MAX_SEARCH_K).

        Returns:
            A list of Document objects for this source, or an empty list if
            the Valkey request fails (the error is logged).
        """
        k = max(1, min(k, _MAX_SEARCH_K))
        query_vector = self._embedding.embed_query(question)
        vector_bytes = self._pack_vector(query_vector)

        escaped_source = self._escape_tag_value(self._source_id)
        query = f"@source_id:{{{escaped_source}}} =>[KNN {k} @embedding $BLOB AS score]"

        try:
            results = ft.search(
                self._client,
                self._index_name,
                query,
                FtSearchOptions(
                    params={"BLOB": vector_bytes},
                    # Without an explicit LIMIT the server-side default page
                    # size applies, which would cap results below k.
                    limit=FtSearchLimit(0, k),
                    return_fields=[
                        ReturnField("content"),
                        ReturnField("source_id"),
                        ReturnField("metadata"),
                    ],
                ),
            )
            return self._parse_search_results(results)

        except (RequestError, GlideConnectionError, GlideTimeoutError) as e:
            logger.error(f"Error searching Valkey: {e}", exc_info=True)
            return []

    def _parse_search_results(self, results: list) -> List[Document]:
        """Parse ft.search response into Document objects.

        The response format is: [total_count, {key: {field: value}}, ...]

        Args:
            results: Raw response from ft.search.

        Returns:
            List of Document objects.
        """
        documents: List[Document] = []
        if not results or len(results) < 2:
            return documents

        # results[0] is the total count, results[1:] are mappings of {key: {fields}}
        for entry in results[1:]:
            if not isinstance(entry, dict):
                continue
            for fields in entry.values():
                field_dict = self._decode_fields(fields)
                documents.append(
                    Document(
                        page_content=field_dict.get("content", ""),
                        metadata=self._parse_metadata(field_dict),
                    )
                )

        return documents

    def _decode_fields(self, fields) -> Dict[str, Any]:
        """Decode bytes in field dict to strings, skipping binary fields.

        Args:
            fields: Dict with potentially bytes keys/values. Anything that
                is not a dict yields an empty result.

        Returns:
            A dictionary with string keys and values (binary fields excluded).
        """
        decoded: Dict[str, Any] = {}
        if not isinstance(fields, dict):
            return decoded

        for raw_key, raw_value in fields.items():
            key = raw_key.decode("utf-8") if isinstance(raw_key, bytes) else str(raw_key)
            if isinstance(raw_value, bytes):
                # Binary payloads (like embedding vectors) are not valid
                # UTF-8 and are dropped rather than returned as garbage.
                try:
                    value = raw_value.decode("utf-8")
                except UnicodeDecodeError:
                    continue
            else:
                value = raw_value if isinstance(raw_value, str) else str(raw_value)
            decoded[key] = value
        return decoded

    def _parse_metadata(self, field_dict: Dict[str, Any]) -> Dict[str, Any]:
        """Extract metadata from field dictionary.

        Args:
            field_dict: Parsed fields from a document hash.

        Returns:
            The decoded ``metadata`` JSON object, or an empty dict when the
            field is missing, malformed, or not a JSON object.
        """
        try:
            metadata = json.loads(field_dict.get("metadata", "{}"))
        except (json.JSONDecodeError, TypeError):
            return {}
        # Callers expect a mapping; a stored JSON list/scalar is not usable.
        return metadata if isinstance(metadata, dict) else {}

    def add_texts(
        self,
        texts: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        *args,
        **kwargs,
    ) -> List[str]:
        """Add texts with embeddings to the vector store.

        All writes are sent in one non-atomic pipeline to reduce network
        round trips.

        Args:
            texts: List of text strings to add.
            metadatas: Optional list of metadata dicts, one per text. When
                omitted or empty, every text gets empty metadata.

        Returns:
            List of document IDs that were added.

        Raises:
            ValueError: If ``metadatas`` or the generated embeddings do not
                line up one-to-one with ``texts``.
            Exception: If any write fails. Successfully written documents
                prior to the failure are not rolled back.
        """
        if not texts:
            return []

        embeddings = self._embedding.embed_documents(texts)
        if not metadatas:
            metadatas = [{} for _ in texts]

        # zip() would silently drop the unmatched tail, losing documents.
        if len(metadatas) != len(texts):
            raise ValueError(
                f"Got {len(metadatas)} metadatas for {len(texts)} texts"
            )
        if len(embeddings) != len(texts):
            raise ValueError(
                f"Got {len(embeddings)} embeddings for {len(texts)} texts"
            )

        doc_ids: List[str] = []

        # Use non-atomic Batch (pipeline) to reduce network round trips.
        batch = Batch(False)
        for text, embedding, metadata in zip(texts, embeddings, metadatas):
            doc_id = str(uuid.uuid4())
            batch.hset(
                self._doc_key(doc_id),
                {
                    "content": text,
                    "source_id": self._source_id,
                    "metadata": json.dumps(metadata),
                    "embedding": self._pack_vector(embedding),
                },
            )
            doc_ids.append(doc_id)

        try:
            self._client.exec(batch, raise_on_error=True)
        except (RequestError, GlideConnectionError, GlideTimeoutError) as e:
            logger.error(
                f"Error adding documents to Valkey via pipeline "
                f"({len(doc_ids)} documents): {e}"
            )
            raise

        return doc_ids

    def delete_index(self, *args, **kwargs):
        """Delete all documents for this source_id.

        Collects every matching key first and only then deletes them in
        batches of _DELETE_BATCH_SIZE: deleting while paging by offset
        would shift later pages and skip documents.

        Raises:
            RequestError: If the Valkey server returns an error.
            ConnectionError: If the connection to Valkey is lost.
            TimeoutError: If the operation exceeds the request timeout.
        """
        escaped_source = self._escape_tag_value(self._source_id)
        query = f"@source_id:{{{escaped_source}}}"

        keys = [
            key_str
            for key_str, _ in self._paginated_search(query, [ReturnField("source_id")])
        ]

        for start in range(0, len(keys), _DELETE_BATCH_SIZE):
            self._client.delete(keys[start : start + _DELETE_BATCH_SIZE])

    def save_local(self, *args, **kwargs):
        """No-op for Valkey — data is already persisted."""
        pass

    def get_chunks(self) -> List[Dict[str, Any]]:
        """Get all chunks for this source_id.

        Returns:
            List of chunk dicts with doc_id, text, and metadata, or an empty
            list if the Valkey request fails (the error is logged).
        """
        try:
            escaped_source = self._escape_tag_value(self._source_id)
            query = f"@source_id:{{{escaped_source}}}"
            return_fields = [
                ReturnField("content"),
                ReturnField("source_id"),
                ReturnField("metadata"),
            ]

            chunks: List[Dict[str, Any]] = []
            for key_str, fields in self._paginated_search(query, return_fields):
                # Strip the key prefix only when it really is a prefix.
                if key_str.startswith(self._prefix):
                    doc_id = key_str[len(self._prefix):]
                else:
                    doc_id = key_str
                field_dict = self._decode_fields(fields)
                chunks.append(
                    {
                        "doc_id": doc_id,
                        "text": field_dict.get("content", ""),
                        "metadata": self._parse_metadata(field_dict),
                    }
                )
            return chunks

        except (RequestError, GlideConnectionError, GlideTimeoutError) as e:
            logger.error(f"Error getting chunks from Valkey: {e}", exc_info=True)
            return []

    def add_chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> str:
        """Add a single chunk to the vector store.

        The stored metadata is a copy of ``metadata`` with ``source_id``
        set to this store's source; the caller's dict is not modified.

        Args:
            text: The text content of the chunk.
            metadata: Optional metadata dictionary.

        Returns:
            The generated document ID.

        Raises:
            ValueError: If no embedding could be generated for ``text``.
            RequestError: If the Valkey server rejects the write.
            ConnectionError: If the connection to Valkey is lost.
            TimeoutError: If the write exceeds the request timeout.
        """
        final_metadata = dict(metadata or {})
        final_metadata["source_id"] = self._source_id

        embeddings = self._embedding.embed_documents([text])
        if not embeddings:
            raise ValueError("Could not generate embedding for chunk")

        doc_id = str(uuid.uuid4())
        fields = {
            "content": text,
            "source_id": self._source_id,
            "metadata": json.dumps(final_metadata),
            "embedding": self._pack_vector(embeddings[0]),
        }

        try:
            self._client.hset(self._doc_key(doc_id), fields)
            return doc_id
        except (RequestError, GlideConnectionError, GlideTimeoutError) as e:
            logger.error(f"Error adding chunk to Valkey: {e}")
            raise

    def delete_chunk(self, chunk_id: str) -> bool:
        """Delete a specific chunk by its ID.

        Args:
            chunk_id: The document ID to delete.

        Returns:
            True if the chunk was deleted, False if no such key existed or
            the Valkey request failed (the error is logged).
        """
        try:
            deleted = self._client.delete([self._doc_key(chunk_id)])
            return deleted > 0
        except (RequestError, GlideConnectionError, GlideTimeoutError) as e:
            logger.error(f"Error deleting chunk from Valkey: {e}", exc_info=True)
            return False
+ +Using Valkey as the vector store gives you: + +* **Sub-millisecond vector search** — HNSW indexing with cosine similarity +* **Source isolation** — each document source gets its own filtered namespace +* **Simple deployment** — single Valkey instance handles both vector search and caching +* **No additional database** — if you already run Valkey for caching or sessions, reuse it for vectors + +## Prerequisites + +* Docker or Podman installed +* Python 3.10+ +* Git + +## Step 1: Start Valkey with Search Module + +```bash +docker run -d --name valkey \ + -p 6379:6379 \ + valkey/valkey-bundle:latest +``` + +Verify the search module is loaded: + +```bash +docker exec valkey valkey-cli MODULE LIST +# Should show "search" in the output +``` + +## Step 2: Clone and Configure DocsGPT + +```bash +git clone https://github.com/arc53/DocsGPT.git +cd DocsGPT +``` + +Create your `.env` file: + +```bash +cp .env-template .env +``` + +Edit `.env` and set: + +```env +VECTOR_STORE=valkey +VALKEY_HOST=localhost +VALKEY_PORT=6379 +``` + +## Step 3: Install Dependencies + +```bash +python -m venv .venv +source .venv/bin/activate +pip install -r application/requirements.txt +``` + +This installs `valkey-glide-sync` which provides the synchronous GLIDE client for Valkey. + +## Step 4: Verify the Connection + +```python +from glide_sync import GlideClient, GlideClientConfiguration, NodeAddress + +config = GlideClientConfiguration( + addresses=[NodeAddress(host="localhost", port=6379)] +) +client = GlideClient.create(config) +print(client.ping()) # b'PONG' +``` + +## Step 5: Run DocsGPT + +Start the backend: + +```bash +flask --app application/app.py run --host=0.0.0.0 --port=7091 +``` + +DocsGPT will automatically create the Valkey search index on first use. You can now ingest documents through the UI or API — they'll be stored as vector embeddings in Valkey. + +## How It Works Under the Hood + +When you ingest a document, DocsGPT: + +1. **Chunks** the document into passages +2. 
**Embeds** each chunk using the configured embedding model +3. **Stores** each chunk as a Valkey HASH with fields: `content`, `source_id`, `metadata`, `embedding` +4. **Indexes** the embeddings with an HNSW vector index via `FT.CREATE` + +When you ask a question: + +1. The query is embedded into a vector +2. `FT.SEARCH` performs KNN search filtered by `source_id` +3. Top-k results are returned as context for the LLM + +| Operation | Valkey Command | What It Does | +|-----------|---------------|--------------| +| Create index | `FT.CREATE docsgpt ON HASH PREFIX 1 doc: SCHEMA ...` | One-time index setup | +| Store chunk | `HSET doc:{uuid} content "..." source_id "..." embedding <vector bytes>` | Store document with vector | +| Search | `FT.SEARCH docsgpt @source_id:{id} =>[KNN k @embedding $BLOB]` | Vector similarity search | +| Delete source | `FT.SEARCH` + batched `DEL` | Remove all chunks for a source | + +## Configuration Reference + +| Environment Variable | Default | Description | +|---------------------|---------|-------------| +| `VECTOR_STORE` | `faiss` | Set to `valkey` to use Valkey | +| `VALKEY_HOST` | `localhost` | Valkey server hostname | +| `VALKEY_PORT` | `6379` | Valkey server port | +| `VALKEY_PASSWORD` | (none) | Password for authentication | +| `VALKEY_USE_TLS` | `false` | Enable TLS connections | +| `VALKEY_INDEX_NAME` | `docsgpt` | Name of the search index | +| `VALKEY_PREFIX` | `doc:` | Key prefix for document hashes | + +[Next: 02 Document Ingestion & Retrieval →](02-ingestion-and-retrieval.md) diff --git a/cookbooks/framework-integrations/docsgpt/02-ingestion-and-retrieval.md b/cookbooks/framework-integrations/docsgpt/02-ingestion-and-retrieval.md new file mode 100644 index 000000000..3efab4cc0 --- /dev/null +++ b/cookbooks/framework-integrations/docsgpt/02-ingestion-and-retrieval.md @@ -0,0 +1,191 @@ +# Document Ingestion & Retrieval + +**Intermediate** · Python · ~20 min + +## Overview + +This cookbook walks through the DocsGPT + Valkey vector store 
internals — how documents are chunked, embedded, stored in Valkey as HASH objects with HNSW-indexed vectors, and retrieved via filtered KNN search. You'll use the `ValkeyStore` class directly to understand each step. + +## Prerequisites + +* Completed [01 - Getting Started](01-getting-started.md) +* Valkey running with the search module loaded +* DocsGPT dependencies installed + +## Step 1: Create a ValkeyStore Instance + +```python +import os +os.environ["VECTOR_STORE"] = "valkey" +os.environ["VALKEY_HOST"] = "localhost" +os.environ["VALKEY_PORT"] = "6379" + +from application.vectorstore.valkey import ValkeyStore + +# Each source_id isolates a set of documents +store = ValkeyStore(source_id="my-docs", embeddings_key="embeddings") +``` + +On creation, `ValkeyStore`: +1. Connects to Valkey using the synchronous GLIDE client +2. Creates an HNSW index (if it doesn't exist) with schema: + - `content` — TEXT field (full-text searchable) + - `source_id` — TAG field (exact match filtering) + - `embedding` — VECTOR field (HNSW, cosine distance, FLOAT32) + +## Step 2: Ingest Documents + +### Bulk ingestion with `add_texts` + +```python +texts = [ + "Valkey is a high-performance in-memory data store.", + "Vector search uses HNSW algorithm for approximate nearest neighbors.", + "DocsGPT supports multiple vector store backends including Valkey.", + "The GLIDE client provides both async and sync interfaces for Valkey.", +] + +metadatas = [ + {"source": "valkey-docs.pdf", "page": 1}, + {"source": "valkey-docs.pdf", "page": 5}, + {"source": "docsgpt-readme.md", "page": 1}, + {"source": "glide-docs.md", "page": 1}, +] + +doc_ids = store.add_texts(texts, metadatas) +print(f"Ingested {len(doc_ids)} documents: {doc_ids}") +``` + +Each document is stored as a Valkey HASH: + +``` +HSET doc:<uuid> content "Valkey is a high..." 
source_id "my-docs" metadata '{"source":"valkey-docs.pdf","page":1}' embedding <768 floats as bytes> +``` + +### Single chunk with `add_chunk` + +```python +chunk_id = store.add_chunk( + text="Valkey Search supports KNN queries with pre-filtering.", + metadata={"source": "search-guide.md", "section": "queries"}, +) +print(f"Added chunk: {chunk_id}") +``` + +## Step 3: Search by Semantic Similarity + +```python +results = store.search("How does vector search work?", k=3) + +for doc in results: + print(f"Content: {doc.page_content[:80]}...") + print(f"Metadata: {doc.metadata}") + print() +``` + +Under the hood, this: +1. Embeds the query text into a vector +2. Executes: `FT.SEARCH docsgpt "@source_id:{my\-docs} =>[KNN 3 @embedding $BLOB AS score]"` (special characters in the `source_id`, such as `-`, are backslash-escaped) +3. Returns documents ranked by cosine similarity + +### Source Isolation + +Each `source_id` is a TAG field filter. Multiple document sources share the same index but never mix in search results: + +```python +# Each store only searches within its own source_id +store_a = ValkeyStore(source_id="project-a") +store_b = ValkeyStore(source_id="project-b") + +# These searches are completely isolated +results_a = store_a.search("deployment guide") +results_b = store_b.search("deployment guide") +``` + +## Step 4: Manage Chunks + +### List all chunks for a source + +```python +chunks = store.get_chunks() +print(f"Total chunks: {len(chunks)}") +for chunk in chunks: + print(f" ID: {chunk['doc_id']}, Text: {chunk['text'][:50]}...") +``` + +### Delete a specific chunk + +```python +success = store.delete_chunk(doc_ids[0]) +print(f"Deleted: {success}") # True +``` + +### Delete all chunks for a source + +```python +store.delete_index() +# All documents with source_id="my-docs" are removed +``` + +## Step 5: Production Deployment with Docker Compose + +```yaml +services: + valkey: + image: valkey/valkey-bundle:latest + ports: + - "6379:6379" + volumes: + - valkey_data:/data + + docsgpt-backend: + build: ./application + environment: + - 
VECTOR_STORE=valkey + - VALKEY_HOST=valkey + - VALKEY_PORT=6379 + depends_on: + - valkey + +volumes: + valkey_data: +``` + +## Performance Characteristics + +| Operation | Complexity | Typical Latency | +|-----------|-----------|----------------| +| `add_texts` (per doc) | O(log n) HNSW insert | ~1-2ms | +| `search` (KNN) | O(log n) HNSW search | <1ms | +| `delete_chunk` | O(1) key delete | <0.1ms | +| `get_chunks` | O(n) for source | ~1ms per 100 docs | +| `delete_index` | O(n) search + delete | Depends on doc count | + +## Index Configuration + +The default HNSW parameters work well for most use cases. The index is created once and backfilled automatically: + +```python +VectorField( + name="embedding", + algorithm=VectorAlgorithm.HNSW, + attributes=VectorFieldAttributesHnsw( + dimensions=768, # Matches embedding model output + distance_metric=DistanceMetricType.COSINE, + type=VectorType.FLOAT32, + ), +) +``` + +For larger datasets (>1M vectors), consider tuning HNSW parameters like `number_of_edges` and `vectors_examined_on_construction` for your recall/latency tradeoff. + +## Troubleshooting + +| Issue | Cause | Fix | +|-------|-------|-----| +| `ImportError: valkey-glide-sync` | Package not installed | `pip install valkey-glide-sync` | +| `Connection refused` | Valkey not running | Start Valkey container | +| `unknown command FT.CREATE` | Search module not loaded | Add `--loadmodule` flag to Valkey startup | +| Empty search results | Wrong `source_id` | Verify source_id matches what was used during ingestion | + +[← Back: 01 Getting Started](01-getting-started.md) diff --git a/docs/content/Deploying/DocsGPT-Settings.mdx b/docs/content/Deploying/DocsGPT-Settings.mdx index 373e56cbe..c60e4d4bc 100644 --- a/docs/content/Deploying/DocsGPT-Settings.mdx +++ b/docs/content/Deploying/DocsGPT-Settings.mdx @@ -362,7 +362,7 @@ gated by CI/CD). These are just the basic settings to get you started. 
The `settings.py` file contains many more advanced options that you can explore to further customize DocsGPT, such as: -- Vector store configuration (`VECTOR_STORE`, Qdrant, Milvus, LanceDB settings) If you're looking for an easy way to set up a vector store with pgvector, try [Neon](https://get.neon.com/docsgpt). +- Vector store configuration (`VECTOR_STORE`, Qdrant, Milvus, LanceDB, Valkey settings) If you're looking for an easy way to set up a vector store with pgvector, try [Neon](https://get.neon.com/docsgpt). - Retriever settings (`RETRIEVERS_ENABLED`) - Cache settings (`CACHE_REDIS_URL`) - And many more! diff --git a/docs/content/Deploying/Postgres-Migration.mdx b/docs/content/Deploying/Postgres-Migration.mdx index f8a8b1e88..2bbb0072c 100644 --- a/docs/content/Deploying/Postgres-Migration.mdx +++ b/docs/content/Deploying/Postgres-Migration.mdx @@ -13,7 +13,7 @@ required. Vector stores are independent — `VECTOR_STORE` can still be `pgvector`, - `faiss`, `qdrant`, `milvus`, `elasticsearch`, or `mongodb`. + `faiss`, `qdrant`, `milvus`, `elasticsearch`, `mongodb`, or `valkey`. ## Quickstart diff --git a/docs/content/Guides/Architecture.mdx b/docs/content/Guides/Architecture.mdx index c858812c6..44d756c88 100644 --- a/docs/content/Guides/Architecture.mdx +++ b/docs/content/Guides/Architecture.mdx @@ -64,7 +64,7 @@ flowchart LR * **Technology:** Supports multiple vector databases. * **Responsibility:** Vector Stores are used to store and retrieve vector embeddings of document chunks. This enables semantic search and retrieval of relevant document snippets in response to user queries. * **Key Features:** - * Supports vector databases including FAISS, Elasticsearch, Qdrant, Milvus, MongoDB Atlas Vector Search, and pgvector. + * Supports vector databases including FAISS, Elasticsearch, Qdrant, Milvus, MongoDB Atlas Vector Search, pgvector, and Valkey. * Provides storage and indexing of high-dimensional vector embeddings. 
def _make_store(source_id="test-source", embeddings_key="key"):
    """Build a ``ValkeyStore`` whose external collaborators are test doubles.

    Embedding lookup, the module-level ``settings`` object, client creation
    and index creation are all patched while the store is constructed.

    Args:
        source_id: Forwarded to ``ValkeyStore`` unchanged.
        embeddings_key: Forwarded to ``ValkeyStore`` unchanged.

    Returns:
        A ``(store, client_double, embeddings_double)`` tuple.
    """
    embeddings_double = Mock(
        embed_query=Mock(return_value=[0.1, 0.2, 0.3]),
        embed_documents=Mock(return_value=[[0.1, 0.2, 0.3]]),
        dimension=768,
    )
    client_double = MagicMock()
    settings_values = {
        "EMBEDDINGS_NAME": "test_model",
        "VALKEY_HOST": "localhost",
        "VALKEY_PORT": 6379,
        "VALKEY_PASSWORD": None,
        "VALKEY_USE_TLS": False,
        "VALKEY_INDEX_NAME": "docsgpt",
        "VALKEY_PREFIX": "doc:",
    }

    with patch(
        "application.vectorstore.base.BaseVectorStore._get_embeddings",
        return_value=embeddings_double,
    ), patch(
        "application.vectorstore.valkey.settings"
    ) as settings_double, patch(
        "application.vectorstore.valkey.ValkeyStore._create_client",
        return_value=client_double,
    ), patch(
        "application.vectorstore.valkey.ValkeyStore._ensure_index_exists"
    ):
        # Same attribute assignments as setting each field one by one.
        settings_double.configure_mock(**settings_values)

        from application.vectorstore.valkey import ValkeyStore

        store = ValkeyStore(source_id=source_id, embeddings_key=embeddings_key)

    return store, client_double, embeddings_double
@pytest.mark.unit
class TestValkeyStoreTagEscaping:
    """Checks that ``_escape_tag_value`` backslash-escapes tag punctuation."""

    def test_escape_dots(self):
        from application.vectorstore.valkey import ValkeyStore

        escaped = ValkeyStore._escape_tag_value("my.source.v2")
        assert escaped == r"my\.source\.v2"

    def test_escape_hyphens(self):
        from application.vectorstore.valkey import ValkeyStore

        escaped = ValkeyStore._escape_tag_value("my-source")
        assert escaped == r"my\-source"

    def test_escape_slashes(self):
        from application.vectorstore.valkey import ValkeyStore

        escaped = ValkeyStore._escape_tag_value("user/docs")
        assert escaped == r"user\/docs"

    def test_escape_colons(self):
        from application.vectorstore.valkey import ValkeyStore

        escaped = ValkeyStore._escape_tag_value("ns:value")
        assert escaped == r"ns\:value"

    def test_escape_spaces(self):
        from application.vectorstore.valkey import ValkeyStore

        escaped = ValkeyStore._escape_tag_value("my source")
        assert escaped == r"my\ source"

    def test_escape_pipe(self):
        from application.vectorstore.valkey import ValkeyStore

        escaped = ValkeyStore._escape_tag_value("a|b")
        assert escaped == r"a\|b"

    def test_no_escape_needed(self):
        from application.vectorstore.valkey import ValkeyStore

        untouched = ValkeyStore._escape_tag_value("simplesource123")
        assert untouched == "simplesource123"

    def test_escape_multiple_special_chars(self):
        from application.vectorstore.valkey import ValkeyStore

        expected = r"a\.b\-c\/d\:e"
        assert ValkeyStore._escape_tag_value("a.b-c/d:e") == expected

    def test_search_uses_escaped_source_id(self):
        """The query string handed to ``ft.search`` embeds the escaped id."""
        store, _client, _embeddings = _make_store(source_id="my.source-v2")

        with patch(
            "application.vectorstore.valkey.ft.search", return_value=[0]
        ) as search_double:
            store.search("query", k=1)

        # Third positional argument of ft.search is the query string.
        positional = search_double.call_args[0]
        assert r"my\.source\-v2" in positional[2]
@pytest.mark.unit
class TestValkeyStoreSearch:
    """Tests for ``ValkeyStore.search`` with ``ft.search`` patched out."""

    def test_search_returns_documents(self):
        store, mock_client, mock_emb = _make_store()

        # ft.search returns [total_count, {key: {field: value}}, ...]
        mock_response = [
            2,
            {b"doc:id1": {b"content": b"hello world", b"source_id": b"test-source", b"metadata": b'{"source": "test.txt"}'}},
            {b"doc:id2": {b"content": b"foo bar", b"source_id": b"test-source", b"metadata": b'{"source": "test2.txt"}'}},
        ]

        with patch("application.vectorstore.valkey.ft.search", return_value=mock_response):
            results = store.search("query", k=2)

        mock_emb.embed_query.assert_called_once_with("query")
        assert len(results) == 2
        assert results[0].page_content == "hello world"
        assert results[0].metadata == {"source": "test.txt"}
        assert results[1].page_content == "foo bar"

    def test_search_returns_empty_on_error(self):
        store, mock_client, _ = _make_store()

        with patch("application.vectorstore.valkey.ft.search", side_effect=RequestError("connection lost")):
            results = store.search("query")
        assert results == []

    def test_search_returns_empty_on_no_results(self):
        store, mock_client, _ = _make_store()

        with patch("application.vectorstore.valkey.ft.search", return_value=[0]):
            results = store.search("query")
        assert results == []

    def test_search_handles_string_fields(self):
        """Keys and values that arrive as ``str`` rather than ``bytes`` still parse."""
        store, mock_client, _ = _make_store()

        mock_response = [
            1,
            {"doc:id1": {"content": "hello", "source_id": "test-source", "metadata": "{}"}},
        ]

        with patch("application.vectorstore.valkey.ft.search", return_value=mock_response):
            results = store.search("query", k=1)

        assert len(results) == 1
        assert results[0].page_content == "hello"

    def test_search_passes_return_fields(self):
        """Verify search specifies return_fields to avoid fetching embeddings."""
        store, mock_client, _ = _make_store()

        with patch("application.vectorstore.valkey.ft.search", return_value=[0]) as mock_ft:
            store.search("query", k=1)

        call_args = mock_ft.call_args
        options = call_args[0][3]
        assert hasattr(options, "return_fields")

    def test_search_caps_k_to_max(self):
        """Verify k is capped at _MAX_SEARCH_K to prevent memory exhaustion."""
        import re  # local import: only the two k-clamping tests need it

        store, mock_client, _ = _make_store()

        with patch("application.vectorstore.valkey.ft.search", return_value=[0]) as mock_ft:
            store.search("query", k=500)

        query_str = mock_ft.call_args[0][2]
        # Reject longer numbers such as "KNN 1000" that merely start with 100.
        assert re.search(r"KNN 100(?!\d)", query_str)

    def test_search_floors_k_to_one(self):
        """Verify k is at least 1."""
        import re  # local import: only the two k-clamping tests need it

        store, mock_client, _ = _make_store()

        with patch("application.vectorstore.valkey.ft.search", return_value=[0]) as mock_ft:
            store.search("query", k=0)

        query_str = mock_ft.call_args[0][2]
        # A plain substring check for "KNN 1" would also accept "KNN 10" or
        # "KNN 100", so require that no further digit follows the 1.
        assert re.search(r"KNN 1(?!\d)", query_str)


@pytest.mark.unit
class TestValkeyStoreAddTexts:
    """Tests for ``ValkeyStore.add_texts``.

    Kept in their own class, like the tests for every other store
    operation in this module, instead of trailing the search tests.
    """

    def test_add_texts_returns_ids(self):
        store, mock_client, mock_emb = _make_store()
        mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
        mock_client.exec = Mock(return_value=["OK", "OK"])

        ids = store.add_texts(["text1", "text2"], [{"a": 1}, {"b": 2}])

        assert len(ids) == 2
        mock_client.exec.assert_called_once()

    def test_add_texts_empty_returns_empty(self):
        store, _, _ = _make_store()
        assert store.add_texts([]) == []

    def test_add_texts_default_metadatas(self):
        """Omitting the metadatas argument is accepted."""
        store, mock_client, mock_emb = _make_store()
        mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3]]
        mock_client.exec = Mock(return_value=["OK"])

        ids = store.add_texts(["text1"])
        assert len(ids) == 1

    def test_add_texts_raises_on_error(self):
        store, mock_client, mock_emb = _make_store()
        mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3]]
        mock_client.exec = Mock(side_effect=RequestError("write failed"))

        with pytest.raises(RequestError, match="write failed"):
            store.add_texts(["text1"])

    def test_add_texts_stores_correct_fields(self):
        store, mock_client, mock_emb = _make_store(source_id="src1")
        mock_emb.embed_documents.return_value = [[0.1, 0.2, 0.3]]
        mock_client.exec = Mock(return_value=["OK"])

        with patch("application.vectorstore.valkey.Batch") as MockBatch:
            mock_batch_instance = MagicMock()
            MockBatch.return_value = mock_batch_instance

            store.add_texts(["hello"], [{"key": "val"}])

            # Verify the batch was created as non-atomic (pipeline)
            MockBatch.assert_called_once_with(False)

            # Verify hset was called on the batch with correct fields
            call_args = mock_batch_instance.hset.call_args
            key = call_args[0][0]
            fields = call_args[0][1]

            assert key.startswith("doc:")
            assert fields["content"] == "hello"
            assert fields["source_id"] == "src1"
            assert json.loads(fields["metadata"]) == {"key": "val"}
            assert isinstance(fields["embedding"], bytes)
b"text"}}, + ] + page2_response = [2] + page2_entries + + with patch( + "application.vectorstore.valkey.ft.search", side_effect=[page1_response, page2_response] + ): + mock_client.delete = Mock(return_value=1) + store.delete_index() + + # Should have multiple delete calls due to batching + total_deleted = sum( + len(call[0][0]) for call in mock_client.delete.call_args_list + ) + assert total_deleted == _SCAN_PAGE_SIZE + 2 + + def test_delete_index_raises_on_error(self): + store, mock_client, _ = _make_store() + + with patch("application.vectorstore.valkey.ft.search", side_effect=RequestError("fail")): + with pytest.raises(RequestError): + store.delete_index() + + +@pytest.mark.unit +class TestValkeyStoreSaveLocal: + def test_save_local_is_noop(self): + store, _, _ = _make_store() + assert store.save_local() is None + + +@pytest.mark.unit +class TestValkeyStoreGetChunks: + def test_get_chunks(self): + store, mock_client, _ = _make_store() + + mock_response = [ + 2, + {b"doc:uuid1": {b"content": b"text1", b"source_id": b"test-source", b"metadata": b'{"key": "val"}'}}, + {b"doc:uuid2": {b"content": b"text2", b"source_id": b"test-source", b"metadata": b"{}"}}, + ] + + with patch("application.vectorstore.valkey.ft.search", return_value=mock_response): + chunks = store.get_chunks() + + assert len(chunks) == 2 + assert chunks[0] == {"doc_id": "uuid1", "text": "text1", "metadata": {"key": "val"}} + assert chunks[1] == {"doc_id": "uuid2", "text": "text2", "metadata": {}} + + def test_get_chunks_returns_empty_on_error(self): + store, mock_client, _ = _make_store() + + with patch("application.vectorstore.valkey.ft.search", side_effect=RequestError("fail")): + assert store.get_chunks() == [] + + def test_get_chunks_uses_return_fields(self): + """Verify get_chunks specifies return_fields to skip embedding blobs.""" + store, mock_client, _ = _make_store() + + with patch("application.vectorstore.valkey.ft.search", return_value=[0]) as mock_ft: + store.get_chunks() + + call_args = 
@pytest.mark.unit
class TestValkeyStoreCreateClient:
    """Covers how ``VALKEY_PASSWORD`` shapes the client configuration."""

    @staticmethod
    def _config_kwargs_for(password):
        """Construct a store with ``VALKEY_PASSWORD`` set to *password*.

        Returns the keyword arguments that the patched
        ``GlideClientConfiguration`` class was last called with.
        """
        with patch(
            "application.vectorstore.base.BaseVectorStore._get_embeddings"
        ) as get_embeddings, patch(
            "application.vectorstore.valkey.settings"
        ) as fake_settings, patch(
            "application.vectorstore.valkey.GlideClientConfiguration"
        ) as config_cls, patch(
            "application.vectorstore.valkey.GlideClient"
        ) as glide_cls, patch(
            "application.vectorstore.valkey.ValkeyStore._ensure_index_exists"
        ):
            embeddings = Mock()
            embeddings.dimension = 768
            get_embeddings.return_value = embeddings
            fake_settings.EMBEDDINGS_NAME = "test"
            fake_settings.VALKEY_HOST = "localhost"
            fake_settings.VALKEY_PORT = 6379
            fake_settings.VALKEY_PASSWORD = password
            fake_settings.VALKEY_USE_TLS = False
            fake_settings.VALKEY_INDEX_NAME = "docsgpt"
            fake_settings.VALKEY_PREFIX = "doc:"
            glide_cls.create = Mock(return_value=MagicMock())

            from application.vectorstore.valkey import ValkeyStore

            ValkeyStore(source_id="test", embeddings_key="key")

            return config_cls.call_args[1]

    def test_password_none_skips_credentials(self):
        """A ``None`` password results in no ``credentials`` kwarg."""
        passed_kwargs = self._config_kwargs_for(None)
        assert "credentials" not in passed_kwargs

    def test_empty_string_password_skips_credentials(self):
        """An empty-string password results in no ``credentials`` kwarg."""
        passed_kwargs = self._config_kwargs_for("")
        assert "credentials" not in passed_kwargs

    def test_non_empty_password_sets_credentials(self):
        """A real password is wrapped in ``ServerCredentials`` and passed on."""
        with patch(
            "application.vectorstore.valkey.ServerCredentials"
        ) as credentials_cls:
            passed_kwargs = self._config_kwargs_for("secret123")

        assert "credentials" in passed_kwargs
        credentials_cls.assert_called_once_with(password="secret123")
configurable distance metric, vector type, and algorithm.""" + + def test_resolve_distance_metric_cosine(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import DistanceMetricType + + assert ValkeyStore._resolve_distance_metric("cosine") == DistanceMetricType.COSINE + + def test_resolve_distance_metric_l2(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import DistanceMetricType + + assert ValkeyStore._resolve_distance_metric("l2") == DistanceMetricType.L2 + + def test_resolve_distance_metric_ip(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import DistanceMetricType + + assert ValkeyStore._resolve_distance_metric("ip") == DistanceMetricType.IP + + def test_resolve_distance_metric_invalid_falls_back(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import DistanceMetricType + + assert ValkeyStore._resolve_distance_metric("invalid") == DistanceMetricType.COSINE + + def test_resolve_distance_metric_case_insensitive(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import DistanceMetricType + + assert ValkeyStore._resolve_distance_metric("COSINE") == DistanceMetricType.COSINE + assert ValkeyStore._resolve_distance_metric("L2") == DistanceMetricType.L2 + + def test_resolve_vector_type_float32(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import VectorType + + assert ValkeyStore._resolve_vector_type("float32") == VectorType.FLOAT32 + + def test_resolve_vector_type_invalid_falls_back(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import VectorType + + assert ValkeyStore._resolve_vector_type("float64") == VectorType.FLOAT32 + + def test_resolve_vector_algorithm_hnsw(self): + from application.vectorstore.valkey import ValkeyStore + + from glide_sync import VectorAlgorithm + + assert ValkeyStore._resolve_vector_algorithm("hnsw") 
@pytest.mark.unit
class TestValkeyStoreEnsureIndex:
    """Exercises ``_ensure_index_exists`` through the store constructor."""

    @staticmethod
    def _construct(dimension, metric, algorithm, create_error=None):
        """Build a store with the ``ft`` module patched.

        Args:
            dimension: Value exposed as the embeddings' ``dimension``.
            metric: Value for ``VALKEY_DISTANCE_METRIC``.
            algorithm: Value for ``VALKEY_VECTOR_ALGORITHM``.
            create_error: Optional exception installed as the
                ``side_effect`` of ``ft.create``.

        Returns:
            A ``(store, ft_double)`` tuple.
        """
        with patch(
            "application.vectorstore.base.BaseVectorStore._get_embeddings"
        ) as get_embeddings, patch(
            "application.vectorstore.valkey.settings"
        ) as fake_settings, patch(
            "application.vectorstore.valkey.ValkeyStore._create_client"
        ) as create_client, patch(
            "application.vectorstore.valkey.ft"
        ) as ft_double:
            embeddings = Mock()
            embeddings.dimension = dimension
            get_embeddings.return_value = embeddings

            fake_settings.EMBEDDINGS_NAME = "test"
            fake_settings.VALKEY_HOST = "localhost"
            fake_settings.VALKEY_PORT = 6379
            fake_settings.VALKEY_PASSWORD = None
            fake_settings.VALKEY_USE_TLS = False
            fake_settings.VALKEY_INDEX_NAME = "test_idx"
            fake_settings.VALKEY_PREFIX = "doc:"
            fake_settings.VALKEY_DISTANCE_METRIC = metric
            fake_settings.VALKEY_VECTOR_TYPE = "float32"
            fake_settings.VALKEY_VECTOR_ALGORITHM = algorithm

            create_client.return_value = MagicMock()

            if create_error is not None:
                ft_double.create.side_effect = create_error

            from application.vectorstore.valkey import ValkeyStore

            store = ValkeyStore(source_id="test", embeddings_key="key")

        return store, ft_double

    def test_ensure_index_flat_algorithm(self):
        """Constructing with the FLAT algorithm calls ``ft.create`` once."""
        _store, ft_double = self._construct(128, "l2", "flat")

        ft_double.create.assert_called_once()

    def test_ensure_index_already_exists_is_silent(self):
        """A ``RequestError`` saying the index exists does not propagate."""
        store, _ft = self._construct(
            768,
            "cosine",
            "hnsw",
            create_error=RequestError("Index already exists"),
        )

        assert store is not None

    def test_ensure_index_unknown_error_raises(self):
        """Any other failure from ``ft.create`` reaches the caller."""
        with pytest.raises(Exception, match="Connection refused"):
            self._construct(
                768,
                "cosine",
                "hnsw",
                create_error=Exception("Connection refused"),
            )
class FakeEmbeddings:
    """Deterministic fake embeddings for integration testing.

    Vectors are derived from a CRC-32 of the UTF-8 encoded text. The
    built-in ``hash()`` is deliberately avoided: for ``str`` it is salted
    per interpreter process, so the same text would map to different
    vectors on different test runs.
    """

    # Number of components in every vector this class produces.
    dimension = 4

    def embed_query(self, text: str) -> list:
        """Return a reproducible ``dimension``-component vector for *text*.

        The components are ``(h + i) / 1000.0`` for ``i`` in
        ``range(dimension)``, where ``h`` is the CRC-32 of *text* modulo 1000.
        """
        import zlib  # local import keeps this class free of new file-level deps

        bucket = zlib.crc32(text.encode("utf-8")) % 1000
        return [(bucket + offset) / 1000.0 for offset in range(self.dimension)]

    def embed_documents(self, texts: list) -> list:
        """Embed each text with :meth:`embed_query`, preserving order."""
        return [self.embed_query(t) for t in texts]
class TestValkeyIntegrationAddAndSearch:
    """End-to-end checks that run through the ``valkey_store`` fixture."""

    def test_add_texts_and_search(self, valkey_store):
        """Bulk-insert three documents, then query for two neighbours."""
        corpus = {
            "python.txt": "Python is a programming language",
            "valkey.txt": "Valkey is an in-memory data store",
            "ml.txt": "Machine learning uses neural networks",
        }
        texts = list(corpus.values())
        metadatas = [{"source": name} for name in corpus]

        ids = valkey_store.add_texts(texts, metadatas)

        assert len(ids) == 3
        for doc_id in ids:
            assert isinstance(doc_id, str)

        # Search should return results
        hits = valkey_store.search("programming language", k=2)
        assert len(hits) >= 1
        for hit in hits:
            assert hasattr(hit, "page_content")
        for hit in hits:
            assert hasattr(hit, "metadata")

    def test_add_chunk_and_get_chunks(self, valkey_store):
        """A single added chunk comes back from ``get_chunks``."""
        new_id = valkey_store.add_chunk(
            "Test document content",
            metadata={"author": "test", "page": 1},
        )

        assert isinstance(new_id, str)

        stored = valkey_store.get_chunks()
        assert len(stored) == 1
        only_chunk = stored[0]
        assert only_chunk["text"] == "Test document content"
        assert only_chunk["metadata"]["author"] == "test"

    def test_delete_chunk(self, valkey_store):
        """Deleting an existing chunk reports success and removes it."""
        new_id = valkey_store.add_chunk("to be deleted")

        assert valkey_store.delete_chunk(new_id) is True

        # Verify it's gone
        remaining = valkey_store.get_chunks()
        assert len(remaining) == 0

    def test_delete_nonexistent_chunk(self, valkey_store):
        """Deleting an unknown id reports ``False``."""
        outcome = valkey_store.delete_chunk("nonexistent-id")
        assert outcome is False

    def test_delete_index(self, valkey_store):
        """``delete_index`` leaves no chunks behind for the source."""
        valkey_store.add_texts(["doc1", "doc2", "doc3"])

        valkey_store.delete_index()

        remaining = valkey_store.get_chunks()
        assert len(remaining) == 0

    def test_save_local_is_noop(self, valkey_store):
        """``save_local`` returns ``None`` without raising."""
        outcome = valkey_store.save_local()
        assert outcome is None

    def test_empty_search(self, valkey_store):
        """Searching a store with no documents yields an empty list."""
        hits = valkey_store.search("anything", k=5)
        assert hits == []