@@ -299,15 +299,9 @@ async def _lookup_curie_metadata(
299299 """
300300 Handles the lookup process for the CURIE identifiers within our elasticsearch instance
301301
302- Ported from the redis instance, this performs one set of batch lookup calls via a singular
303- terms query with the entire set of curies. Given the default maximum amount of terms queries
304- specified by index.max_terms_count is 65536 (2**16), we should be well under given our
305- usual maximum query size is ~3000 CURIE identifiers
306-
307- We must also be careful though as the terms query is simply checking if any document
308- contains at least one of the terms. We expect a 1-1 matching for CURIE identifier to
309- document, so we can determine which terms were not found via set difference between the
310- returned document CURIE identifiers and the user provided set of CURIE identifiers
302+ Ported from the redis instance, this performs one batch lookup call through Elasticsearch
303+ msearch, with one size-1 search per input CURIE. We expect a 1-1 mapping for CURIE
304+ identifier to document; upstream data processing is responsible for resolving duplicates.
311305 """
312306 identifier_result_lookup , malformed_curies = await _lookup_equivalent_identifiers (biothings_metadata , curies )
313307
@@ -453,26 +447,54 @@ def unique_list(seq) -> list:
453447
async def _lookup_equivalent_identifiers(
    biothings_metadata: NodeNormalizationAPINamespace, curies: list[str]
) -> tuple[dict, set]:
    """
    Look up the Elasticsearch document for each CURIE with a single msearch call

    One size-1 search is issued per distinct input CURIE, filtering on the
    `identifiers.i` field. Repeated CURIEs in the input are only searched once, as the
    results are keyed by CURIE and a repeat cannot change them.

    Returns a tuple of:
        - dict mapping each CURIE that matched to the first hit returned for it
        - set of the CURIEs that produced no hits at all

    Raises RuntimeError when any individual search reports an error, or when the number
    of responses does not match the number of searches that were sent.
    """
    if not curies:
        return {}, set()

    # dict.fromkeys drops repeats while keeping first-seen order, so position N in the
    # request still lines up with position N in the response
    unique_curies = list(dict.fromkeys(curies))

    source_fields = ["identifiers", "type", "ic", "preferred_name", "taxa"]
    search_indices = biothings_metadata.elasticsearch.indices

    # msearch bodies alternate between a header entry and the search entry itself
    searches = []
    for curie in unique_curies:
        searches.append({"index": search_indices})
        searches.append(
            {
                "query": {"bool": {"filter": [{"terms": {"identifiers.i": [curie]}}]}},
                "size": 1,
                # exact totals are needed to detect CURIEs matching multiple documents
                "track_total_hits": True,
                "_source": source_fields,
            }
        )

    msearch_result = await biothings_metadata.elasticsearch.async_client.msearch(
        searches=searches,
    )
    responses = msearch_result.body["responses"]

    # Responses are paired with CURIEs purely by position. A count mismatch would make
    # zip silently drop CURIEs, leaving them neither found nor reported as malformed
    if len(responses) != len(unique_curies):
        raise RuntimeError(
            f"Elasticsearch msearch returned {len(responses)} responses "
            f"for {len(unique_curies)} CURIE searches"
        )

    # Post processing to ensure we can identify invalid curies provided by the query
    identifier_result_lookup = {}
    malformed_curies = set()
    for curie, response in zip(unique_curies, responses):
        if "error" in response:
            raise RuntimeError(f"Elasticsearch msearch failed for CURIE {curie}: {response['error']}")

        hits_metadata = response.get("hits", {})
        # hits.total is either a bare count or an object carrying the count under "value"
        total_hits = hits_metadata.get("total", 0)
        if isinstance(total_hits, dict):
            total_hits = total_hits.get("value", 0)

        hits = hits_metadata.get("hits", [])
        if not hits:
            malformed_curies.add(curie)
            continue

        if total_hits > 1:
            logger.warning(
                "Expected 1 Elasticsearch document for CURIE %s but found %s. Returning first hit %s.",
                curie,
                total_hits,
                hits[0].get("_id"),
            )

        identifier_result_lookup[curie] = hits[0]

    return identifier_result_lookup, malformed_curies
0 commit comments