77import asyncio
88import json
99import struct
10- from typing import Any , ClassVar , Literal
11-
1210from dataclasses import replace
11+ from typing import Any , ClassVar , Literal
1312
1413from glide import (
1514 BackoffStrategy ,
3736 VectorType ,
3837)
3938from glide_shared .commands .server_modules .ft_options .ft_search_options import (
40- FtSearchOptions ,
4139 FtSearchLimit ,
40+ FtSearchOptions ,
4241 ReturnField ,
4342)
4443from glide_sync import (
@@ -189,7 +188,7 @@ def __init__(
189188 self ._distance_metric = self ._parse_metric (distance_metric )
190189 self ._embedding_dim = embedding_dim
191190 self ._dummy_vector = [ValkeyDocumentStore ._DUMMY_VALUE ] * self ._embedding_dim
192-
191+
193192 # Validate and normalize metadata fields
194193 self ._metadata_fields = self ._validate_and_normalize_metadata_fields (metadata_fields or {})
195194
@@ -332,16 +331,16 @@ def _prepare_index_fields(self) -> list[Field]:
332331 ),
333332 ),
334333 ]
335-
334+
336335 # _metadata_fields keys already have meta_ prefix
337336 for field_name , field_type in self ._metadata_fields .items ():
338337 field_path = f"$.{ field_name } "
339-
338+
340339 if field_type == "tag" :
341340 fields .append (TagField (field_path , alias = field_name ))
342341 elif field_type == "numeric" :
343342 fields .append (NumericField (field_path , alias = field_name ))
344-
343+
345344 return fields
346345
347346 def _create_index (self ) -> None :
@@ -391,7 +390,7 @@ def to_dict(self) -> dict[str, Any]:
391390 # Remove meta_ prefix: meta_category -> category
392391 clean_name = field_name [5 :] if field_name .startswith ("meta_" ) else field_name
393392 metadata_fields_for_ser [clean_name ] = str if field_type == "tag" else int
394-
393+
395394 return default_to_dict (
396395 self ,
397396 nodes_list = self ._nodes_list ,
@@ -579,7 +578,7 @@ async def filter_documents_async(self, filters: dict[str, Any] | None = None) ->
579578 # furthermore, we are querying with a dummy vector, so the scores are meaningless
580579 docs_no_score = []
581580 for doc in documents :
582- docs_no_score .append (replace (doc , score = None ))
581+ docs_no_score .append (replace (doc , score = None ))
583582
584583 return docs_no_score
585584
@@ -737,7 +736,11 @@ def delete_documents(self, document_ids: list[str]) -> None:
737736 try :
738737 result = client .delete (keys )
739738 if result < len (document_ids ):
740- logger .warning ("Some documents not found. Expected {len_document_ids}, deleted {result}" , len_document_ids = len (document_ids ), result = result )
739+ logger .warning (
740+ "Some documents not found. Expected {len_document_ids}, deleted {result}" ,
741+ len_document_ids = len (document_ids ),
742+ result = result ,
743+ )
741744 except Exception as e :
742745 msg = f"Failed to delete documents: { e } "
743746 raise ValkeyDocumentStoreError (msg ) from e
@@ -769,7 +772,11 @@ async def delete_documents_async(self, document_ids: list[str]) -> None:
769772 try :
770773 result = await client .delete (keys )
771774 if result < len (document_ids ):
772- logger .warning ("Some documents not found. Expected {len_document_ids}, deleted {result}" , len_document_ids = len (document_ids ), result = result )
775+ logger .warning (
776+ "Some documents not found. Expected {len_document_ids}, deleted {result}" ,
777+ len_document_ids = len (document_ids ),
778+ result = result ,
779+ )
773780 except Exception as e :
774781 msg = f"Failed to delete documents: { e } "
775782 raise ValkeyDocumentStoreError (msg ) from e
@@ -995,19 +1002,30 @@ async def _embedding_retrieval_async(
9951002 msg = f"Failed to retrieve documents by embedding: { e } "
9961003 raise ValkeyDocumentStoreError (msg ) from e
9971004
998- def _prepare_document_dict (self , doc : Document ) -> dict :
1005+ def _prepare_document_dict (self , doc : Document ) -> dict [ str , Any ] :
9991006 """Prepare document dictionary for storage."""
10001007 payload = doc .to_dict (flatten = False )
10011008 payload .pop ("embedding" , None )
10021009
10031010 meta = doc .meta or {}
1004- doc_dict = {"id" : doc .id , "payload" : payload }
1005-
1011+ doc_dict : dict [ str , Any ] = {"id" : doc .id , "payload" : payload }
1012+
10061013 # _metadata_fields keys already have meta_ prefix
10071014 for field_name_with_prefix , field_type in self ._metadata_fields .items ():
10081015 # Extract original field name: meta_category -> category
10091016 field_name = field_name_with_prefix [5 :] # Remove "meta_"
1010- doc_dict [field_name_with_prefix ] = meta .get (field_name , None )
1017+ value = meta .get (field_name )
1018+
1019+ # Validate value type matches field type
1020+ if value is not None :
1021+ if field_type == "tag" and not isinstance (value , str ):
1022+ msg = f"Field '{ field_name } ' expects string value but got { type (value ).__name__ } "
1023+ raise ValueError (msg )
1024+ elif field_type == "numeric" and not isinstance (value , (int , float )):
1025+ msg = f"Field '{ field_name } ' expects numeric value but got { type (value ).__name__ } "
1026+ raise ValueError (msg )
1027+
1028+ doc_dict [field_name_with_prefix ] = value
10111029
10121030 doc_dict ["vector" ] = doc .embedding if doc .embedding else [self ._DUMMY_VALUE ] * self ._embedding_dim
10131031 return doc_dict
@@ -1028,7 +1046,7 @@ def _parse_documents_from_ft(raw: Any, *, with_embedding: bool) -> list[Document
10281046 # This occurs when no documents match the query filters or the index is empty
10291047 if not raw or raw [0 ] == 0 :
10301048 return documents
1031-
1049+
10321050 for doc_info in raw [1 ].values ():
10331051 # Get payload from doc_info
10341052 payload_data = doc_info .get (b"payload" )
@@ -1065,8 +1083,12 @@ def _parse_documents_from_ft(raw: Any, *, with_embedding: bool) -> list[Document
10651083
10661084 @staticmethod
10671085 def _build_search_query_and_options (
1068- embedding : list [float ], filters : dict [str , Any ] | None , limit : int , * , with_embedding : bool ,
1069- supported_fields : dict [str , str ]
1086+ embedding : list [float ],
1087+ filters : dict [str , Any ] | None ,
1088+ limit : int ,
1089+ * ,
1090+ with_embedding : bool ,
1091+ supported_fields : dict [str , str ],
10701092 ) -> tuple [str , FtSearchOptions ]:
10711093 if filters :
10721094 _validate_filters (filters )
@@ -1084,9 +1106,9 @@ def _build_search_query_and_options(
10841106
10851107 query = f"{ filter_query } =>[KNN { limit } @vector $query_vector]"
10861108 query_options = FtSearchOptions (
1087- params = {"query_vector" : ValkeyDocumentStore ._to_float32_bytes (embedding )},
1109+ params = {"query_vector" : ValkeyDocumentStore ._to_float32_bytes (embedding )},
10881110 return_fields = return_fields ,
1089- limit = FtSearchLimit (offset = 0 , count = limit )
1111+ limit = FtSearchLimit (offset = 0 , count = limit ),
10901112 )
10911113 return query , query_options
10921114
@@ -1105,7 +1127,7 @@ def _validate_policy(policy: DuplicatePolicy) -> None:
11051127 logger .warning (
11061128 "ValkeyDocumentStore only supports `DuplicatePolicy.OVERWRITE`"
11071129 "but got {policy}. Overwriting duplicates is enabled by default." ,
1108- policy = policy
1130+ policy = policy ,
11091131 )
11101132
11111133 @staticmethod
@@ -1140,27 +1162,28 @@ def _parse_metric(metric: str) -> DistanceMetricType:
11401162 def _validate_and_normalize_metadata_fields (metadata_fields : dict [str , type [str ] | type [int ]]) -> dict [str , str ]:
11411163 """
11421164 Validate and normalize metadata field definitions.
1143-
1165+
11441166 :param metadata_fields: User-provided metadata field definitions mapping field names to Python types.
11451167 :return: Normalized metadata fields with meta_ prefix mapping to Valkey field types ("tag" or "numeric").
11461168 :raises ValueError: If field definitions are invalid.
11471169 """
11481170 if not isinstance (metadata_fields , dict ):
11491171 msg = "metadata_fields must be a dictionary"
11501172 raise ValueError (msg )
1151-
1152- TYPE_MAPPING = {str : "tag" , int : "numeric" }
1153-
1173+
1174+ type_mapping = {str : "tag" , int : "numeric" }
1175+
11541176 normalized = {}
11551177 for field_name , field_type in metadata_fields .items ():
11561178 if not isinstance (field_name , str ) or not field_name :
11571179 msg = f"Field name must be a non-empty string, got { field_name !r} "
11581180 raise ValueError (msg )
1159-
1160- if field_type not in TYPE_MAPPING :
1161- msg = f"Unsupported field type { field_type !r} for field '{ field_name } '. Supported: { list (TYPE_MAPPING .keys ())} "
1181+
1182+ if field_type not in type_mapping :
1183+ supported_types = list (type_mapping .keys ())
1184+ msg = f"Unsupported field type { field_type !r} for field '{ field_name } '. Supported: { supported_types } "
11621185 raise ValueError (msg )
1163-
1164- normalized [f"meta_{ field_name } " ] = TYPE_MAPPING [field_type ]
1165-
1186+
1187+ normalized [f"meta_{ field_name } " ] = type_mapping [field_type ]
1188+
11661189 return normalized
0 commit comments