@@ -59,18 +59,11 @@ def __init__(self,
5959 self .max_result_size_mb = max_result_size_mb
6060 self .max_result_size_bytes = max_result_size_mb * 1024 * 1024
6161
62- def _generate_field_name (self , query_type : str , ** params ) -> str :
63- """Generate SOLR field name for VFBquery results"""
64- if not params :
65- # Simple case - no parameters
66- return f"vfb_query_{ query_type } "
67- else :
68- # Complex case - include parameter hash
69- param_str = json .dumps (sorted (params .items ()), sort_keys = True )
70- param_hash = hashlib .md5 (param_str .encode ()).hexdigest ()[:8 ]
71- return f"vfb_query_{ query_type } _{ param_hash } "
62+ def _get_cache_field_name (self , query_type ):
63+ """Get the field name for a specific query type"""
64+ return f"vfb_query_{ query_type } _ss"
7265
73- def _create_cache_metadata (self , result : Any ) -> Dict [str , Any ]:
66+ def _create_cache_metadata (self , result : Any ) -> Optional [ Dict [str , Any ] ]:
7467 """Create metadata for cached result with 3-month expiration"""
7568 serialized_result = json .dumps (result , cls = NumpyEncoder )
7669 result_size = len (serialized_result .encode ('utf-8' ))
@@ -84,7 +77,7 @@ def _create_cache_metadata(self, result: Any) -> Dict[str, Any]:
8477 expires_at = now + timedelta (hours = self .ttl_hours ) # 2160 hours = 90 days = 3 months
8578
8679 return {
87- "result" : serialized_result ,
80+ "result" : result , # Store original object, not serialized string
8881 "cached_at" : now .isoformat (),
8982 "expires_at" : expires_at .isoformat (),
9083 "result_size" : result_size ,
@@ -105,7 +98,7 @@ def get_cached_result(self, query_type: str, term_id: str, **params) -> Optional
10598 Returns:
10699 Cached result or None if not found/expired
107100 """
108- field_name = self ._generate_field_name (query_type , ** params )
101+ field_name = self ._get_cache_field_name (query_type )
109102
110103 try :
111104 # Query existing vfb_json document for cached VFBquery result
@@ -155,8 +148,16 @@ def get_cached_result(self, query_type: str, term_id: str, **params) -> Optional
155148 # Increment hit count asynchronously
156149 self ._increment_field_hit_count (term_id , field_name , cached_data .get ("hit_count" , 0 ))
157150
158- # Deserialize and return result
159- result = json .loads (cached_data ["result" ])
151+ # Return cached result
152+ result = cached_data ["result" ]
153+ # If result is a string, parse it as JSON
154+ if isinstance (result , str ):
155+ try :
156+ result = json .loads (result )
157+ except json .JSONDecodeError :
158+ logger .warning (f"Failed to parse cached result for { term_id } " )
159+ return None
160+
160161 logger .info (f"Cache hit for { query_type } ({ term_id } )" )
161162 return result
162163
@@ -181,24 +182,58 @@ def cache_result(self, query_type: str, term_id: str, result: Any, **params) ->
181182 logger .debug ("Empty result, not caching" )
182183 return False
183184
184- field_name = self ._generate_field_name (query_type , ** params )
185+ field_name = self ._get_cache_field_name (query_type )
185186
186187 try :
187188 # Create cached metadata and result
188189 cached_data = self ._create_cache_metadata (result )
189190 if not cached_data :
190191 return False # Result too large or other issue
191192
192- # Update existing SOLR document with new field using atomic update
193- # This preserves all existing fields in the document
194- update_doc = {
195- "id" : term_id ,
196- field_name : {"set" : json .dumps (cached_data )}
197- }
193+ # First, get the existing document to ensure it exists
194+ existing_response = requests .get (f"{ self .cache_url } /select" , params = {
195+ "q" : f"id:{ term_id } " ,
196+ "wt" : "json" ,
197+ "fl" : "id"
198+ }, timeout = 5 )
199+
200+ if existing_response .status_code != 200 :
201+ logger .error (f"Cannot access document { term_id } for caching" )
202+ return False
203+
204+ existing_data = existing_response .json ()
205+ existing_docs = existing_data .get ("response" , {}).get ("docs" , [])
206+
207+ if not existing_docs :
208+ logger .warning (f"Document { term_id } does not exist - cannot add cache field" )
209+ return False
210+
211+ # Fetch complete existing document to preserve all fields
212+ complete_doc_response = requests .get (f"{ self .cache_url } /select" , params = {
213+ "q" : f"id:{ term_id } " ,
214+ "wt" : "json" ,
215+ "rows" : "1"
216+ }, timeout = 5 )
217+
218+ if complete_doc_response .status_code != 200 :
219+ logger .error (f"Cannot fetch complete document { term_id } " )
220+ return False
221+
222+ complete_data = complete_doc_response .json ()
223+ complete_docs = complete_data .get ("response" , {}).get ("docs" , [])
224+
225+ if not complete_docs :
226+ logger .error (f"Document { term_id } not found for complete fetch" )
227+ return False
198228
229+ # Get the existing document and add our cache field
230+ existing_doc = complete_docs [0 ].copy ()
231+ existing_doc [field_name ] = json .dumps (cached_data ) # Add cache field
232+
233+ # Replace entire document (like VFB indexer does)
199234 response = requests .post (
200- f"{ self .cache_url } /update/json/docs " ,
201- json = [ update_doc ] ,
235+ f"{ self .cache_url } /update" ,
236+ data = json . dumps ([ existing_doc ]) ,
202237 headers = {"Content-Type" : "application/json" },
203238 params = {"commit" : "true" }, # Immediate commit for availability
204239 timeout = 10
@@ -208,7 +243,7 @@ def cache_result(self, query_type: str, term_id: str, result: Any, **params) ->
208243 logger .info (f"Cached { field_name } for { term_id } , size: { cached_data ['result_size' ]/ 1024 :.1f} KB" )
209244 return True
210245 else :
211- logger .error (f"Failed to cache result: HTTP { response .status_code } " )
246+ logger .error (f"Failed to cache result: HTTP { response .status_code } - { response . text } " )
212247 return False
213248
214249 except Exception as e :
@@ -278,7 +313,7 @@ def get_cache_age(self, query_type: str, term_id: str, **params) -> Optional[Dic
278313 Returns:
279314 Dictionary with cache age info or None if not cached
280315 """
281- field_name = self ._generate_field_name (query_type , ** params )
316+ field_name = self ._get_cache_field_name (query_type )
282317
283318 try :
284319 response = requests .get (f"{ self .cache_url } /select" , params = {
@@ -334,7 +369,7 @@ def cleanup_expired_entries(self) -> int:
334369
335370 # Search for documents that have VFBquery cache fields
336371 response = requests .get (f"{ self .cache_url } /select" , params = {
337- "q" : "vfb_query_term_info :[* TO *] OR vfb_query_anatomy :[* TO *] OR vfb_query_neuron :[* TO *]" ,
372+ "q" : "vfb_query_term_info_str :[* TO *] OR vfb_query_anatomy_str :[* TO *] OR vfb_query_neuron_str :[* TO *]" ,
338373 "fl" : "id,vfb_query_*" , # Get ID and all VFBquery fields
339374 "rows" : "1000" , # Process in batches
340375 "wt" : "json"
@@ -407,7 +442,7 @@ def get_cache_stats(self) -> Dict[str, Any]:
407442 # Get documents with VFBquery cache fields
408443 # Use a specific field search since wildcards may not work in all SOLR versions
409444 response = requests .get (f"{ self .cache_url } /select" , params = {
410- "q" : "vfb_query_term_info :[* TO *] OR vfb_query_anatomy :[* TO *] OR vfb_query_neuron :[* TO *]" ,
445+ "q" : "vfb_query_term_info_str :[* TO *] OR vfb_query_anatomy_str :[* TO *] OR vfb_query_neuron_str :[* TO *]" ,
411446 "fl" : "id,vfb_query_*" , # Get ID and all VFBquery fields
412447 "rows" : "1000" , # Process in batches
413448 "wt" : "json"
@@ -432,8 +467,8 @@ def get_cache_stats(self) -> Dict[str, Any]:
432467 if field_name .startswith ("vfb_query_" ):
433468 total_fields += 1
434469
435- # Extract query type from field name
436- query_type = field_name .replace ("vfb_query_" , "" ).split ( "_" )[ 0 ]
470+ # Extract query type from field name (remove vfb_query_ prefix and _str suffix)
471+ query_type = field_name .replace ("vfb_query_" , "" ).replace ( "_str" , "" )
437472 field_stats [query_type ] = field_stats .get (query_type , 0 ) + 1
438473
439474 try :
0 commit comments