1- """Valkey vector store implementation using valkey-glide-sync and valkey-search module."""
1+ """Valkey vector store implementation using valkey-glide-sync and valkey-search module.
2+
3+ NOTE: The try/except ImportError guard around the glide_sync import below is
4+ **required** by ``application/vectorstore/vector_creator.py`` which eagerly
5+ imports all vector store modules at module level. Without this guard, a missing
6+ ``valkey-glide-sync`` package would break VectorCreator for *all* backends.
7+ """
28
39import json
410import logging
511import struct
612import uuid
7- from typing import Any , Dict , List , Optional
13+ from typing import Any , Dict , Generator , List , Optional , Tuple
814
915_GLIDE_AVAILABLE = False
1016try :
5460# Page size for paginated scan in delete_index / get_chunks.
5561_SCAN_PAGE_SIZE = 10000
5662
63+ # Safety limit to prevent infinite pagination loops (supports ~10M documents).
64+ _MAX_SCAN_PAGES = 1000
65+
66+ # Maximum allowed k for vector search to prevent memory exhaustion.
67+ _MAX_SEARCH_K = 100
68+
5769
5870class ValkeyStore (BaseVectorStore ):
5971 """Vector store backed by Valkey with the valkey-search module.
@@ -62,6 +74,11 @@ class ValkeyStore(BaseVectorStore):
6274 Creates a search index with FT.CREATE for KNN vector similarity search.
6375
6476 Requires a Valkey server with the valkey-search module loaded.
77+
78+ Supports use as a context manager for deterministic connection cleanup::
79+
80+ with ValkeyStore(source_id="my-source") as store:
81+ store.search("query")
6582 """
6683
6784 def __init__ (
@@ -93,6 +110,19 @@ def __init__(
93110 self ._client = self ._create_client ()
94111 self ._ensure_index_exists ()
95112
113+ # --- Context manager support ---
114+
115+ def __enter__ (self ):
116+ """Enter the context manager."""
117+ return self
118+
119+ def __exit__ (self , exc_type , exc_val , exc_tb ):
120+ """Exit the context manager and close the connection."""
121+ self .close ()
122+ return False
123+
124+ # --- Connection lifecycle ---
125+
96126 def close (self ):
97127 """Close the underlying Valkey client connection.
98128
@@ -117,19 +147,20 @@ def _create_client(self) -> GlideClient:
117147 A connected GlideClient instance (synchronous).
118148 """
119149 addresses = [NodeAddress (host = settings .VALKEY_HOST , port = settings .VALKEY_PORT )]
150+ timeout = settings .VALKEY_REQUEST_TIMEOUT
120151
121152 if settings .VALKEY_PASSWORD is not None and settings .VALKEY_PASSWORD != "" :
122153 config = GlideClientConfiguration (
123154 addresses = addresses ,
124155 use_tls = settings .VALKEY_USE_TLS ,
125156 credentials = ServerCredentials (password = settings .VALKEY_PASSWORD ),
126- request_timeout = 5000 ,
157+ request_timeout = timeout ,
127158 )
128159 else :
129160 config = GlideClientConfiguration (
130161 addresses = addresses ,
131162 use_tls = settings .VALKEY_USE_TLS ,
132- request_timeout = 5000 ,
163+ request_timeout = timeout ,
133164 )
134165
135166 return GlideClient .create (config )
@@ -187,12 +218,11 @@ def _ensure_index_exists(self):
187218 try :
188219 ft .create (self ._client , self ._index_name , schema , options )
189220 logger .info (f"Created Valkey search index '{ self ._index_name } '" )
190- except Exception as e :
221+ except RequestError as e :
191222 error_msg = str (e ).lower ()
192223 if "already exists" in error_msg or "index already" in error_msg :
193224 logger .debug (f"Valkey index '{ self ._index_name } ' already exists" )
194225 else :
195- logger .error (f"Error creating Valkey index: { e } " )
196226 raise
197227
198228 @staticmethod
@@ -293,16 +323,63 @@ def _doc_key(self, doc_id: str) -> str:
293323 """
294324 return f"{ self ._prefix } { doc_id } "
295325
326+ # --- Shared pagination helper ---
327+
328+ def _paginated_search (
329+ self , query : str , return_fields : List [ReturnField ]
330+ ) -> Generator [Tuple [str , Dict [str , Any ]], None , None ]:
331+ """Yield (key_str, field_dict) tuples across all pages.
332+
333+ Handles pagination with a safety limit of _MAX_SCAN_PAGES iterations
334+ to prevent infinite loops from concurrent inserts. Once that limit is reached, iteration stops silently and any remaining matches are not yielded.
335+
336+ Args:
337+ query: The ft.search query string.
338+ return_fields: Fields to return from each document.
339+
340+ Yields:
341+ Tuples of (key_string, field_dictionary) for each matched document.
342+ """
343+ offset = 0
344+ for _ in range (_MAX_SCAN_PAGES ):
345+ results = ft .search (
346+ self ._client ,
347+ self ._index_name ,
348+ query ,
349+ FtSearchOptions (
350+ limit = FtSearchLimit (offset , _SCAN_PAGE_SIZE ),
351+ return_fields = return_fields ,
352+ ),
353+ )
354+
355+ if not results or len (results ) < 2 :
356+ break
357+
358+ page_count = 0
359+ for entry in results [1 :]:
360+ if isinstance (entry , dict ):
361+ for key , fields in entry .items ():
362+ key_str = key .decode ("utf-8" ) if isinstance (key , bytes ) else str (key )
363+ yield key_str , fields
364+ page_count += 1
365+
366+ if page_count < _SCAN_PAGE_SIZE :
367+ break
368+ offset += _SCAN_PAGE_SIZE
369+
370+ # --- Public interface ---
371+
296372 def search (self , question : str , k : int = 2 , * args , ** kwargs ) -> List [Document ]:
297373 """Search for similar documents using vector similarity.
298374
299375 Args:
300376 question: The query text to search for.
301- k: Number of results to return.
377+ k: Number of results to return (clamped to the range 1-100).
302378
303379 Returns:
304380 A list of Document objects sorted by similarity.
305381 """
382+ k = max (1 , min (k , _MAX_SEARCH_K ))
306383 query_vector = self ._embedding .embed_query (question )
307384 vector_bytes = struct .pack (f"{ len (query_vector )} f" , * query_vector )
308385
@@ -450,66 +527,29 @@ def add_texts(
450527
451528 return doc_ids
452529
453- def _paginated_source_scan (self ) -> List [str ]:
454- """Scan all keys matching this source_id, handling pagination.
455-
456- Uses a minimal return field to avoid fetching full document content —
457- only the key names are needed for deletion.
458-
459- Returns:
460- List of key strings for all documents with this source_id.
461- """
462- all_keys : List [str ] = []
463- offset = 0
464- escaped_source = self ._escape_tag_value (self ._source_id )
465- query = f"@source_id:{{{ escaped_source } }}"
466-
467- while True :
468- results = ft .search (
469- self ._client ,
470- self ._index_name ,
471- query ,
472- FtSearchOptions (
473- limit = FtSearchLimit (offset , _SCAN_PAGE_SIZE ),
474- return_fields = [ReturnField ("source_id" )],
475- ),
476- )
477-
478- if not results or len (results ) < 2 :
479- break
480-
481- page_keys : List [str ] = []
482- for entry in results [1 :]:
483- if isinstance (entry , dict ):
484- for key in entry .keys ():
485- key_str = key .decode ("utf-8" ) if isinstance (key , bytes ) else str (key )
486- page_keys .append (key_str )
487-
488- all_keys .extend (page_keys )
489-
490- # If we got fewer results than page size, we've reached the end
491- if len (page_keys ) < _SCAN_PAGE_SIZE :
492- break
493- offset += _SCAN_PAGE_SIZE
494-
495- return all_keys
496-
497530 def delete_index (self , * args , ** kwargs ):
498531 """Delete all documents for this source_id.
499532
500533 Searches for all documents with matching source_id and deletes them
501534 in batches. Handles sources with more than 10,000 documents via pagination.
535+
536+ Raises:
537+ RequestError: If the Valkey server returns an error.
538+ GlideConnectionError: If the connection to Valkey is lost.
539+ GlideTimeoutError: If the operation exceeds the request timeout.
502540 """
503- try :
504- keys = self . _paginated_source_scan ()
541+ escaped_source = self . _escape_tag_value ( self . _source_id )
542+ query = f"@source_id:{{ { escaped_source } }}"
505543
506- # Batch deletes for efficiency
507- for i in range ( 0 , len ( keys ), _DELETE_BATCH_SIZE ):
508- batch = keys [ i : i + _DELETE_BATCH_SIZE ]
509- self . _client . delete ( batch )
544+ keys = [
545+ key_str
546+ for key_str , _ in self . _paginated_search ( query , [ ReturnField ( "source_id" )])
547+ ]
510548
511- except (RequestError , GlideConnectionError , GlideTimeoutError ) as e :
512- logger .error (f"Error deleting index from Valkey: { e } " , exc_info = True )
549+ # Batch deletes for efficiency
550+ for i in range (0 , len (keys ), _DELETE_BATCH_SIZE ):
551+ batch = keys [i : i + _DELETE_BATCH_SIZE ]
552+ self ._client .delete (batch )
513553
514554 def save_local (self , * args , ** kwargs ):
515555 """No-op for Valkey — data is already persisted."""
@@ -526,47 +566,24 @@ def get_chunks(self) -> List[Dict[str, Any]]:
526566 query = f"@source_id:{{{ escaped_source } }}"
527567
528568 chunks : List [Dict [str , Any ]] = []
529- offset = 0
530-
531- while True :
532- results = ft .search (
533- self ._client ,
534- self ._index_name ,
535- query ,
536- FtSearchOptions (
537- limit = FtSearchLimit (offset , _SCAN_PAGE_SIZE ),
538- return_fields = [
539- ReturnField ("content" ),
540- ReturnField ("source_id" ),
541- ReturnField ("metadata" ),
542- ],
543- ),
544- )
545569
546- if not results or len (results ) < 2 :
547- break
548-
549- page_count = 0
550- for entry in results [1 :]:
551- if isinstance (entry , dict ):
552- for key , fields in entry .items ():
553- key_str = (
554- key .decode ("utf-8" ) if isinstance (key , bytes ) else str (key )
555- )
556- doc_id = key_str .replace (self ._prefix , "" , 1 )
557- field_dict = self ._decode_fields (fields )
558- chunks .append (
559- {
560- "doc_id" : doc_id ,
561- "text" : field_dict .get ("content" , "" ),
562- "metadata" : self ._parse_metadata (field_dict ),
563- }
564- )
565- page_count += 1
566-
567- if page_count < _SCAN_PAGE_SIZE :
568- break
569- offset += _SCAN_PAGE_SIZE
570+ for key_str , fields in self ._paginated_search (
571+ query ,
572+ [
573+ ReturnField ("content" ),
574+ ReturnField ("source_id" ),
575+ ReturnField ("metadata" ),
576+ ],
577+ ):
578+ doc_id = key_str .replace (self ._prefix , "" , 1 )
579+ field_dict = self ._decode_fields (fields )
580+ chunks .append (
581+ {
582+ "doc_id" : doc_id ,
583+ "text" : field_dict .get ("content" , "" ),
584+ "metadata" : self ._parse_metadata (field_dict ),
585+ }
586+ )
570587
571588 return chunks
572589
0 commit comments