Add staleness determination to backend and mermaid diagrams#558
Add staleness determination to backend and mermaid diagrams#558petebachant wants to merge 6 commits intomainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds backend logic to determine DVC pipeline “staleness” per stage (and overall), and uses that information to enrich pipeline/figure/publication API responses and color Mermaid pipeline diagrams accordingly.
Changes:
- Introduces
app.pipelineutilities to compute per-stage staleness and color Mermaid diagrams by status. - Extends API response models (Pipeline/Figure/Publication) to include
stage_status(es)and overall pipelinestatus. - Updates project routes to compute and attach stage status metadata (and infer stages from
dvc.lockouts when missing).
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| backend/app/pipeline.py | New staleness computation + Mermaid coloring + stage lookup helpers. |
| backend/app/tests/test_pipeline.py | Unit tests covering core staleness scenarios and stage lookup. |
| backend/app/models/core.py | Adds StageStatus model and exposes stage status fields in Pipeline/Figure/Publication responses. |
| backend/app/api/routes/projects/core.py | Computes stage statuses in pipeline/figures/publications endpoints and attaches status data to responses. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if path in outs_index: | ||
| return outs_index[path] | ||
| ptr = _read_dvc_pointer_md5(tree, path) | ||
| if ptr is not None: | ||
| return ptr | ||
| if tree.is_file(path): | ||
| return _tree_file_md5(tree, path) |
There was a problem hiding this comment.
_resolve_current_dep_md5() returns outs_index[path] for any dep that is also listed as an out elsewhere in dvc.lock, without checking whether that out actually exists in the repo tree (as a file or .dvc pointer) or in object storage. This can mark downstream stages as up-to-date even when their input artifact is missing (the upstream stage would be stale, but the downstream stage’s deps wouldn’t reflect that). Consider resolving deps by checking the tree / pointer first and only falling back to outs_index if the artifact is actually available, or treat missing upstream outs as modified inputs.
| if path in outs_index: | |
| return outs_index[path] | |
| ptr = _read_dvc_pointer_md5(tree, path) | |
| if ptr is not None: | |
| return ptr | |
| if tree.is_file(path): | |
| return _tree_file_md5(tree, path) | |
| ptr = _read_dvc_pointer_md5(tree, path) | |
| if ptr is not None: | |
| return ptr | |
| if tree.is_file(path): | |
| return _tree_file_md5(tree, path) | |
| if path in outs_index: | |
| return outs_index[path] |
| def _md5_in_object_storage( | ||
| md5: str | None, owner_name: str, project_name: str, fs | ||
| ) -> bool: | ||
| if not md5: | ||
| return False | ||
| try: | ||
| return ( | ||
| get_data_fpath_for_md5( | ||
| owner_name=owner_name, | ||
| project_name=project_name, | ||
| md5=md5, | ||
| fs=fs, | ||
| ) | ||
| is not None | ||
| ) | ||
| except Exception as e: | ||
| logger.warning(f"Object storage existence check failed for {md5}: {e}") | ||
| return False |
There was a problem hiding this comment.
_md5_in_object_storage() calls get_data_fpath_for_md5() (which can do multiple remote fs.exists checks) every time it’s invoked. compute_stage_statuses() may call this repeatedly for the same md5s across stages/outs/deps, which can add noticeable latency on pipeline-heavy repos. Consider memoizing md5→bool for the duration of compute_stage_statuses() (or functools.lru_cache on _md5_in_object_storage keyed by md5/owner/project) so each md5 triggers at most one object-store probe per request.
| ck_info = get_ck_info_from_repo(repo) | ||
| pipeline = get_dvc_pipeline_from_repo(repo) | ||
| publications = ck_info.get("publications", []) | ||
| overleaf_info = calkit.overleaf.get_sync_info( | ||
| wdir=repo.working_dir, ck_info=ck_info, fix_legacy=False | ||
| ) | ||
| resp = [] | ||
| tree = app.projects.get_repo_tree_for_ref(repo, ref) | ||
| ck_info_full, dvc_lock_outs, zip_path_map = ( | ||
| app.projects.get_ck_info_and_dvc_outs_from_tree(project, tree) | ||
| ) | ||
| dvc_lock: dict = {} | ||
| if tree.is_file("dvc.lock"): | ||
| dvc_lock = ryaml.load(tree.read_bytes("dvc.lock").decode()) or {} | ||
| stage_statuses = {} | ||
| try: | ||
| stage_statuses = compute_stage_statuses( | ||
| dvc_yaml=pipeline, | ||
| dvc_lock=dvc_lock, | ||
| tree=tree, | ||
| owner_name=project.owner_account_name, | ||
| project_name=project.name, | ||
| fs=get_object_fs(), | ||
| ) |
There was a problem hiding this comment.
compute_stage_statuses() is run against tree = get_repo_tree_for_ref(repo, ref) (so it reflects ref), but dvc_yaml is taken from pipeline = get_dvc_pipeline_from_repo(repo), which reads dvc.yaml from the working tree checkout. Since get_repo(..., ref=...) intentionally does not checkout/mutate the working tree for ref-based reads, this can compute stage statuses using a dvc.yaml that doesn't correspond to ref. Consider loading/parsing dvc.yaml from tree (similar to the figures route) when ref is provided, so the status computation is consistent.
| tree = app.projects.get_repo_tree_for_ref(repo, ref) | ||
| dvc_lock: dict = {} | ||
| if tree.is_file("dvc.lock"): | ||
| dvc_lock = ryaml.load(tree.read_bytes("dvc.lock").decode()) or {} | ||
| fs = get_object_fs() | ||
| stage_statuses = compute_stage_statuses( | ||
| dvc_yaml=dvc_pipeline, | ||
| dvc_lock=dvc_lock, | ||
| tree=tree, | ||
| owner_name=project.owner_account_name, | ||
| project_name=project.name, | ||
| fs=fs, | ||
| ) |
There was a problem hiding this comment.
When ref is provided, tree = get_repo_tree_for_ref(repo, ref) represents that historical ref, but dvc_pipeline / mermaid / dvc_content were read from the working tree checkout (which get_repo(..., ref=...) intentionally does not checkout). This means compute_stage_statuses() can end up diffing a dvc.yaml from HEAD against a dvc.lock/tree from ref, producing incorrect staleness results. Consider loading dvc.yaml (and params/calkit.yaml if needed) from tree when ref is set, or otherwise document/disable ref for this endpoint.
| return Pipeline( | ||
| dvc_stages=dvc_pipeline["stages"], | ||
| mermaid=mermaid, | ||
| dvc_yaml=dvc_content, | ||
| calkit_yaml=calkit_content, | ||
| stage_statuses=stage_statuses, |
There was a problem hiding this comment.
Pipeline.stage_statuses is typed as dict[str, models.core.StageStatus], but here you pass through the dict returned by compute_stage_statuses, whose values are app.pipeline.StageStatus (a different Pydantic model). With Pydantic v2 this is likely to raise a validation error (Input should be a valid dictionary or instance of StageStatus). Consider converting to the API model before constructing Pipeline (e.g., dump each status to a dict / validate into models.core.StageStatus), or unify the StageStatus model so both layers use the same type.
| return Pipeline( | |
| dvc_stages=dvc_pipeline["stages"], | |
| mermaid=mermaid, | |
| dvc_yaml=dvc_content, | |
| calkit_yaml=calkit_content, | |
| stage_statuses=stage_statuses, | |
| pipeline_stage_statuses = { | |
| stage_name: ( | |
| stage_status.model_dump() | |
| if isinstance(stage_status, BaseModel) | |
| else stage_status | |
| ) | |
| for stage_name, stage_status in stage_statuses.items() | |
| } | |
| return Pipeline( | |
| dvc_stages=dvc_pipeline["stages"], | |
| mermaid=mermaid, | |
| dvc_yaml=dvc_content, | |
| calkit_yaml=calkit_content, | |
| stage_statuses=pipeline_stage_statuses, |
| if auto_stage is not None: | ||
| pub["stage"] = auto_stage | ||
| if pub.get("stage"): | ||
| pub["stage_info"] = pipeline.get("stages", {}).get(pub["stage"]) |
There was a problem hiding this comment.
When stage is auto-detected via find_stage_for_path, it may include @... expansions from dvc.lock, but pipeline.get('stages', ...) is keyed by base stage names from dvc.yaml. That can make stage_info unexpectedly None. Consider looking up stage info with the base name (e.g., pub['stage'].split('@')[0]) while still keeping the full stage name for stage_status lookup.
| pub["stage_info"] = pipeline.get("stages", {}).get(pub["stage"]) | |
| base_stage = pub["stage"].split("@", 1)[0] | |
| pub["stage_info"] = pipeline.get("stages", {}).get(base_stage) |
TODO