diff --git a/backend/app/api/routes/projects/core.py b/backend/app/api/routes/projects/core.py
index 48cc9be0..4c9cb020 100644
--- a/backend/app/api/routes/projects/core.py
+++ b/backend/app/api/routes/projects/core.py
@@ -60,6 +60,12 @@
     make_mermaid_diagram,
     output_from_pipeline,
 )
+from app.pipeline import (
+    color_mermaid_by_status,
+    compute_stage_statuses,
+    find_stage_for_path,
+    overall_pipeline_status,
+)
 from app.git import (
     get_ck_info,
     get_ck_info_from_repo,
@@ -1454,6 +1460,24 @@ def _maybe_add_figure(path: str) -> None:
         ).all()
     )
     # Get the figure content and base64 encode it.
+    dvc_lock: dict = {}
+    if tree.is_file("dvc.lock"):
+        dvc_lock = ryaml.load(tree.read_bytes("dvc.lock").decode()) or {}
+    dvc_yaml: dict = {}
+    if tree.is_file("dvc.yaml"):
+        dvc_yaml = ryaml.load(tree.read_bytes("dvc.yaml").decode()) or {}
+    stage_statuses = {}
+    try:
+        stage_statuses = compute_stage_statuses(
+            dvc_yaml=dvc_yaml,
+            dvc_lock=dvc_lock,
+            tree=tree,
+            owner_name=project.owner_account_name,
+            project_name=project.name,
+            fs=get_object_fs(),
+        )
+    except Exception as e:
+        logger.warning(f"Failed to compute pipeline status for figures: {e}")
     for fig in figures:
         item = app.projects.get_contents_from_tree(
             project=project,
@@ -1466,6 +1490,12 @@ def _maybe_add_figure(path: str) -> None:
         fig["content"] = item.content
         fig["url"] = item.url
         fig["comment_count"] = comment_counts.get(fig["path"], 0)
+        if not fig.get("stage"):
+            auto_stage = find_stage_for_path(fig["path"], dvc_lock)
+            if auto_stage is not None:
+                fig["stage"] = auto_stage
+        if fig.get("stage") and fig["stage"] in stage_statuses:
+            fig["stage_status"] = stage_statuses[fig["stage"]].model_dump()
         fig["storage"] = item.storage
     return [Figure.model_validate(fig) for fig in figures]
 
@@ -2641,9 +2671,32 @@ def get_project_publications(
     ck_info_full, dvc_lock_outs, zip_path_map = (
         app.projects.get_ck_info_and_dvc_outs_from_tree(project, tree)
     )
+    dvc_lock: dict = {}
+    if tree.is_file("dvc.lock"):
+        dvc_lock = ryaml.load(tree.read_bytes("dvc.lock").decode()) or {}
+    stage_statuses = {}
+    try:
+        stage_statuses = compute_stage_statuses(
+            dvc_yaml=pipeline,
+            dvc_lock=dvc_lock,
+            tree=tree,
+            owner_name=project.owner_account_name,
+            project_name=project.name,
+            fs=get_object_fs(),
+        )
+    except Exception as e:
+        logger.warning(
+            f"Failed to compute pipeline status for publications: {e}"
+        )
     for pub in publications:
-        if "stage" in pub:
+        if not pub.get("stage") and pub.get("path"):
+            auto_stage = find_stage_for_path(pub["path"], dvc_lock)
+            if auto_stage is not None:
+                pub["stage"] = auto_stage
+        if pub.get("stage"):
             pub["stage_info"] = pipeline.get("stages", {}).get(pub["stage"])
+            if pub["stage"] in stage_statuses:
+                pub["stage_status"] = stage_statuses[pub["stage"]].model_dump()
         # See if we can fetch the content for this publication
         if "path" in pub:
             try:
@@ -3358,11 +3411,34 @@ def get_project_pipeline(
         stream = io.StringIO()
         ryaml.dump({"pipeline": ck_info["pipeline"]}, stream)
         calkit_content = stream.getvalue()
+    # Compute per-stage staleness against the committed dvc.lock
+    stage_statuses: dict = {}
+    overall_status = "unknown"
+    try:
+        tree = app.projects.get_repo_tree_for_ref(repo, ref)
+        dvc_lock: dict = {}
+        if tree.is_file("dvc.lock"):
+            dvc_lock = ryaml.load(tree.read_bytes("dvc.lock").decode()) or {}
+        fs = get_object_fs()
+        stage_statuses = compute_stage_statuses(
+            dvc_yaml=dvc_pipeline,
+            dvc_lock=dvc_lock,
+            tree=tree,
+            owner_name=project.owner_account_name,
+            project_name=project.name,
+            fs=fs,
+        )
+        overall_status = overall_pipeline_status(stage_statuses)
+        mermaid = color_mermaid_by_status(mermaid, stage_statuses)
+    except Exception as e:
+        logger.warning(f"Failed to compute pipeline status: {e}")
     return Pipeline(
         dvc_stages=dvc_pipeline["stages"],
         mermaid=mermaid,
         dvc_yaml=dvc_content,
         calkit_yaml=calkit_content,
+        stage_statuses=stage_statuses,
+        status=overall_status,
     )
 
 
diff --git a/backend/app/models/core.py b/backend/app/models/core.py
index 593d4982..3dd00ce4 100644
--- a/backend/app/models/core.py
+++ b/backend/app/models/core.py
@@ -595,11 +595,21 @@ class DvcForeachStage(SQLModel):
     do: DvcPipelineStage
 
 
+class StageStatus(SQLModel):
+    status: Literal["up-to-date", "stale", "not-run", "unknown"]
+    modified_command: bool = False
+    modified_inputs: list[str] = Field(default_factory=list)
+    modified_outputs: list[str] = Field(default_factory=list)
+    missing_outputs: list[str] = Field(default_factory=list)
+
+
 class Pipeline(SQLModel):
     mermaid: str
     dvc_stages: dict[str, DvcPipelineStage | DvcForeachStage]
     dvc_yaml: str
     calkit_yaml: str | None
+    stage_statuses: dict[str, StageStatus] = Field(default_factory=dict)
+    status: Literal["up-to-date", "stale", "unknown"] = "unknown"
 
 
 class Question(SQLModel, table=True):
@@ -616,6 +626,7 @@ class Figure(SQLModel):
     title: str
     description: str | None = None
     stage: str | None = None
+    stage_status: "StageStatus | None" = None
     dataset: str | None = None
     content: str | None = None  # Base64 encoded
     url: str | None = None
@@ -889,6 +900,7 @@ class Publication(BaseModel):
         | None
     ) = None
     stage: str | None = None
+    stage_status: "StageStatus | None" = None
     content: str | None = None
     stage_info: DvcPipelineStage | None = None
     url: str | None = None
diff --git a/backend/app/pipeline.py b/backend/app/pipeline.py
new file mode 100644
index 00000000..735984cc
--- /dev/null
+++ b/backend/app/pipeline.py
@@ -0,0 +1,345 @@
+"""Pipeline staleness detection.
+
+Determines whether each stage in a project's DVC pipeline is up-to-date,
+stale, not-run, or unknown, by diffing the committed ``dvc.yaml`` and
+``dvc.lock`` against the repo tree and object storage. The pipeline's
+object storage (Calkit remote) is treated as authoritative for
+DVC-tracked outputs since the API clones never ``dvc fetch``.
+
+Designed to be self-contained and easily swapped for a richer
+implementation later (e.g., one that can validate environments or detect
+when ``dvc.yaml`` needs recompiling from ``calkit.yaml``).
+"""
+
+from __future__ import annotations
+
+import hashlib
+import io
+import logging
+import re
+from typing import Literal
+
+import ruamel.yaml
+from pydantic import BaseModel, Field
+
+from app.dvc import get_data_fpath_for_md5
+from app.git import RepoTree
+
+logger = logging.getLogger(__name__)
+
+_yaml = ruamel.yaml.YAML(typ="safe")
+
+StatusLiteral = Literal["up-to-date", "stale", "not-run", "unknown"]
+OverallStatusLiteral = Literal["up-to-date", "stale", "unknown"]
+
+
+class StageStatus(BaseModel):
+    """Status for a single pipeline stage."""
+
+    status: StatusLiteral
+    modified_command: bool = False
+    modified_inputs: list[str] = Field(default_factory=list)
+    modified_outputs: list[str] = Field(default_factory=list)
+    missing_outputs: list[str] = Field(default_factory=list)
+
+
+def _base_stage_name(name: str) -> str:
+    return name.split("@")[0]
+
+
+def _is_dir_md5(md5: str | None) -> bool:
+    return bool(md5) and md5.endswith(".dir")
+
+
+def _safe_yaml_load(data: bytes) -> dict | None:
+    try:
+        return _yaml.load(io.BytesIO(data))
+    except Exception as e:
+        logger.warning(f"Failed to parse YAML: {e}")
+        return None
+
+
+def _read_dvc_pointer_md5(tree: RepoTree, path: str) -> str | None:
+    """Return the md5 stored in ``.dvc`` if present."""
+    dvc_path = path + ".dvc"
+    if not tree.is_file(dvc_path):
+        return None
+    data = _safe_yaml_load(tree.read_bytes(dvc_path))
+    if not data:
+        return None
+    outs = data.get("outs") or []
+    if not outs:
+        return None
+    return outs[0].get("md5")
+
+
+def _tree_file_md5(tree: RepoTree, path: str) -> str | None:
+    if not tree.is_file(path):
+        return None
+    try:
+        return hashlib.md5(tree.read_bytes(path)).hexdigest()
+    except Exception as e:
+        logger.warning(f"Failed to hash {path}: {e}")
+        return None
+
+
+def _md5_in_object_storage(
+    md5: str | None, owner_name: str, project_name: str, fs
+) -> bool:
+    if not md5:
+        return False
+    try:
+        return (
+            get_data_fpath_for_md5(
+                owner_name=owner_name,
+                project_name=project_name,
+                md5=md5,
+                fs=fs,
+            )
+            is not None
+        )
+    except Exception as e:
+        logger.warning(f"Object storage existence check failed for {md5}: {e}")
+        return False
+
+
+def _build_outs_index(dvc_lock: dict) -> dict[str, str | None]:
+    """Map out path -> md5 across all stages in the lock."""
+    out_map: dict[str, str | None] = {}
+    for stage in (dvc_lock.get("stages") or {}).values():
+        for out in stage.get("outs") or []:
+            p = out.get("path")
+            if p:
+                out_map[p] = out.get("md5")
+    return out_map
+
+
+def _resolve_current_dep_md5(
+    path: str,
+    tree: RepoTree,
+    outs_index: dict[str, str | None],
+) -> str | None:
+    """Current md5 for a dep path, or None if missing from the tree."""
+    if path in outs_index:
+        return outs_index[path]
+    ptr = _read_dvc_pointer_md5(tree, path)
+    if ptr is not None:
+        return ptr
+    if tree.is_file(path):
+        return _tree_file_md5(tree, path)
+    return None
+
+
+def _get_nested(data: dict | None, dotted_key: str):
+    cur = data
+    for part in dotted_key.split("."):
+        if not isinstance(cur, dict) or part not in cur:
+            return None
+        cur = cur[part]
+    return cur
+
+
+def _normalize_cmd(cmd) -> str | None:
+    if cmd is None:
+        return None
+    if isinstance(cmd, list):
+        return " && ".join(str(c) for c in cmd).strip()
+    return str(cmd).strip()
+
+
+def compute_stage_statuses(
+    dvc_yaml: dict,
+    dvc_lock: dict,
+    tree: RepoTree,
+    owner_name: str,
+    project_name: str,
+    fs,
+) -> dict[str, StageStatus]:
+    """Compute per-stage status for a pipeline.
+
+    Keys are stage names as they appear in ``dvc.lock`` (including
+    ``@``-expansions for matrix/foreach stages). Stages declared in
+    ``dvc.yaml`` but never run are reported with status ``not-run`` under
+    their base name.
+    """
+    lock_stages = dvc_lock.get("stages") or {}
+    yaml_stages = dvc_yaml.get("stages") or {}
+    outs_index = _build_outs_index(dvc_lock)
+    result: dict[str, StageStatus] = {}
+    locked_bases = {_base_stage_name(n) for n in lock_stages.keys()}
+    for stage_name in yaml_stages.keys():
+        if stage_name.startswith("_"):
+            continue
+        if stage_name not in locked_bases:
+            result[stage_name] = StageStatus(status="not-run")
+    for stage_name, lock_stage in lock_stages.items():
+        base = _base_stage_name(stage_name)
+        if base.startswith("_"):
+            continue
+        yaml_stage = yaml_stages.get(base) or {}
+        modified_command = False
+        modified_inputs: list[str] = []
+        modified_outputs: list[str] = []
+        missing_outputs: list[str] = []
+        yaml_cmd = _normalize_cmd(
+            yaml_stage.get("cmd") if isinstance(yaml_stage, dict) else None
+        )
+        lock_cmd = _normalize_cmd(lock_stage.get("cmd"))
+        if (
+            yaml_cmd is not None
+            and lock_cmd is not None
+            and "${" not in yaml_cmd
+            and yaml_cmd != lock_cmd
+        ):
+            modified_command = True
+        for dep in lock_stage.get("deps") or []:
+            dep_path = dep.get("path")
+            lock_md5 = dep.get("md5") or dep.get("hash")
+            if not dep_path:
+                continue
+            if _is_dir_md5(lock_md5):
+                if _md5_in_object_storage(
+                    lock_md5, owner_name, project_name, fs
+                ):
+                    continue
+                if not tree.exists(dep_path):
+                    modified_inputs.append(dep_path)
+                continue
+            current = _resolve_current_dep_md5(dep_path, tree, outs_index)
+            if current is None:
+                if _md5_in_object_storage(
+                    lock_md5, owner_name, project_name, fs
+                ):
+                    continue
+                modified_inputs.append(dep_path)
+            elif lock_md5 is not None and current != lock_md5:
+                modified_inputs.append(dep_path)
+        for params_file, locked_params in (
+            lock_stage.get("params") or {}
+        ).items():
+            if not isinstance(locked_params, dict):
+                continue
+            if not tree.is_file(params_file):
+                for key in locked_params:
+                    modified_inputs.append(f"{params_file}:{key}")
+                continue
+            current_params = (
+                _safe_yaml_load(tree.read_bytes(params_file)) or {}
+            )
+            for key, locked_val in locked_params.items():
+                if _get_nested(current_params, key) != locked_val:
+                    modified_inputs.append(f"{params_file}:{key}")
+        for out in lock_stage.get("outs") or []:
+            out_path = out.get("path")
+            lock_md5 = out.get("md5") or out.get("hash")
+            if not out_path:
+                continue
+            if _is_dir_md5(lock_md5):
+                if _md5_in_object_storage(
+                    lock_md5, owner_name, project_name, fs
+                ):
+                    continue
+                if not tree.exists(out_path):
+                    missing_outputs.append(out_path)
+                continue
+            available_md5: str | None = None
+            if tree.is_file(out_path):
+                available_md5 = _tree_file_md5(tree, out_path)
+            else:
+                ptr = _read_dvc_pointer_md5(tree, out_path)
+                if ptr is not None:
+                    available_md5 = ptr
+            if available_md5 is None:
+                if _md5_in_object_storage(
+                    lock_md5, owner_name, project_name, fs
+                ):
+                    continue
+                missing_outputs.append(out_path)
+            elif lock_md5 is not None and available_md5 != lock_md5:
+                modified_outputs.append(out_path)
+        is_stale = bool(
+            modified_command
+            or modified_inputs
+            or modified_outputs
+            or missing_outputs
+        )
+        result[stage_name] = StageStatus(
+            status="stale" if is_stale else "up-to-date",
+            modified_command=modified_command,
+            modified_inputs=modified_inputs,
+            modified_outputs=modified_outputs,
+            missing_outputs=missing_outputs,
+        )
+    return result
+
+
+def overall_pipeline_status(
+    stage_statuses: dict[str, StageStatus],
+) -> OverallStatusLiteral:
+    if not stage_statuses:
+        return "unknown"
+    statuses = {s.status for s in stage_statuses.values()}
+    if "stale" in statuses or "not-run" in statuses:
+        return "stale"
+    if statuses == {"up-to-date"}:
+        return "up-to-date"
+    return "unknown"
+
+
+_MERMAID_NODE_RE = re.compile(r'^\s*(node\d+)\["([^"]+)"\]\s*$')
+
+_MERMAID_STYLES = {
+    "stale": "fill:#8a6a00,stroke:#c9a227,color:#fff5cc",
+    "not-run": "fill:#3a3a3a,stroke:#888,color:#ddd",
+    "up-to-date": "fill:#1f5a1f,stroke:#3a8a3a,color:#d6f5d6",
+}
+
+
+def color_mermaid_by_status(
+    mermaid: str, stage_statuses: dict[str, StageStatus]
+) -> str:
+    """Append classDef/class lines to a Mermaid diagram that color each
+    stage node by its status. Stages with ``unknown`` status are left
+    uncolored.
+    """
+    if not mermaid or not stage_statuses:
+        return mermaid
+    rank = {"up-to-date": 0, "unknown": 1, "not-run": 2, "stale": 3}
+    # Collapse @-expanded matrix stages to their base, worst-status wins
+    base_status: dict[str, str] = {}
+    for name, info in stage_statuses.items():
+        base = name.split("@")[0]
+        prev = base_status.get(base)
+        if prev is None or rank.get(info.status, 0) > rank.get(prev, 0):
+            base_status[base] = info.status
+    buckets: dict[str, list[str]] = {
+        "stale": [],
+        "not-run": [],
+        "up-to-date": [],
+    }
+    for line in mermaid.splitlines():
+        m = _MERMAID_NODE_RE.match(line)
+        if not m:
+            continue
+        node_id, label = m.group(1), m.group(2)
+        status = base_status.get(label.split("@")[0])
+        if status in buckets:
+            buckets[status].append(node_id)
+    extra: list[str] = []
+    for status, nodes in buckets.items():
+        if not nodes:
+            continue
+        extra.append(f"\tclassDef {status} {_MERMAID_STYLES[status]}")
+        extra.append(f"\tclass {','.join(nodes)} {status}")
+    if not extra:
+        return mermaid
+    return mermaid.rstrip() + "\n" + "\n".join(extra) + "\n"
+
+
+def find_stage_for_path(path: str, dvc_lock: dict) -> str | None:
+    """Return the first stage in ``dvc.lock`` whose outs include *path*."""
+    for stage_name, stage in (dvc_lock.get("stages") or {}).items():
+        for out in stage.get("outs") or []:
+            if out.get("path") == path:
+                return stage_name
+    return None
diff --git a/backend/app/tests/test_pipeline.py b/backend/app/tests/test_pipeline.py
new file mode 100644
index 00000000..c88cb858
--- /dev/null
+++ b/backend/app/tests/test_pipeline.py
@@ -0,0 +1,245 @@
+"""Tests for app.pipeline (pipeline staleness detection)."""
+
+import hashlib
+
+import git
+
+from app.git import get_repo_tree_for_ref
+from app.pipeline import (
+    compute_stage_statuses,
+    find_stage_for_path,
+    overall_pipeline_status,
+)
+
+
+class FakeFS:
+    """Minimal fsspec-like FS recording which md5s exist in object storage."""
+
+    def __init__(self, existing_md5s: set[str] | None = None) -> None:
+        self._existing = existing_md5s or set()
+
+    def exists(self, path: str) -> bool:
+        return any(
+            md5[:2] in path and md5[2:] in path for md5 in self._existing
+        )
+
+
+def _init_repo(repo_dir) -> git.Repo:
+    repo = git.Repo.init(repo_dir)
+    repo.git.config(["user.name", "CI Test"])
+    repo.git.config(["user.email", "ci-test@example.com"])
+    return repo
+
+
+def _commit(repo: git.Repo, files: dict[str, str], msg: str) -> None:
+    root = repo.working_dir
+    paths = []
+    for rel, content in files.items():
+        full = f"{root}/{rel}"
+        import os
+
+        os.makedirs(os.path.dirname(full) or root, exist_ok=True)
+        with open(full, "w") as f:
+            f.write(content)
+        paths.append(rel)
+    repo.git.add(paths)
+    repo.git.commit(["-m", msg])
+
+
+def _md5(s: str) -> str:
+    return hashlib.md5(s.encode()).hexdigest()
+
+
+def test_up_to_date_stage(tmp_path):
+    repo = _init_repo(tmp_path / "repo")
+    script = "print('hi')\n"
+    _commit(
+        repo,
+        {"script.py": script, "out.txt": "result\n"},
+        "init",
+    )
+    tree = get_repo_tree_for_ref(repo, None)
+    dvc_yaml = {
+        "stages": {
+            "run": {
+                "cmd": "python script.py",
+                "deps": ["script.py"],
+                "outs": ["out.txt"],
+            }
+        }
+    }
+    dvc_lock = {
+        "stages": {
+            "run": {
+                "cmd": "python script.py",
+                "deps": [{"path": "script.py", "md5": _md5(script)}],
+                "outs": [{"path": "out.txt", "md5": _md5("result\n")}],
+            }
+        }
+    }
+    statuses = compute_stage_statuses(
+        dvc_yaml, dvc_lock, tree, "o", "p", FakeFS()
+    )
+    assert statuses["run"].status == "up-to-date"
+    assert overall_pipeline_status(statuses) == "up-to-date"
+
+
+def test_modified_command(tmp_path):
+    repo = _init_repo(tmp_path / "repo")
+    script = "print('hi')\n"
+    _commit(
+        repo,
+        {"script.py": script, "out.txt": "result\n"},
+        "init",
+    )
+    tree = get_repo_tree_for_ref(repo, None)
+    dvc_yaml = {
+        "stages": {
+            "run": {
+                "cmd": "python script.py --new",
+                "deps": ["script.py"],
+                "outs": ["out.txt"],
+            }
+        }
+    }
+    dvc_lock = {
+        "stages": {
+            "run": {
+                "cmd": "python script.py",
+                "deps": [{"path": "script.py", "md5": _md5(script)}],
+                "outs": [{"path": "out.txt", "md5": _md5("result\n")}],
+            }
+        }
+    }
+    statuses = compute_stage_statuses(
+        dvc_yaml, dvc_lock, tree, "o", "p", FakeFS()
+    )
+    assert statuses["run"].status == "stale"
+    assert statuses["run"].modified_command is True
+
+
+def test_modified_input(tmp_path):
+    repo = _init_repo(tmp_path / "repo")
+    _commit(
+        repo,
+        {"script.py": "print('new')\n", "out.txt": "result\n"},
+        "init",
+    )
+    tree = get_repo_tree_for_ref(repo, None)
+    dvc_yaml = {
+        "stages": {
+            "run": {
+                "cmd": "python script.py",
+                "deps": ["script.py"],
+                "outs": ["out.txt"],
+            }
+        }
+    }
+    dvc_lock = {
+        "stages": {
+            "run": {
+                "cmd": "python script.py",
+                "deps": [{"path": "script.py", "md5": _md5("print('old')\n")}],
+                "outs": [{"path": "out.txt", "md5": _md5("result\n")}],
+            }
+        }
+    }
+    statuses = compute_stage_statuses(
+        dvc_yaml, dvc_lock, tree, "o", "p", FakeFS()
+    )
+    assert statuses["run"].status == "stale"
+    assert "script.py" in statuses["run"].modified_inputs
+
+
+def test_missing_output_found_in_object_storage(tmp_path):
+    repo = _init_repo(tmp_path / "repo")
+    _commit(repo, {"script.py": "x\n"}, "init")
+    tree = get_repo_tree_for_ref(repo, None)
+    out_md5 = _md5("result\n")
+    dvc_yaml = {
+        "stages": {
+            "run": {
+                "cmd": "python script.py",
+                "deps": ["script.py"],
+                "outs": ["data/out.bin"],
+            }
+        }
+    }
+    dvc_lock = {
+        "stages": {
+            "run": {
+                "cmd": "python script.py",
+                "deps": [{"path": "script.py", "md5": _md5("x\n")}],
+                "outs": [{"path": "data/out.bin", "md5": out_md5}],
+            }
+        }
+    }
+    fs = FakeFS(existing_md5s={out_md5})
+    statuses = compute_stage_statuses(dvc_yaml, dvc_lock, tree, "o", "p", fs)
+    assert statuses["run"].status == "up-to-date"
+
+
+def test_missing_output_not_in_object_storage(tmp_path):
+    repo = _init_repo(tmp_path / "repo")
+    _commit(repo, {"script.py": "x\n"}, "init")
+    tree = get_repo_tree_for_ref(repo, None)
+    dvc_yaml = {
+        "stages": {
+            "run": {
+                "cmd": "python script.py",
+                "deps": ["script.py"],
+                "outs": ["data/out.bin"],
+            }
+        }
+    }
+    dvc_lock = {
+        "stages": {
+            "run": {
+                "cmd": "python script.py",
+                "deps": [{"path": "script.py", "md5": _md5("x\n")}],
+                "outs": [{"path": "data/out.bin", "md5": _md5("result\n")}],
+            }
+        }
+    }
+    statuses = compute_stage_statuses(
+        dvc_yaml, dvc_lock, tree, "o", "p", FakeFS()
+    )
+    assert statuses["run"].status == "stale"
+    assert "data/out.bin" in statuses["run"].missing_outputs
+
+
+def test_not_run_stage(tmp_path):
+    repo = _init_repo(tmp_path / "repo")
+    _commit(repo, {"script.py": "x\n"}, "init")
+    tree = get_repo_tree_for_ref(repo, None)
+    dvc_yaml = {
+        "stages": {
+            "run": {
+                "cmd": "python script.py",
+                "deps": ["script.py"],
+                "outs": ["out.txt"],
+            }
+        }
+    }
+    dvc_lock = {"stages": {}}
+    statuses = compute_stage_statuses(
+        dvc_yaml, dvc_lock, tree, "o", "p", FakeFS()
+    )
+    assert statuses["run"].status == "not-run"
+    assert overall_pipeline_status(statuses) == "stale"
+
+
+def test_find_stage_for_path():
+    dvc_lock = {
+        "stages": {
+            "plot": {
+                "outs": [{"path": "figures/foo.png", "md5": "abc"}],
+            },
+            "train": {
+                "outs": [{"path": "model.pkl", "md5": "def"}],
+            },
+        }
+    }
+    assert find_stage_for_path("figures/foo.png", dvc_lock) == "plot"
+    assert find_stage_for_path("model.pkl", dvc_lock) == "train"
+    assert find_stage_for_path("missing", dvc_lock) is None