|
| 1 | +"""Check whether a model performs a set of metabolic tasks. |
| 2 | +
|
| 3 | +For each task the model is constrained by the task's allowed inputs/outputs (and any |
| 4 | +extra reactions / bound changes), then tested for feasibility: a task *passes* if a |
| 5 | +steady-state flux exists, unless it is marked ``should_fail`` (then it passes iff |
| 6 | +infeasible). |
| 7 | +
|
| 8 | +Inputs/outputs are encoded as ranges on the per-metabolite mass-balance constraint |
| 9 | +(``model.constraints[met.id]``): an input allows net consumption (``Sv ∈ [-UB, -LB]``) |
| 10 | +and an output allows / requires net production (``Sv ≤ UB``, and ``≥ LB`` if |
| 11 | +``LB > 0``). Existing boundary reactions are closed first, so inputs/outputs are |
| 12 | +defined solely by the task (closed-model semantics). |
| 13 | +""" |
| 14 | +from __future__ import annotations |
| 15 | + |
| 16 | +import pickle |
| 17 | +from collections.abc import Iterable |
| 18 | +from dataclasses import dataclass |
| 19 | +from pathlib import Path |
| 20 | + |
| 21 | +import cobra |
| 22 | +from cobra.exceptions import OptimizationError |
| 23 | +from cobra.flux_analysis import flux_variability_analysis, pfba |
| 24 | +from optlang.symbolics import Zero |
| 25 | + |
| 26 | +from raven_python.manipulation.add import add_reactions_from_equations |
| 27 | +from raven_python.tasks.tasklist import Task, parse_task_list |
| 28 | + |
| 29 | +_ALLMETS = "ALLMETS" |
| 30 | +_ALLMETSIN = "ALLMETSIN" |
| 31 | + |
| 32 | + |
| 33 | +@dataclass |
| 34 | +class TaskResult: |
| 35 | + """Result of one task: ``passed`` is the verdict (accounts for ``should_fail``).""" |
| 36 | + |
| 37 | + id: str |
| 38 | + description: str |
| 39 | + passed: bool |
| 40 | + feasible: bool |
| 41 | + error: str | None = None |
| 42 | + |
| 43 | + |
| 44 | +def _set_constraint_bounds(constraint, lb: float, ub: float) -> None: |
| 45 | + """Set an optlang constraint's bounds without a transient lb > ub.""" |
| 46 | + if lb > constraint.ub: |
| 47 | + constraint.ub = ub |
| 48 | + constraint.lb = lb |
| 49 | + else: |
| 50 | + constraint.lb = lb |
| 51 | + constraint.ub = ub |
| 52 | + |
| 53 | + |
| 54 | +def _classify(token: str) -> tuple[str, str | None]: |
| 55 | + """Return ``("all", None)``, ``("comp", COMP)``, or ``("met", token_upper)``.""" |
| 56 | + upper = token.upper() |
| 57 | + if upper == _ALLMETS: |
| 58 | + return "all", None |
| 59 | + if upper.startswith(_ALLMETSIN + "[") and upper.endswith("]"): |
| 60 | + return "comp", upper[len(_ALLMETSIN) + 1: -1] |
| 61 | + return "met", upper # incl. malformed ALLMETSIN[... → treated as a (missing) metabolite |
| 62 | + |
| 63 | + |
| 64 | +def _metabolite_bounds( |
| 65 | + task: Task, name_to_ids: dict[str, list[str]], comp_to_ids: dict[str, list[str]] |
| 66 | +) -> tuple[dict[str, list[float]], list[str]]: |
| 67 | + """Compute ``{met_id: [lb, ub]}`` from a task's inputs/outputs (RAVEN ``b``). |
| 68 | +
|
| 69 | + Bulk tokens (ALLMETS / ALLMETSIN) are applied before specific metabolites, as |
| 70 | + RAVEN does. Returns the bounds and a list of unresolved tokens (→ task error). |
| 71 | + """ |
| 72 | + bounds: dict[str, list[float]] = {} |
| 73 | + missing: list[str] = [] |
| 74 | + |
| 75 | + def touch(mid: str) -> list[float]: |
| 76 | + return bounds.setdefault(mid, [0.0, 0.0]) |
| 77 | + |
| 78 | + for entries, is_input in ((task.inputs, True), (task.outputs, False)): |
| 79 | + bulk = [(t, lb, ub) for (t, lb, ub) in entries if _classify(t)[0] != "met"] |
| 80 | + specific = [(t, lb, ub) for (t, lb, ub) in entries if _classify(t)[0] == "met"] |
| 81 | + for token, lb, ub in bulk + specific: |
| 82 | + kind, arg = _classify(token) |
| 83 | + if kind == "all": |
| 84 | + ids = [mid for group in comp_to_ids.values() for mid in group] |
| 85 | + elif kind == "comp": |
| 86 | + ids = comp_to_ids.get(arg, []) |
| 87 | + else: |
| 88 | + ids = name_to_ids.get(arg, []) |
| 89 | + if not ids: |
| 90 | + missing.append(token) |
| 91 | + continue |
| 92 | + for mid in ids: |
| 93 | + b = touch(mid) |
| 94 | + if is_input: |
| 95 | + b[0] = -ub # allow net consumption up to UB (RAVEN b1 = -UBin) |
| 96 | + if kind == "met": |
| 97 | + b[1] = -lb |
| 98 | + else: |
| 99 | + b[1] = ub # allow net production up to UB |
| 100 | + if kind == "met" and lb > 0: |
| 101 | + b[0] = lb # require at least LB produced |
| 102 | + return bounds, missing |
| 103 | + |
| 104 | + |
| 105 | +def task_name_maps(model: cobra.Model) -> tuple[dict[str, list[str]], dict[str, list[str]]]: |
| 106 | + """Build ``name[comp]→[ids]`` and ``comp→[ids]`` lookups for a model's metabolites. |
| 107 | +
|
| 108 | + ``name[comp]`` maps to a *list* because a model can carry several metabolites with |
| 109 | + the same name and compartment; a task referencing it constrains all of them (as |
| 110 | + RAVEN does), rather than an arbitrary one. |
| 111 | + """ |
| 112 | + name_to_ids: dict[str, list[str]] = {} |
| 113 | + comp_to_ids: dict[str, list[str]] = {} |
| 114 | + for m in model.metabolites: |
| 115 | + name_to_ids.setdefault(f"{m.name}[{m.compartment}]".upper(), []).append(m.id) |
| 116 | + comp_to_ids.setdefault((m.compartment or "").upper(), []).append(m.id) |
| 117 | + return name_to_ids, comp_to_ids |
| 118 | + |
| 119 | + |
| 120 | +def apply_task_constraints( |
| 121 | + model: cobra.Model, task: Task, name_to_id, comp_to_ids |
| 122 | +) -> tuple[set[str], str | None]: |
| 123 | + """Apply a task's inputs/outputs/equations/bound-changes to ``model`` in place. |
| 124 | +
|
| 125 | + Sets a feasibility (zero) objective. Returns ``(task_metabolite_ids, error)``; |
| 126 | + ``task_metabolite_ids`` are the model metabolites the task references (RAVEN's |
| 127 | + ``essentialMetsForTasks``). On error the model may be partially modified. |
| 128 | + """ |
| 129 | + bounds, missing = _metabolite_bounds(task, name_to_id, comp_to_ids) |
| 130 | + if missing: |
| 131 | + return set(), f"unknown metabolite(s): {sorted(set(missing))}" |
| 132 | + task_mets = {mid for mid in bounds} |
| 133 | + for mid, (lb, ub) in bounds.items(): |
| 134 | + if (lb, ub) != (0.0, 0.0): |
| 135 | + _set_constraint_bounds(model.constraints[mid], lb, ub) |
| 136 | + |
| 137 | + if task.equations: |
| 138 | + existing = {m.id for m in model.metabolites} |
| 139 | + specs = [ |
| 140 | + {"id": f"TASK_TMP_{i}", "equation": equ, "bounds": (lb, ub)} |
| 141 | + for i, (equ, lb, ub) in enumerate(task.equations) |
| 142 | + ] |
| 143 | + add_reactions_from_equations(model, specs, mets_by="name", allow_new_mets=True) |
| 144 | + for i in range(len(specs)): |
| 145 | + tmp = model.reactions.get_by_id(f"TASK_TMP_{i}") |
| 146 | + task_mets |= {m.id for m in tmp.metabolites if m.id in existing} |
| 147 | + |
| 148 | + for rxn_id, lb, ub in task.changed: |
| 149 | + if rxn_id not in model.reactions: |
| 150 | + return set(), f"CHANGED RXN not in model: {rxn_id!r}" |
| 151 | + model.reactions.get_by_id(rxn_id).bounds = (lb, ub) |
| 152 | + |
| 153 | + model.objective = model.problem.Objective(Zero, direction="max") # feasibility only |
| 154 | + return task_mets, None |
| 155 | + |
| 156 | + |
| 157 | +def _build_task_model( |
| 158 | + base: cobra.Model, task: Task, name_to_id, comp_to_ids |
| 159 | +) -> tuple[cobra.Model | None, set[str], str | None]: |
| 160 | + """Copy ``base`` and apply a task's constraints (``model``/``error`` exclusive).""" |
| 161 | + model = base.copy() |
| 162 | + task_mets, error = apply_task_constraints(model, task, name_to_id, comp_to_ids) |
| 163 | + return (None if error else model), task_mets, error |
| 164 | + |
| 165 | + |
| 166 | +def _run_task(base: cobra.Model, task: Task, name_to_id, comp_to_ids) -> TaskResult: |
| 167 | + """Test one task by applying its constraints to ``base`` in place, then reverting. |
| 168 | +
|
| 169 | + Avoids copying the (genome-scale) model per task — the copy dominates ``check_tasks`` |
| 170 | + runtime. ``with base:`` reverts everything ``apply_task_constraints`` does through |
| 171 | + cobra's API (temp reactions/metabolites for equations, reaction bounds, objective); |
| 172 | + the one untracked change — direct metabolite mass-balance (``model.constraints[mid]``) |
| 173 | + bound edits — is snapshotted and restored explicitly. Net result is identical to the |
| 174 | + copy-based version but reuses a single model across all tasks. |
| 175 | + """ |
| 176 | + bounds, missing = _metabolite_bounds(task, name_to_id, comp_to_ids) |
| 177 | + if missing: |
| 178 | + return TaskResult(task.id, task.description, False, False, |
| 179 | + f"unknown metabolite(s): {sorted(set(missing))}") |
| 180 | + saved = {mid: (base.constraints[mid].lb, base.constraints[mid].ub) for mid in bounds} |
| 181 | + try: |
| 182 | + with base: # reverts temp reactions/mets, reaction bounds, objective on exit |
| 183 | + _, error = apply_task_constraints(base, task, name_to_id, comp_to_ids) |
| 184 | + if error is not None: |
| 185 | + return TaskResult(task.id, task.description, False, False, error) |
| 186 | + base.slim_optimize() |
| 187 | + feasible = base.solver.status == "optimal" |
| 188 | + finally: # restore the untracked metabolite-constraint bound edits |
| 189 | + for mid, (lb, ub) in saved.items(): |
| 190 | + _set_constraint_bounds(base.constraints[mid], lb, ub) |
| 191 | + return TaskResult(task.id, task.description, feasible != task.should_fail, feasible) |
| 192 | + |
| 193 | + |
| 194 | +def check_tasks( |
| 195 | + model: cobra.Model, |
| 196 | + tasks: str | Iterable[Task], |
| 197 | + *, |
| 198 | + close_boundaries: bool = True, |
| 199 | +) -> list[TaskResult]: |
| 200 | + """Run a task list against ``model`` and return a :class:`TaskResult` per task. |
| 201 | +
|
| 202 | + ``tasks`` is a parsed list of :class:`Task` or a path to a task-list file. With |
| 203 | + ``close_boundaries`` (default), existing exchange/sink/demand reactions are |
| 204 | + closed so inputs/outputs are defined purely by the tasks (as RAVEN assumes). |
| 205 | + """ |
| 206 | + tasks = _as_tasks(tasks) |
| 207 | + base, name_to_id, comp_to_ids = _prepare_base(model, close_boundaries) |
| 208 | + return [_run_task(base, task, name_to_id, comp_to_ids) for task in tasks] |
| 209 | + |
| 210 | + |
| 211 | +def _as_tasks(tasks: str | Iterable[Task]) -> list[Task]: |
| 212 | + if isinstance(tasks, (str, bytes)) or hasattr(tasks, "__fspath__"): |
| 213 | + return parse_task_list(tasks) |
| 214 | + return list(tasks) |
| 215 | + |
| 216 | + |
| 217 | +def _prepare_base(model: cobra.Model, close_boundaries: bool): |
| 218 | + base = model.copy() |
| 219 | + if close_boundaries: |
| 220 | + for rxn in base.boundary: |
| 221 | + rxn.bounds = (0.0, 0.0) |
| 222 | + name_to_id, comp_to_ids = task_name_maps(base) |
| 223 | + return base, name_to_id, comp_to_ids |
| 224 | + |
| 225 | + |
| 226 | +@dataclass |
| 227 | +class EssentialReactionsResult: |
| 228 | + """Reactions a model *must* use to perform a task list (RAVEN ``essentialRxns``). |
| 229 | +
|
| 230 | + ``reactions`` maps reaction id → forced flux direction (``+1`` forward, ``-1`` |
| 231 | + reverse): the reaction must carry flux of that sign in every feasible solution of |
| 232 | + at least one task. ``per_task`` is the same, split by task id. ``task_metabolites`` |
| 233 | + are the model metabolites the tasks reference (RAVEN ``essentialMetsForTasks``, |
| 234 | + protected from removal). ``failed_tasks`` are tasks that were infeasible or |
| 235 | + malformed and thus skipped (RAVEN drops these from the task list). |
| 236 | + """ |
| 237 | + |
| 238 | + reactions: dict[str, int] |
| 239 | + per_task: dict[str, dict[str, int]] |
| 240 | + task_metabolites: set[str] |
| 241 | + failed_tasks: list[str] |
| 242 | + |
| 243 | + |
| 244 | +def _task_essential_reactions( |
| 245 | + task_model: cobra.Model, candidates: list[str], tol: float |
| 246 | +) -> dict[str, int]: |
| 247 | + """Reactions in ``candidates`` forced to carry flux, with direction, via FVA. |
| 248 | +
|
| 249 | + A reaction is *essential* for the task iff zero is not attainable in any feasible |
| 250 | + solution — i.e. its FVA range excludes 0. This is exactly RAVEN's |
| 251 | + "constrain to 0 → infeasible" definition, but obtained from FVA ranges (no |
| 252 | + per-reaction knockout loop). The nonzero side of the range gives the forced |
| 253 | + direction. FVA is restricted to ``candidates`` — the reactions carrying flux in a |
| 254 | + minimal feasible solution, the only ones that *can* be essential (an essential |
| 255 | + reaction is nonzero in every feasible solution, so also in that one) — which keeps |
| 256 | + this cheap on genome-scale templates instead of ranging all reactions. |
| 257 | + """ |
| 258 | + if not candidates: |
| 259 | + return {} |
| 260 | + fva = flux_variability_analysis(task_model, reaction_list=candidates, fraction_of_optimum=0.0) |
| 261 | + essential: dict[str, int] = {} |
| 262 | + for rxn_id, lo, hi in zip(fva.index, fva["minimum"], fva["maximum"], strict=True): |
| 263 | + if lo > tol: |
| 264 | + essential[rxn_id] = 1 |
| 265 | + elif hi < -tol: |
| 266 | + essential[rxn_id] = -1 |
| 267 | + return essential |
| 268 | + |
| 269 | + |
| 270 | +def find_task_essential_reactions( |
| 271 | + model: cobra.Model, |
| 272 | + tasks: str | Iterable[Task], |
| 273 | + *, |
| 274 | + close_boundaries: bool = True, |
| 275 | + tol: float = 1e-8, |
| 276 | + cache_path: str | Path | None = None, |
| 277 | +) -> EssentialReactionsResult: |
| 278 | + """Find the reactions a model must use to satisfy a task list. |
| 279 | +
|
| 280 | + For each task the model is constrained as in :func:`check_tasks`, then FVA |
| 281 | + identifies reactions whose flux can never be zero (essential) and their forced |
| 282 | + direction. This is the ``prepINITModel`` step that feeds (ft)INIT: essential |
| 283 | + reactions are kept regardless of expression score and made irreversible in their |
| 284 | + forced direction. When a reaction is essential in several tasks with conflicting |
| 285 | + directions, the majority wins (ties → forward), matching RAVEN's ``pos < neg``. |
| 286 | +
|
| 287 | + On a genome-scale model this is slow (an FVA per task). Pass ``cache_path`` to make |
| 288 | + it **resumable**: each task's result is written there as it completes (atomically), |
| 289 | + and a re-run skips tasks already cached — so it survives interruptions and finishes |
| 290 | + across several sessions. |
| 291 | + """ |
| 292 | + tasks = _as_tasks(tasks) |
| 293 | + base, name_to_id, comp_to_ids = _prepare_base(model, close_boundaries) |
| 294 | + original_ids = {r.id for r in base.reactions} |
| 295 | + |
| 296 | + per_task: dict[str, dict[str, int]] = {} |
| 297 | + task_metabolites: set[str] = set() |
| 298 | + failed: list[str] = [] |
| 299 | + if cache_path is not None and Path(cache_path).exists(): |
| 300 | + cached = pickle.load(open(cache_path, "rb")) |
| 301 | + per_task, task_metabolites, failed = cached["per_task"], set(cached["mets"]), list(cached["failed"]) |
| 302 | + |
| 303 | + done = set(per_task) | set(failed) |
| 304 | + for task in tasks: |
| 305 | + if task.should_fail or task.id in done: |
| 306 | + continue # a should-fail task defines no essentials; cached ones are skipped |
| 307 | + task_model, task_mets, error = _build_task_model(base, task, name_to_id, comp_to_ids) |
| 308 | + if error is not None: |
| 309 | + failed.append(task.id) |
| 310 | + else: |
| 311 | + # One min-flux solve both proves feasibility and yields the essential-reaction |
| 312 | + # candidates (the original reactions carrying flux in a sparse solution). |
| 313 | + try: |
| 314 | + fluxes = pfba(task_model).fluxes |
| 315 | + candidates = [rid for rid in original_ids if abs(fluxes.get(rid, 0.0)) > tol] |
| 316 | + task_metabolites |= task_mets |
| 317 | + per_task[task.id] = _task_essential_reactions(task_model, candidates, tol) |
| 318 | + except OptimizationError: |
| 319 | + failed.append(task.id) |
| 320 | + if cache_path is not None: # atomic checkpoint after each task |
| 321 | + tmp = Path(f"{cache_path}.part") |
| 322 | + pickle.dump({"per_task": per_task, "mets": task_metabolites, "failed": failed}, |
| 323 | + open(tmp, "wb")) |
| 324 | + tmp.replace(cache_path) |
| 325 | + |
| 326 | + # Majority direction; tie (sum == 0) → forward, as RAVEN's `pos < neg`. |
| 327 | + direction_votes: dict[str, int] = {} |
| 328 | + for essential in per_task.values(): |
| 329 | + for rxn_id, direction in essential.items(): |
| 330 | + direction_votes[rxn_id] = direction_votes.get(rxn_id, 0) + direction |
| 331 | + reactions = {rid: (-1 if votes < 0 else 1) for rid, votes in direction_votes.items()} |
| 332 | + return EssentialReactionsResult(reactions, per_task, task_metabolites, failed) |
0 commit comments