Skip to content

Commit c9976d7

Browse files
committed
Address review feedback: harden valkey store

- Document eager import guard in vector_creator.py (fragility concern)
- Catch RequestError instead of bare Exception in _ensure_index_exists
- Re-raise exceptions in delete_index so callers can handle failures
- Extract shared _paginated_search generator with max-iterations guard
- Add __enter__/__exit__ for deterministic connection cleanup
- Cap search k to [1, 100] to prevent memory exhaustion
- Make request_timeout configurable via VALKEY_REQUEST_TIMEOUT setting
- Refactor integration test fixture to use real Settings instance
- Update tests to match new behavior, add context manager + k bounds tests

All 63 tests pass (56 unit + 7 integration).
Signed-off-by: Daria Korenieva <daric2612@gmail.com>
1 parent 21bd412 commit c9976d7

5 files changed

Lines changed: 181 additions & 124 deletions

File tree

application/core/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ class Settings(BaseSettings):
164164
VALKEY_USE_TLS: bool = False
165165
VALKEY_INDEX_NAME: str = "docsgpt"
166166
VALKEY_PREFIX: str = "doc:"
167+
VALKEY_REQUEST_TIMEOUT: int = 5000 # milliseconds
167168
VALKEY_DISTANCE_METRIC: str = "cosine" # "cosine", "l2", or "ip"
168169
VALKEY_VECTOR_TYPE: str = "float32" # "float32" (only option in valkey-glide-sync 2.x)
169170
VALKEY_VECTOR_ALGORITHM: str = "hnsw" # "hnsw" or "flat"

application/vectorstore/valkey.py

Lines changed: 116 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1-
"""Valkey vector store implementation using valkey-glide-sync and valkey-search module."""
1+
"""Valkey vector store implementation using valkey-glide-sync and valkey-search module.
2+
3+
NOTE: The try/except ImportError guard around the glide_sync import below is
4+
**required** by ``application/vectorstore/vector_creator.py`` which eagerly
5+
imports all vector store modules at module level. Without this guard, a missing
6+
``valkey-glide-sync`` package would break VectorCreator for *all* backends.
7+
"""
28

39
import json
410
import logging
511
import struct
612
import uuid
7-
from typing import Any, Dict, List, Optional
13+
from typing import Any, Dict, Generator, List, Optional, Tuple
814

915
_GLIDE_AVAILABLE = False
1016
try:
@@ -54,6 +60,12 @@
5460
# Page size for paginated scan in delete_index / get_chunks.
5561
_SCAN_PAGE_SIZE = 10000
5662

63+
# Safety limit on pages per scan, to prevent infinite pagination loops:
# _MAX_SCAN_PAGES x _SCAN_PAGE_SIZE (1000 x 10000) = at most 10M documents per scan.
64+
_MAX_SCAN_PAGES = 1000
65+
66+
# Maximum allowed k for vector search to prevent memory exhaustion.
67+
_MAX_SEARCH_K = 100
68+
5769

5870
class ValkeyStore(BaseVectorStore):
5971
"""Vector store backed by Valkey with the valkey-search module.
@@ -62,6 +74,11 @@ class ValkeyStore(BaseVectorStore):
6274
Creates a search index with FT.CREATE for KNN vector similarity search.
6375
6476
Requires a Valkey server with the valkey-search module loaded.
77+
78+
Supports use as a context manager for deterministic connection cleanup::
79+
80+
with ValkeyStore(source_id="my-source") as store:
81+
store.search("query")
6582
"""
6683

6784
def __init__(
@@ -93,6 +110,19 @@ def __init__(
93110
self._client = self._create_client()
94111
self._ensure_index_exists()
95112

113+
# --- Context manager support ---
114+
115+
def __enter__(self):
116+
"""Enter the context manager and return this store instance itself."""
117+
return self
118+
119+
def __exit__(self, exc_type, exc_val, exc_tb):
120+
"""Exit the context manager and close the connection.

Always returns False, so an exception raised inside the ``with`` block
is not suppressed and propagates to the caller after ``close()`` runs.
"""
121+
self.close()
122+
return False
123+
124+
# --- Connection lifecycle ---
125+
96126
def close(self):
97127
"""Close the underlying Valkey client connection.
98128
@@ -117,19 +147,20 @@ def _create_client(self) -> GlideClient:
117147
A connected GlideClient instance (synchronous).
118148
"""
119149
addresses = [NodeAddress(host=settings.VALKEY_HOST, port=settings.VALKEY_PORT)]
150+
timeout = settings.VALKEY_REQUEST_TIMEOUT
120151

121152
if settings.VALKEY_PASSWORD is not None and settings.VALKEY_PASSWORD != "":
122153
config = GlideClientConfiguration(
123154
addresses=addresses,
124155
use_tls=settings.VALKEY_USE_TLS,
125156
credentials=ServerCredentials(password=settings.VALKEY_PASSWORD),
126-
request_timeout=5000,
157+
request_timeout=timeout,
127158
)
128159
else:
129160
config = GlideClientConfiguration(
130161
addresses=addresses,
131162
use_tls=settings.VALKEY_USE_TLS,
132-
request_timeout=5000,
163+
request_timeout=timeout,
133164
)
134165

135166
return GlideClient.create(config)
@@ -187,12 +218,11 @@ def _ensure_index_exists(self):
187218
try:
188219
ft.create(self._client, self._index_name, schema, options)
189220
logger.info(f"Created Valkey search index '{self._index_name}'")
190-
except Exception as e:
221+
except RequestError as e:
191222
error_msg = str(e).lower()
192223
if "already exists" in error_msg or "index already" in error_msg:
193224
logger.debug(f"Valkey index '{self._index_name}' already exists")
194225
else:
195-
logger.error(f"Error creating Valkey index: {e}")
196226
raise
197227

198228
@staticmethod
@@ -293,16 +323,63 @@ def _doc_key(self, doc_id: str) -> str:
293323
"""
294324
return f"{self._prefix}{doc_id}"
295325

326+
# --- Shared pagination helper ---
327+
328+
def _paginated_search(
    self, query: str, return_fields: List[ReturnField]
) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
    """Yield (key_str, field_dict) tuples across all result pages.

    Pages through ``ft.search`` results ``_SCAN_PAGE_SIZE`` entries at a
    time. At most ``_MAX_SCAN_PAGES`` pages are fetched so that the loop
    always terminates; if that cap is reached while pages are still full,
    a warning is logged because the results may be incomplete.

    Args:
        query: The ft.search query string.
        return_fields: Fields to return from each document.

    Yields:
        Tuples of (key_string, field_dictionary) for each matched document.
        Keys are decoded from UTF-8 bytes when needed; the field mapping is
        yielded exactly as returned by ``ft.search`` (not decoded here).
    """
    offset = 0
    for _ in range(_MAX_SCAN_PAGES):
        results = ft.search(
            self._client,
            self._index_name,
            query,
            FtSearchOptions(
                limit=FtSearchLimit(offset, _SCAN_PAGE_SIZE),
                return_fields=return_fields,
            ),
        )

        # The first element is skipped; entries are read from results[1:].
        if not results or len(results) < 2:
            break

        page_count = 0
        for entry in results[1:]:
            if isinstance(entry, dict):
                for key, fields in entry.items():
                    key_str = key.decode("utf-8") if isinstance(key, bytes) else str(key)
                    yield key_str, fields
                    page_count += 1

        # A short page means there is nothing left to fetch.
        if page_count < _SCAN_PAGE_SIZE:
            break
        offset += _SCAN_PAGE_SIZE
    else:
        # Every allowed page came back full: the scan was cut off by the
        # safety cap, so callers (e.g. bulk deletes) may have missed documents.
        logger.warning(
            "Paginated search on Valkey index '%s' stopped at the %d-page safety "
            "limit (%d documents); results may be incomplete",
            self._index_name,
            _MAX_SCAN_PAGES,
            _MAX_SCAN_PAGES * _SCAN_PAGE_SIZE,
        )
369+
370+
# --- Public interface ---
371+
296372
def search(self, question: str, k: int = 2, *args, **kwargs) -> List[Document]:
297373
"""Search for similar documents using vector similarity.
298374
299375
Args:
300376
question: The query text to search for.
301-
k: Number of results to return.
377+
k: Number of results to return (capped at 100).
302378
303379
Returns:
304380
A list of Document objects sorted by similarity.
305381
"""
382+
k = max(1, min(k, _MAX_SEARCH_K))
306383
query_vector = self._embedding.embed_query(question)
307384
vector_bytes = struct.pack(f"{len(query_vector)}f", *query_vector)
308385

@@ -450,66 +527,29 @@ def add_texts(
450527

451528
return doc_ids
452529

453-
def _paginated_source_scan(self) -> List[str]:
454-
"""Scan all keys matching this source_id, handling pagination.
455-
456-
Uses a minimal return field to avoid fetching full document content —
457-
only the key names are needed for deletion.
458-
459-
Returns:
460-
List of key strings for all documents with this source_id.
461-
"""
462-
all_keys: List[str] = []
463-
offset = 0
464-
escaped_source = self._escape_tag_value(self._source_id)
465-
query = f"@source_id:{{{escaped_source}}}"
466-
467-
while True:
468-
results = ft.search(
469-
self._client,
470-
self._index_name,
471-
query,
472-
FtSearchOptions(
473-
limit=FtSearchLimit(offset, _SCAN_PAGE_SIZE),
474-
return_fields=[ReturnField("source_id")],
475-
),
476-
)
477-
478-
if not results or len(results) < 2:
479-
break
480-
481-
page_keys: List[str] = []
482-
for entry in results[1:]:
483-
if isinstance(entry, dict):
484-
for key in entry.keys():
485-
key_str = key.decode("utf-8") if isinstance(key, bytes) else str(key)
486-
page_keys.append(key_str)
487-
488-
all_keys.extend(page_keys)
489-
490-
# If we got fewer results than page size, we've reached the end
491-
if len(page_keys) < _SCAN_PAGE_SIZE:
492-
break
493-
offset += _SCAN_PAGE_SIZE
494-
495-
return all_keys
496-
497530
def delete_index(self, *args, **kwargs):
498531
"""Delete all documents for this source_id.
499532
500533
Searches for all documents with matching source_id and deletes them
501534
in batches. Handles sources with more than 10,000 documents via pagination.
535+
536+
Raises:
537+
RequestError: If the Valkey server returns an error.
538+
ConnectionError: If the connection to Valkey is lost.
539+
TimeoutError: If the operation exceeds the request timeout.
502540
"""
503-
try:
504-
keys = self._paginated_source_scan()
541+
escaped_source = self._escape_tag_value(self._source_id)
542+
query = f"@source_id:{{{escaped_source}}}"
505543

506-
# Batch deletes for efficiency
507-
for i in range(0, len(keys), _DELETE_BATCH_SIZE):
508-
batch = keys[i : i + _DELETE_BATCH_SIZE]
509-
self._client.delete(batch)
544+
keys = [
545+
key_str
546+
for key_str, _ in self._paginated_search(query, [ReturnField("source_id")])
547+
]
510548

511-
except (RequestError, GlideConnectionError, GlideTimeoutError) as e:
512-
logger.error(f"Error deleting index from Valkey: {e}", exc_info=True)
549+
# Batch deletes for efficiency
550+
for i in range(0, len(keys), _DELETE_BATCH_SIZE):
551+
batch = keys[i : i + _DELETE_BATCH_SIZE]
552+
self._client.delete(batch)
513553

514554
def save_local(self, *args, **kwargs):
515555
"""No-op for Valkey — data is already persisted."""
@@ -526,47 +566,24 @@ def get_chunks(self) -> List[Dict[str, Any]]:
526566
query = f"@source_id:{{{escaped_source}}}"
527567

528568
chunks: List[Dict[str, Any]] = []
529-
offset = 0
530-
531-
while True:
532-
results = ft.search(
533-
self._client,
534-
self._index_name,
535-
query,
536-
FtSearchOptions(
537-
limit=FtSearchLimit(offset, _SCAN_PAGE_SIZE),
538-
return_fields=[
539-
ReturnField("content"),
540-
ReturnField("source_id"),
541-
ReturnField("metadata"),
542-
],
543-
),
544-
)
545569

546-
if not results or len(results) < 2:
547-
break
548-
549-
page_count = 0
550-
for entry in results[1:]:
551-
if isinstance(entry, dict):
552-
for key, fields in entry.items():
553-
key_str = (
554-
key.decode("utf-8") if isinstance(key, bytes) else str(key)
555-
)
556-
doc_id = key_str.replace(self._prefix, "", 1)
557-
field_dict = self._decode_fields(fields)
558-
chunks.append(
559-
{
560-
"doc_id": doc_id,
561-
"text": field_dict.get("content", ""),
562-
"metadata": self._parse_metadata(field_dict),
563-
}
564-
)
565-
page_count += 1
566-
567-
if page_count < _SCAN_PAGE_SIZE:
568-
break
569-
offset += _SCAN_PAGE_SIZE
570+
for key_str, fields in self._paginated_search(
571+
query,
572+
[
573+
ReturnField("content"),
574+
ReturnField("source_id"),
575+
ReturnField("metadata"),
576+
],
577+
):
578+
doc_id = key_str.replace(self._prefix, "", 1)
579+
field_dict = self._decode_fields(fields)
580+
chunks.append(
581+
{
582+
"doc_id": doc_id,
583+
"text": field_dict.get("content", ""),
584+
"metadata": self._parse_metadata(field_dict),
585+
}
586+
)
570587

571588
return chunks
572589

application/vectorstore/vector_creator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
from application.vectorstore.mongodb import MongoDBVectorStore
55
from application.vectorstore.qdrant import QdrantStore
66
from application.vectorstore.pgvector import PGVectorStore
7+
8+
# ValkeyStore uses a try/except ImportError guard around its glide_sync
9+
# dependency so that this eager import does NOT break VectorCreator when
10+
# valkey-glide-sync is not installed. Do not remove that guard.
711
from application.vectorstore.valkey import ValkeyStore
812

913

0 commit comments

Comments
 (0)