Skip to content

Commit f06bdc4

Browse files
committed
Address node normalization review feedback
1 parent e106501 commit f06bdc4

2 files changed

Lines changed: 182 additions & 20 deletions

File tree

src/nodenorm/handlers/normalized_nodes.py

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -370,34 +370,17 @@ async def _lookup_curie_metadata(
370370
for conflation_curie in conflation_identifiers:
371371
if conflation_curie in malformed_conflation_curies:
372372
skipped_conflation_curies.append(conflation_curie)
373-
logger.warning(
374-
"Unable to resolve conflation CURIE %s while normalizing %s; skipping it.",
375-
conflation_curie,
376-
input_curie,
377-
)
378373
continue
379374

380375
conflation_result = conflation_result_lookup.get(conflation_curie)
381376
if not conflation_result:
382377
skipped_conflation_curies.append(conflation_curie)
383-
logger.warning(
384-
"No lookup result found for conflation CURIE %s while normalizing %s; skipping it.",
385-
conflation_curie,
386-
input_curie,
387-
)
388378
continue
389379

390380
conflation_biolink_type = conflation_result.get("_source", {}).get("type", [])
391381
conflation_identifier_lookup = conflation_result.get("_source", {}).get("identifiers", [])
392382
if not conflation_identifier_lookup:
393383
skipped_conflation_curies.append(conflation_curie)
394-
logger.warning(
395-
"Conflation CURIE %s resolved to document %s with no identifiers while normalizing %s; "
396-
"skipping it.",
397-
conflation_curie,
398-
conflation_result.get("_id"),
399-
input_curie,
400-
)
401384
continue
402385

403386
for conflation_entry in conflation_identifier_lookup:
@@ -418,7 +401,7 @@ async def _lookup_curie_metadata(
418401
replacement_types = unique_list(replacement_types)
419402

420403
if not replacement_identifiers:
421-
logger.error(
404+
logger.warning(
422405
"Unable to resolve any conflation CURIEs for %s; falling back to base normalized node. "
423406
"Skipped conflation CURIEs: %s",
424407
input_curie,
@@ -443,6 +426,13 @@ async def _lookup_curie_metadata(
443426
types=replacement_types,
444427
taxa=taxa,
445428
)
429+
if skipped_conflation_curies:
430+
logger.warning(
431+
"Skipped %s conflation CURIEs while normalizing %s: %s",
432+
len(skipped_conflation_curies),
433+
input_curie,
434+
skipped_conflation_curies[:10],
435+
)
446436
nodes.append(node)
447437
else:
448438
node = NormalizedNode(
@@ -502,7 +492,7 @@ async def _lookup_equivalent_identifiers(
502492

503493
searches = []
504494
for curie in curies:
505-
searches.append({"index": search_indices})
495+
searches.append({})
506496
searches.append(
507497
{
508498
"query": {"bool": {"filter": [{"terms": {"identifiers.i": [curie]}}]}},
@@ -513,13 +503,21 @@ async def _lookup_equivalent_identifiers(
513503
)
514504

515505
msearch_result = await biothings_metadata.elasticsearch.async_client.msearch(
506+
index=search_indices,
516507
searches=searches,
517508
)
518509

519510
# Post processing to ensure we can identify invalid curies provided by the query
520511
identifier_result_lookup = {}
521512
malformed_curies = set()
522-
for curie, response in zip(curies, msearch_result.body["responses"]):
513+
responses = msearch_result.body["responses"]
514+
if len(responses) != len(curies):
515+
raise RuntimeError(
516+
f"Elasticsearch msearch returned {len(responses)} responses for {len(curies)} CURIEs; "
517+
"unable to safely align lookup results."
518+
)
519+
520+
for curie, response in zip(curies, responses):
523521
if "error" in response:
524522
raise RuntimeError(f"Elasticsearch msearch failed for CURIE {curie}: {response['error']}")
525523

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import importlib.util
2+
import logging
3+
import sys
4+
from pathlib import Path
5+
from types import ModuleType, SimpleNamespace
6+
7+
import pytest
8+
9+
10+
class FakeBiolinkToolkit:
    """Minimal toolkit double installed as ``toolkit`` on the fake ``nodenorm.biolink`` module.

    Every lookup simply echoes its argument back, so no Biolink model data
    is needed to exercise the code under test.
    """

    def get_ancestors(self, biolink_type):
        """Return a one-element list containing only ``biolink_type``."""
        ancestors = []
        ancestors.append(biolink_type)
        return ancestors

    def get_element(self, ancestor):
        """Return a mapping whose ``class_uri`` entry is ``ancestor`` unchanged."""
        element = dict(class_uri=ancestor)
        return element
16+
17+
18+
def load_normalized_nodes_module():
    """Load ``normalized_nodes.py`` directly from its source path with a fake ``nodenorm.biolink``.

    A stub ``nodenorm.biolink`` module (carrying ``BIOLINK_MODEL_VERSION`` and a
    ``FakeBiolinkToolkit`` instance as ``toolkit``) is placed in ``sys.modules``
    while the handler file executes. Afterwards the previous
    ``nodenorm.biolink`` entry is restored, or the stub is removed when there
    was no previous entry.

    Returns:
        The executed module object, which is also registered in
        ``sys.modules`` under ``"_normalized_nodes_under_test"``.

    Raises:
        ImportError: If no import spec (or no loader) can be created for the
            handler file.
        Exception: Whatever the handler file raises while executing; the
            partially initialised module is dropped from ``sys.modules``
            before the exception propagates.
    """
    module_name = "_normalized_nodes_under_test"
    module_path = Path(__file__).parents[1] / "src" / "nodenorm" / "handlers" / "normalized_nodes.py"

    fake_biolink = ModuleType("nodenorm.biolink")
    fake_biolink.BIOLINK_MODEL_VERSION = "test"
    fake_biolink.toolkit = FakeBiolinkToolkit()

    original_biolink = sys.modules.get("nodenorm.biolink")
    sys.modules["nodenorm.biolink"] = fake_biolink
    try:
        spec = importlib.util.spec_from_file_location(module_name, module_path)
        # spec_from_file_location is documented as possibly returning None, and
        # a spec may lack a loader; fail with a clear message instead of an
        # AttributeError further down.
        if spec is None or spec.loader is None:
            raise ImportError(f"Unable to create an import spec for {module_path}")

        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            # Do not leave a half-initialised module importable by name.
            sys.modules.pop(module_name, None)
            raise
    finally:
        # Always undo the temporary nodenorm.biolink substitution.
        if original_biolink is None:
            sys.modules.pop("nodenorm.biolink", None)
        else:
            sys.modules["nodenorm.biolink"] = original_biolink

    return module
39+
40+
41+
# Load the handler module once at import time, then bind the two private
# coroutine functions that the tests in this file call directly.
normalized_nodes = load_normalized_nodes_module()
_lookup_equivalent_identifiers = normalized_nodes._lookup_equivalent_identifiers
_lookup_curie_metadata = normalized_nodes._lookup_curie_metadata
44+
45+
46+
class FakeAsyncElasticsearch:
    """Async client double that replays canned ``msearch`` response batches.

    Attributes:
        response_batches: Remaining batches; one is consumed from the front
            on every ``msearch`` call.
        calls: Keyword arguments of each ``msearch`` call, in call order.
    """

    def __init__(self, response_batches):
        # Copy into a fresh list so consuming batches never mutates the caller's sequence.
        self.response_batches = [*response_batches]
        self.calls = []

    async def msearch(self, **kwargs):
        """Record ``kwargs`` and return the next batch under ``body["responses"]``."""
        self.calls.append(kwargs)
        next_batch = self.response_batches.pop(0)
        body = {"responses": next_batch}
        return SimpleNamespace(body=body)
54+
55+
56+
def fake_namespace(response_batches, indices=None):
    """Build a namespace double exposing ``elasticsearch.async_client`` and ``elasticsearch.indices``.

    Args:
        response_batches: Canned batches handed to ``FakeAsyncElasticsearch``.
        indices: Index names to expose; ``["nodenorm"]`` is used when this is
            ``None`` or otherwise falsy.

    Returns:
        A ``SimpleNamespace`` with a nested ``elasticsearch`` namespace.
    """
    client = FakeAsyncElasticsearch(response_batches)
    index_names = indices if indices else ["nodenorm"]
    elasticsearch = SimpleNamespace(async_client=client, indices=index_names)
    return SimpleNamespace(elasticsearch=elasticsearch)
63+
64+
65+
def hit_response(curie, source=None, total=1):
    """Build a single-hit search response dictionary for ``curie``.

    Args:
        curie: Used as the hit's ``_id`` and, when ``source`` is ``None``, as
            the identifier, label, and preferred name of the default source.
        source: Document to place under ``_source``; a default chemical-entity
            document is generated only when this is ``None``.
        total: Value reported under ``hits.total.value``.

    Returns:
        A dictionary with ``hits.total.value`` and a one-element ``hits.hits`` list.
    """
    document_source = source
    if document_source is None:
        document_source = {
            "identifiers": [{"i": curie, "l": curie}],
            "type": "biolink:ChemicalEntity",
            "ic": 1.0,
            "preferred_name": curie,
            "taxa": [],
        }

    hit = {"_id": curie, "_source": document_source}
    return {"hits": {"total": {"value": total}, "hits": [hit]}}
75+
76+
77+
def no_hit_response():
    """Build a search response dictionary reporting zero total hits and an empty hit list."""
    total = {"value": 0}
    return {"hits": {"total": total, "hits": []}}
79+
80+
81+
@pytest.mark.asyncio
async def test_lookup_equivalent_identifiers_uses_shared_msearch_index():
    """The index is passed once to ``msearch`` and every per-search header is empty."""
    expected_curies = ("CHEBI:17310", "MISSING:1")
    response_batch = [hit_response("CHEBI:17310"), no_hit_response()]
    namespace = fake_namespace([response_batch])

    lookup, malformed = await _lookup_equivalent_identifiers(namespace, list(expected_curies))

    assert set(lookup) == {"CHEBI:17310"}
    assert malformed == {"MISSING:1"}

    msearch_call = namespace.elasticsearch.async_client.calls[0]
    assert msearch_call["index"] == ["nodenorm"]

    # Searches alternate header/body, one pair per CURIE in request order.
    searches = msearch_call["searches"]
    for position, curie in enumerate(expected_curies):
        header = searches[2 * position]
        body = searches[2 * position + 1]
        assert header == {}
        assert body["query"]["bool"]["filter"][0]["terms"] == {"identifiers.i": [curie]}
96+
97+
98+
@pytest.mark.asyncio
async def test_lookup_equivalent_identifiers_rejects_msearch_response_count_mismatch():
    """One response for two requested CURIEs must raise instead of being misaligned."""
    lone_response_batch = [hit_response("CHEBI:17310")]
    namespace = fake_namespace([lone_response_batch])
    expected_message = "returned 1 responses for 2 CURIEs"

    with pytest.raises(RuntimeError, match=expected_message):
        await _lookup_equivalent_identifiers(namespace, ["CHEBI:17310", "CHEBI:12"])
104+
105+
106+
@pytest.mark.asyncio
async def test_lookup_equivalent_identifiers_raises_on_per_search_error():
    """An ``error`` entry in a per-search response surfaces as a ``RuntimeError``."""
    failed_search = {"error": {"type": "query_shard_exception", "reason": "boom"}}
    namespace = fake_namespace([[failed_search]])
    expected_message = "Elasticsearch msearch failed for CURIE CHEBI:17310"

    with pytest.raises(RuntimeError, match=expected_message):
        await _lookup_equivalent_identifiers(namespace, ["CHEBI:17310"])
112+
113+
114+
@pytest.mark.asyncio
async def test_lookup_curie_metadata_falls_back_when_all_conflation_curies_are_missing(caplog):
    """With no resolvable conflation CURIE the base node is returned and nothing logs at ERROR."""
    base_identifier = {"i": "BASE:1", "l": "base", "c": {"dc": ["MISSING:1"]}}
    base_source = {
        "identifiers": [base_identifier],
        "type": "biolink:ChemicalEntity",
        "ic": 1.0,
        "preferred_name": "base",
        "taxa": [],
    }
    # First batch answers the base lookup, second the conflation lookup.
    base_batch = [hit_response("BASE:1", base_source)]
    conflation_batch = [no_hit_response()]
    namespace = fake_namespace([base_batch, conflation_batch])

    with caplog.at_level(logging.WARNING):
        nodes = await _lookup_curie_metadata(namespace, ["BASE:1"], {"DrugChemical": True})

    assert len(nodes) == 1
    node = nodes[0]
    assert node.curie == "BASE:1"
    assert [identifier["i"] for identifier in node.identifiers] == ["BASE:1"]
    assert "falling back to base normalized node" in caplog.text
    assert all(record.levelno < logging.ERROR for record in caplog.records)
133+
134+
135+
@pytest.mark.asyncio
async def test_lookup_curie_metadata_logs_skipped_conflation_curies_once(caplog):
    """One skipped conflation CURIE yields exactly one summary log record."""
    base_identifier = {"i": "BASE:1", "l": "base", "c": {"dc": ["MISSING:1", "CONF:1"]}}
    base_source = {
        "identifiers": [base_identifier],
        "type": "biolink:ChemicalEntity",
        "ic": 1.0,
        "preferred_name": "base",
        "taxa": [],
    }
    conflation_source = {
        "identifiers": [{"i": "CONF:1", "l": "conflated"}],
        "type": "biolink:Drug",
        "ic": 2.0,
        "preferred_name": "conflated",
        "taxa": [],
    }
    # First batch answers the base lookup; the second answers the two
    # conflation CURIEs in order (MISSING:1 misses, CONF:1 hits).
    base_batch = [hit_response("BASE:1", base_source)]
    conflation_batch = [no_hit_response(), hit_response("CONF:1", conflation_source)]
    namespace = fake_namespace([base_batch, conflation_batch])

    with caplog.at_level(logging.WARNING):
        nodes = await _lookup_curie_metadata(namespace, ["BASE:1"], {"DrugChemical": True})

    assert len(nodes) == 1
    assert [identifier["i"] for identifier in nodes[0].identifiers] == ["CONF:1"]

    skip_logs = []
    for record in caplog.records:
        if "Skipped 1 conflation CURIEs" in record.message:
            skip_logs.append(record)
    assert len(skip_logs) == 1

0 commit comments

Comments
 (0)