|
| 1 | +""" |
| 2 | +Ontologies-stubs transform. |
| 3 | +
|
| 4 | +KG-Microbe deliberately does NOT load the full NCIT or MESH ontologies — those |
| 5 | +belong to the sibling ``kg-microbe-biomedical`` pipeline. But the |
| 6 | +chemical-mapping consolidator and the BacDive isolation-source mapper reference |
| 7 | +~150 NCIT and MESH IDs as canonical xrefs for ingredients (e.g. |
| 8 | +``NCIT:C29298 'Oatmeal'``, ``mesh:D011136 'Tween'``). Without this transform |
| 9 | +those CURIEs would appear as dangling node ids in the merged KG: edges point at |
| 10 | +them but no node row carries the label. |
| 11 | +
|
| 12 | +This transform: |
| 13 | +
|
| 14 | +1. Calls :func:`~kg_microbe.utils.stub_curie_collection.collect_stub_curies` to |
| 15 | + discover every NCIT and MESH CURIE referenced anywhere under ``mappings/``. |
| 16 | +2. For each CURIE, queries the local SemSQL DB (``data/raw/ncit.db``, |
| 17 | + ``data/raw/mesh.db``) via OAK to fetch its ``rdfs:label``, exact synonyms, |
| 18 | + and dbxrefs. The same pattern is used by the chemical-mapping consolidator |
| 19 | + for ChEBI in ``scripts/consolidate_chemical_mappings.py``. |
| 20 | +3. Writes one KGX node TSV per stub ontology to |
| 21 | + ``data/transformed/ontologies_stubs/{ncit,mesh}_nodes.tsv`` carrying |
| 22 | + ``id, category, name, synonym, xref, provided_by, knowledge_source``. |
| 23 | + No edges file — stubs are isolated nodes; edges arrive from the source |
| 24 | + transforms (BacDive, MediaDive ingredients via the chemical-mapping path, |
| 25 | + etc.). |
| 26 | +
|
| 27 | +Note for downstream consumers: if a KG built with this transform is ever |
| 28 | +merged with a kg-microbe-biomedical KG that loads NCIT/MESH fully, biolink |
| 29 | +merge semantics will union nodes — the stub node here is a strict subset of |
| 30 | +what the full ontology would emit (label/synonym/xref only; no edges, no |
| 31 | +deprecated flag, no parent classes), so the union will simply pick the |
| 32 | +fuller record. |
| 33 | +""" |
| 34 | + |
| 35 | +from __future__ import annotations |
| 36 | + |
| 37 | +import csv |
| 38 | +import gzip |
| 39 | +import shutil |
| 40 | +from pathlib import Path |
| 41 | +from typing import Dict, Iterable, List, Optional, Set |
| 42 | + |
| 43 | +from kg_microbe.transform_utils.constants import ( |
| 44 | + CATEGORY_COLUMN, |
| 45 | + DEPRECATED_COLUMN, |
| 46 | + DESCRIPTION_COLUMN, |
| 47 | + ID_COLUMN, |
| 48 | + NAME_COLUMN, |
| 49 | + PROVIDED_BY_COLUMN, |
| 50 | + SAME_AS_COLUMN, |
| 51 | + SYNONYM_COLUMN, |
| 52 | + XREF_COLUMN, |
| 53 | +) |
| 54 | +from kg_microbe.transform_utils.transform import Transform |
| 55 | +from kg_microbe.utils.isolation_source_mapping_utils import STUB_ONTOLOGY_CATEGORY |
| 56 | +from kg_microbe.utils.stub_curie_collection import collect_stub_curies |
| 57 | + |
| 58 | +# Stub ontologies handled by this transform. Each entry maps the canonical |
| 59 | +# CURIE prefix (case-sensitive — must match how the prefix appears in |
| 60 | +# existing mapping rows) to the local SemSQL DB and the InforES knowledge |
| 61 | +# source string. |
| 62 | +STUB_ONTOLOGY_SOURCES: Dict[str, Dict[str, str]] = { |
| 63 | + "NCIT": { |
| 64 | + "db_filename": "ncit.db", |
| 65 | + "knowledge_source": "infores:ncit", |
| 66 | + }, |
| 67 | + "mesh": { |
| 68 | + "db_filename": "mesh.db", |
| 69 | + "knowledge_source": "infores:mesh", |
| 70 | + }, |
| 71 | +} |
| 72 | + |
| 73 | +ONTOLOGIES_STUBS_SOURCE_NAME = "ontologies_stubs" |
| 74 | + |
| 75 | + |
| 76 | +class OntologiesStubsTransform(Transform): |
| 77 | + |
| 78 | + """Emit one labelled stub node per referenced NCIT / MESH CURIE.""" |
| 79 | + |
| 80 | + def __init__( |
| 81 | + self, |
| 82 | + input_dir: Optional[Path] = None, |
| 83 | + output_dir: Optional[Path] = None, |
| 84 | + ): |
| 85 | + """ |
| 86 | + Instantiate transform. |
| 87 | +
|
| 88 | + :param input_dir: Where the SemSQL DBs live (defaults to ``data/raw/``). |
| 89 | + :param output_dir: Where ``ontologies_stubs/{ncit,mesh}_nodes.tsv`` are |
| 90 | + written (defaults to ``data/transformed/``). |
| 91 | + """ |
| 92 | + super().__init__(ONTOLOGIES_STUBS_SOURCE_NAME, input_dir, output_dir) |
| 93 | + |
| 94 | + def run(self, data_file=None) -> None: # noqa: D401 — base class signature |
| 95 | + """ |
| 96 | + Collect stub CURIEs, fetch metadata via OAK, write per-ontology node TSVs. |
| 97 | +
|
| 98 | + :param data_file: Unused (kept for the base-class signature). The |
| 99 | + transform discovers its inputs from the mapping TSVs and the |
| 100 | + SemSQL DBs in ``input_base_dir``. |
| 101 | + """ |
| 102 | + prefixes = list(STUB_ONTOLOGY_SOURCES.keys()) |
| 103 | + curies_by_prefix = collect_stub_curies(prefixes) |
| 104 | + |
| 105 | + for prefix, curies in curies_by_prefix.items(): |
| 106 | + cfg = STUB_ONTOLOGY_SOURCES[prefix] |
| 107 | + db_path = self.input_base_dir / cfg["db_filename"] |
| 108 | + output_file = self.output_dir / f"{prefix.lower()}_nodes.tsv" |
| 109 | + self._write_stub_nodes( |
| 110 | + prefix=prefix, |
| 111 | + curies=sorted(curies), |
| 112 | + db_path=db_path, |
| 113 | + knowledge_source=cfg["knowledge_source"], |
| 114 | + output_file=output_file, |
| 115 | + ) |
| 116 | + |
| 117 | + # ------------------------------------------------------------------ |
| 118 | + # internal helpers |
| 119 | + # ------------------------------------------------------------------ |
| 120 | + |
| 121 | + def _write_stub_nodes( |
| 122 | + self, |
| 123 | + prefix: str, |
| 124 | + curies: List[str], |
| 125 | + db_path: Path, |
| 126 | + knowledge_source: str, |
| 127 | + output_file: Path, |
| 128 | + ) -> None: |
| 129 | + """Fetch label/synonyms/xrefs per CURIE and write the node TSV.""" |
| 130 | + if not curies: |
| 131 | + print(f" [{prefix}] no CURIEs to import; skipping {output_file.name}") |
| 132 | + # Write an empty file with header so the merge step doesn't fail |
| 133 | + # on a missing file declared in merge.yaml. |
| 134 | + self._write_node_file(output_file, []) |
| 135 | + return |
| 136 | + |
| 137 | + adapter = self._open_adapter(prefix, db_path) |
| 138 | + if adapter is None: |
| 139 | + raise FileNotFoundError( |
| 140 | + f"OAK adapter for {prefix} could not be opened (expected SemSQL DB at " |
| 141 | + f"{db_path}). Run `poetry run kg download` to fetch it. The stub " |
| 142 | + f"transform refuses to silently emit unlabelled nodes — that would " |
| 143 | + f"reintroduce the dangling-xref hazard this transform exists to fix." |
| 144 | + ) |
| 145 | + |
| 146 | + rows: List[List[Optional[str]]] = [] |
| 147 | + missing: List[str] = [] |
| 148 | + for curie in curies: |
| 149 | + label, synonyms, xrefs = self._fetch_metadata(adapter, curie) |
| 150 | + if not label: |
| 151 | + # Last-resort fallback: use the CURIE as the name. Log it so |
| 152 | + # curators can chase down obsolete or missing entries upstream. |
| 153 | + missing.append(curie) |
| 154 | + label = curie |
| 155 | + row = [ |
| 156 | + curie, # id |
| 157 | + STUB_ONTOLOGY_CATEGORY, # category |
| 158 | + label, # name |
| 159 | + None, # description |
| 160 | + _join_pipe(xrefs), # xref |
| 161 | + ONTOLOGIES_STUBS_SOURCE_NAME, # provided_by |
| 162 | + _join_pipe(synonyms), # synonym |
| 163 | + None, # deprecated |
| 164 | + None, # same_as |
| 165 | + ] |
| 166 | + rows.append(row) |
| 167 | + |
| 168 | + self._write_node_file(output_file, rows) |
| 169 | + print( |
| 170 | + f" [{prefix}] wrote {len(rows)} stub nodes to {output_file.name} " |
| 171 | + f"(knowledge_source={knowledge_source}, missing labels: {len(missing)})" |
| 172 | + ) |
| 173 | + if missing: |
| 174 | + print(f" [{prefix}] CURIEs with no SemSQL label (used CURIE as name): {missing}") |
| 175 | + |
| 176 | + def _open_adapter(self, prefix: str, db_path: Path): |
| 177 | + """ |
| 178 | + Open an OAK SemSQL adapter against the local DB; return None on failure. |
| 179 | +
|
| 180 | + OBO Foundry distributes the SemSQL DBs as ``.db.gz`` and ``download.yaml`` |
| 181 | + stores the gzipped form. If the unzipped ``.db`` is missing but a sibling |
| 182 | + ``.db.gz`` is present, decompress it once (idempotent) and use the result. |
| 183 | + """ |
| 184 | + if not db_path.is_file(): |
| 185 | + gz_path = db_path.with_suffix(db_path.suffix + ".gz") |
| 186 | + if gz_path.is_file(): |
| 187 | + print(f" [{prefix}] decompressing {gz_path.name} → {db_path.name}") |
| 188 | + with gzip.open(gz_path, "rb") as src, db_path.open("wb") as dst: |
| 189 | + shutil.copyfileobj(src, dst) |
| 190 | + else: |
| 191 | + return None |
| 192 | + try: |
| 193 | + from oaklib import get_adapter |
| 194 | + except ImportError as exc: # pragma: no cover — oaklib is a dep |
| 195 | + raise RuntimeError( |
| 196 | + f"oaklib import failed while opening SemSQL adapter for {prefix}: {exc}" |
| 197 | + ) from exc |
| 198 | + return get_adapter(f"sqlite:{db_path}") |
| 199 | + |
| 200 | + def _fetch_metadata(self, adapter, curie: str): |
| 201 | + """Return (label, synonyms_set, xrefs_set) for ``curie`` via the OAK adapter.""" |
| 202 | + label = "" |
| 203 | + synonyms: Set[str] = set() |
| 204 | + xrefs: Set[str] = set() |
| 205 | + try: |
| 206 | + label = adapter.label(curie) or "" |
| 207 | + except Exception: # noqa: S110 — obsolete CURIEs are expected to miss |
| 208 | + pass |
| 209 | + try: |
| 210 | + synonyms = {s for s in adapter.entity_aliases(curie) if s} |
| 211 | + except Exception: # noqa: S110 |
| 212 | + pass |
| 213 | + # Drop the canonical label out of the synonym set to keep them disjoint. |
| 214 | + synonyms.discard(label) |
| 215 | + try: |
| 216 | + metadata = adapter.entity_metadata_map(curie) or {} |
| 217 | + except Exception: # noqa: S110 |
| 218 | + metadata = {} |
| 219 | + # OAK returns metadata keyed by short-form predicate. dbxref entries |
| 220 | + # land under "oio:hasDbXref" (or "oboInOwl:hasDbXref" on older |
| 221 | + # adapters). Accept both. |
| 222 | + for predicate_key in ("oio:hasDbXref", "oboInOwl:hasDbXref"): |
| 223 | + for value in metadata.get(predicate_key, []) or []: |
| 224 | + if value: |
| 225 | + xrefs.add(str(value)) |
| 226 | + return label, sorted(synonyms), sorted(xrefs) |
| 227 | + |
| 228 | + def _write_node_file(self, path: Path, rows: Iterable[Iterable[Optional[str]]]) -> None: |
| 229 | + """Write ``rows`` to ``path`` using the standard Transform node header.""" |
| 230 | + path.parent.mkdir(parents=True, exist_ok=True) |
| 231 | + # Use the canonical 9-column node header from the Transform base class. |
| 232 | + header = [ |
| 233 | + ID_COLUMN, |
| 234 | + CATEGORY_COLUMN, |
| 235 | + NAME_COLUMN, |
| 236 | + DESCRIPTION_COLUMN, |
| 237 | + XREF_COLUMN, |
| 238 | + PROVIDED_BY_COLUMN, |
| 239 | + SYNONYM_COLUMN, |
| 240 | + DEPRECATED_COLUMN, |
| 241 | + SAME_AS_COLUMN, |
| 242 | + ] |
| 243 | + with path.open("w", newline="", encoding="utf-8") as fh: |
| 244 | + writer = csv.writer(fh, delimiter="\t", lineterminator="\n") |
| 245 | + writer.writerow(header) |
| 246 | + for row in rows: |
| 247 | + writer.writerow(["" if cell is None else cell for cell in row]) |
| 248 | + |
| 249 | + |
| 250 | +def _join_pipe(values: Iterable[str]) -> str: |
| 251 | + """Pipe-join a sequence; return ``""`` when empty (matches existing TSV convention).""" |
| 252 | + items = [v for v in values if v] |
| 253 | + return "|".join(items) if items else "" |
0 commit comments