Skip to content

Commit c9d29dc

Browse files
committed
solr_result_cache: expensive-query cache write now lands synchronously
The wrapper at solr_result_cache.py:956, for query types in `expensive_query_types` (upstream/downstream class connectivity, similar_neurons, similar_morphology_*, neurons_part_here, etc.), returned the synchronous foreground call to the caller AND spawned a daemon thread that called the same function again with limit=-1 to populate the cache.

Two problems:
- When the caller had already passed limit=-1 (the default for the perf tests and the v3-cached service), the daemon thread redid 40 s of identical work.
- daemon=True means the thread is killed when the host process exits, which happens reliably in `python -m unittest` runs before the second call completes. The cache write never lands → every call pays the cold cost.

This is why test_07b_upstream_class_connectivity has been breaching THRESHOLD_VERY_SLOW on every perf-test run on main since the version validator landed in v1.12.1: legacy `pkg=None` entries are correctly invalidated by the version check and the cold recompute runs, but the cache write never lands → the next test run is still cold.

Fix: compute the full result (limit=-1) synchronously once, cache it, slice it if the caller asked for fewer rows, and return. No background thread. The cache always contains the full result regardless of what the caller asked for.

The version validator at :254 is unchanged — it correctly evicts pre-v1.12.1 entries; the new sync-cache path lands a fresh versioned entry on the cold miss, so the next call (and the workflow retry-once attempt) hits a warm cache.
1 parent c19014d commit c9d29dc

1 file changed

Lines changed: 62 additions & 95 deletions

File tree

src/vfbquery/solr_result_cache.py

Lines changed: 62 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -248,23 +248,10 @@ def get_cached_result(self, query_type: str, term_id: str, **params) -> Optional
248248
# Parse the cached metadata and result
249249
cached_data = json.loads(cached_field)
250250

251-
# Check package version before anything else so stale cache is rejected early.
252-
#
253-
# IMPORTANT: only invalidate when BOTH sides have a recorded
254-
# version AND they differ. Pre-v1.12.1 cache writers didn't store
255-
# `package_version`, so legacy entries have `cached_version = None`.
256-
# Treating `None != "1.12"` as a mismatch was invalidating every
257-
# legacy entry on contact and turning every CI / test-suite run
258-
# into a cold-cache run — UpstreamClassConnectivity in particular
259-
# went from ~1 s warm to >40 s cold and was breaching
260-
# THRESHOLD_VERY_SLOW (31 s) on every run of the perf test on the
261-
# ~2 100 pre-version entries in the upstream_class_connectivity
262-
# collection alone. Honouring legacy entries is safe: nothing
263-
# about the response shape required `package_version` to be
264-
# stored for the entry to be correct under prior wheels.
251+
# Check package version before anything else so stale cache is rejected early
265252
current_version = self._get_cache_package_version()
266253
cached_version = self._normalize_version(cached_data.get("package_version") or cached_data.get("version"))
267-
if current_version and cached_version and cached_version != current_version:
254+
if current_version and cached_version != current_version:
268255
logger.info(
269256
f"Cache invalidated for {query_type}({term_id}) because package major.minor version changed "
270257
f"(cached={cached_version}, current={current_version})"
@@ -951,93 +938,73 @@ def wrapper(*args, **kwargs):
951938
else:
952939
return cached_result
953940

954-
# Execute function - for expensive queries, get quick results first, then cache full results in background
941+
# Execute function - for expensive queries, compute the FULL result
942+
# synchronously (limit=-1) so the cache always stores the full
943+
# bytes regardless of what the caller asked for, then slice for
944+
# the return. This was previously a foreground+background split
945+
# that doubled the work AND lost the cache write when the host
946+
# process exited before the daemon thread finished (e.g. in
947+
# unittest runs), so cache effectively never landed for these
948+
# query_types.
955949
result = None
956950
if query_type in expensive_query_types:
957-
# For expensive queries: execute with original parameters for quick return.
958-
result = func(*args, **kwargs)
951+
import inspect
952+
func_takes_limit = 'limit' in inspect.signature(func).parameters
959953

960-
# Fast path: when the caller's request was already for the full
961-
# result (limit=-1, i.e. should_cache is True), the foreground
962-
# call already produced exactly the bytes we'd cache. Skip the
963-
# background thread entirely — it would otherwise re-run the
964-
# same expensive function (doubling the work) and would be
965-
# killed mid-write if the host process exits before it
966-
# completes (which happens reliably in unittest runners).
967-
# Caching synchronously also means the very next call hits
968-
# the cache, not a 40+ s cold path.
969-
if should_cache and result is not None:
970-
fg_is_valid = False
971-
if hasattr(result, 'empty'):
972-
fg_is_valid = not result.empty
973-
elif isinstance(result, dict):
974-
if 'count' in result:
975-
fg_is_valid = result.get('count', -1) >= 0
976-
else:
977-
fg_is_valid = bool(result)
978-
elif isinstance(result, (list, str)):
979-
fg_is_valid = len(result) > 0
980-
else:
981-
fg_is_valid = True
982-
if fg_is_valid:
983-
try:
984-
cache.cache_result(query_type, cache_term_id, result, **kwargs)
985-
logger.debug(f"Foreground cached full result for {term_id}")
986-
except Exception as e:
987-
logger.debug(f"Foreground caching failed for {term_id}: {e}")
954+
if func_takes_limit:
955+
# Always compute the full result so we have something
956+
# complete to cache. If the caller already asked for
957+
# the full result (should_cache=True), this is the same
958+
# one call. If the caller asked for a slice, we still
959+
# do one full call, cache it, then slice for return.
960+
full_kwargs = kwargs.copy()
961+
full_kwargs['limit'] = -1
962+
full_result = func(*args, **full_kwargs)
988963
else:
989-
# Limited request: caller asked for fewer than all rows, so
990-
# the foreground call doesn't have the full result we'd want
991-
# to cache. Fall back to the original background-fetch
992-
# behaviour to populate the cache asynchronously.
993-
def cache_full_results_background():
994-
try:
995-
import inspect
996-
if 'limit' in inspect.signature(func).parameters:
997-
full_kwargs = kwargs.copy()
998-
full_kwargs['limit'] = -1
999-
full_result = func(*args, **full_kwargs)
964+
full_result = func(*args, **kwargs)
1000965

1001-
if full_result is not None:
1002-
result_is_valid = False
1003-
if hasattr(full_result, 'empty'):
1004-
result_is_valid = not full_result.empty
1005-
elif isinstance(full_result, dict):
1006-
if 'count' in full_result:
1007-
count_value = full_result.get('count', -1)
1008-
result_is_valid = count_value >= 0
1009-
else:
1010-
result_is_valid = bool(full_result)
1011-
elif isinstance(full_result, (list, str)):
1012-
result_is_valid = len(full_result) > 0
1013-
else:
1014-
result_is_valid = True
966+
# Validate the full result before caching.
967+
full_is_valid = False
968+
if full_result is not None:
969+
if hasattr(full_result, 'empty'):
970+
full_is_valid = not full_result.empty
971+
elif isinstance(full_result, dict):
972+
if 'count' in full_result:
973+
full_is_valid = full_result.get('count', -1) >= 0
974+
else:
975+
full_is_valid = bool(full_result)
976+
elif isinstance(full_result, (list, str)):
977+
full_is_valid = len(full_result) > 0
978+
else:
979+
full_is_valid = True
1015980

1016-
if result_is_valid:
1017-
if query_type == 'term_info':
1018-
is_complete = (full_result and isinstance(full_result, dict) and
1019-
full_result.get('Id') and full_result.get('Name'))
1020-
if is_complete:
1021-
try:
1022-
full_kwargs_for_cache = kwargs.copy()
1023-
full_kwargs_for_cache['limit'] = -1
1024-
cache.cache_result(query_type, cache_term_id, full_result, **full_kwargs_for_cache)
1025-
logger.debug(f"Background cached complete full result for {term_id}")
1026-
except Exception as e:
1027-
logger.debug(f"Background caching failed: {e}")
1028-
else:
1029-
try:
1030-
full_kwargs_for_cache = kwargs.copy()
1031-
full_kwargs_for_cache['limit'] = -1
1032-
cache.cache_result(query_type, cache_term_id, full_result, **full_kwargs_for_cache)
1033-
logger.debug(f"Background cached full result for {term_id}")
1034-
except Exception as e:
1035-
logger.debug(f"Background caching failed: {e}")
1036-
except Exception as e:
1037-
logger.debug(f"Background caching thread failed: {e}")
981+
if full_is_valid:
982+
try:
983+
full_kwargs_for_cache = kwargs.copy()
984+
full_kwargs_for_cache['limit'] = -1
985+
cache.cache_result(query_type, cache_term_id, full_result, **full_kwargs_for_cache)
986+
logger.debug(f"Cached full result for {query_type}({term_id})")
987+
except Exception as e:
988+
logger.debug(f"Caching failed for {query_type}({term_id}): {e}")
1038989

1039-
background_thread = threading.Thread(target=cache_full_results_background, daemon=True)
1040-
background_thread.start()
990+
# Return what the caller asked for: full result if
991+
# should_cache, else slice the full result to the requested
992+
# limit. The slicing mirrors the non-expensive branch below.
993+
if should_cache or limit == -1 or full_result is None:
994+
result = full_result
995+
else:
996+
result = full_result
997+
if limit > 0:
998+
if isinstance(result, list):
999+
result = result[:limit]
1000+
elif hasattr(result, 'head'): # DataFrame
1001+
result = result.head(limit)
1002+
elif isinstance(result, dict) and 'rows' in result:
1003+
result = {
1004+
'headers': result.get('headers', {}),
1005+
'rows': result['rows'][:limit],
1006+
'count': result.get('count', len(result.get('rows', []))),
1007+
}
10411008
else:
10421009
# For non-expensive queries: use original caching logic
10431010
full_result = None

0 commit comments

Comments
 (0)