Skip to content

Add staleness determination to backend and mermaid diagrams#558

Open
petebachant wants to merge 6 commits intomainfrom
stale
Open

Add staleness determination to backend and mermaid diagrams#558
petebachant wants to merge 6 commits intomainfrom
stale

Conversation

@petebachant
Copy link
Copy Markdown
Member

@petebachant petebachant commented Apr 15, 2026

TODO

  • Don't slow down pipeline page load--maybe cache

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds backend logic to determine DVC pipeline “staleness” per stage (and overall), and uses that information to enrich pipeline/figure/publication API responses and color Mermaid pipeline diagrams accordingly.

Changes:

  • Introduces app.pipeline utilities to compute per-stage staleness and color Mermaid diagrams by status.
  • Extends API response models (Pipeline/Figure/Publication) to include stage_status(es) and overall pipeline status.
  • Updates project routes to compute and attach stage status metadata (and infer stages from dvc.lock outs when missing).

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.

File Description
backend/app/pipeline.py New staleness computation + Mermaid coloring + stage lookup helpers.
backend/app/tests/test_pipeline.py Unit tests covering core staleness scenarios and stage lookup.
backend/app/models/core.py Adds StageStatus model and exposes stage status fields in Pipeline/Figure/Publication responses.
backend/app/api/routes/projects/core.py Computes stage statuses in pipeline/figures/publications endpoints and attaches status data to responses.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread backend/app/pipeline.py
Comment on lines +123 to +129
if path in outs_index:
return outs_index[path]
ptr = _read_dvc_pointer_md5(tree, path)
if ptr is not None:
return ptr
if tree.is_file(path):
return _tree_file_md5(tree, path)
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_resolve_current_dep_md5() returns outs_index[path] for any dep that is also listed as an out elsewhere in dvc.lock, without checking whether that out actually exists in the repo tree (as a file or .dvc pointer) or in object storage. This can mark downstream stages as up-to-date even when their input artifact is missing (the upstream stage would be stale, but the downstream stage’s deps wouldn’t reflect that). Consider resolving deps by checking the tree / pointer first and only falling back to outs_index if the artifact is actually available, or treat missing upstream outs as modified inputs.

Suggested change
if path in outs_index:
return outs_index[path]
ptr = _read_dvc_pointer_md5(tree, path)
if ptr is not None:
return ptr
if tree.is_file(path):
return _tree_file_md5(tree, path)
ptr = _read_dvc_pointer_md5(tree, path)
if ptr is not None:
return ptr
if tree.is_file(path):
return _tree_file_md5(tree, path)
if path in outs_index:
return outs_index[path]

Copilot uses AI. Check for mistakes.
Comment thread backend/app/pipeline.py
Comment on lines +86 to +103
def _md5_in_object_storage(
md5: str | None, owner_name: str, project_name: str, fs
) -> bool:
if not md5:
return False
try:
return (
get_data_fpath_for_md5(
owner_name=owner_name,
project_name=project_name,
md5=md5,
fs=fs,
)
is not None
)
except Exception as e:
logger.warning(f"Object storage existence check failed for {md5}: {e}")
return False
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_md5_in_object_storage() calls get_data_fpath_for_md5() (which can do multiple remote fs.exists checks) every time it’s invoked. compute_stage_statuses() may call this repeatedly for the same md5s across stages/outs/deps, which can add noticeable latency on pipeline-heavy repos. Consider memoizing md5→bool for the duration of compute_stage_statuses() (or functools.lru_cache on _md5_in_object_storage keyed by md5/owner/project) so each md5 triggers at most one object-store probe per request.

Copilot uses AI. Check for mistakes.
Comment on lines 2790 to +2813
ck_info = get_ck_info_from_repo(repo)
pipeline = get_dvc_pipeline_from_repo(repo)
publications = ck_info.get("publications", [])
overleaf_info = calkit.overleaf.get_sync_info(
wdir=repo.working_dir, ck_info=ck_info, fix_legacy=False
)
resp = []
tree = app.projects.get_repo_tree_for_ref(repo, ref)
ck_info_full, dvc_lock_outs, zip_path_map = (
app.projects.get_ck_info_and_dvc_outs_from_tree(project, tree)
)
dvc_lock: dict = {}
if tree.is_file("dvc.lock"):
dvc_lock = ryaml.load(tree.read_bytes("dvc.lock").decode()) or {}
stage_statuses = {}
try:
stage_statuses = compute_stage_statuses(
dvc_yaml=pipeline,
dvc_lock=dvc_lock,
tree=tree,
owner_name=project.owner_account_name,
project_name=project.name,
fs=get_object_fs(),
)
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compute_stage_statuses() is run against tree = get_repo_tree_for_ref(repo, ref) (so it reflects ref), but dvc_yaml is taken from pipeline = get_dvc_pipeline_from_repo(repo), which reads dvc.yaml from the working tree checkout. Since get_repo(..., ref=...) intentionally does not checkout/mutate the working tree for ref-based reads, this can compute stage statuses using a dvc.yaml that doesn't correspond to ref. Consider loading/parsing dvc.yaml from tree (similar to the figures route) when ref is provided, so the status computation is consistent.

Copilot uses AI. Check for mistakes.
Comment on lines +3544 to +3556
tree = app.projects.get_repo_tree_for_ref(repo, ref)
dvc_lock: dict = {}
if tree.is_file("dvc.lock"):
dvc_lock = ryaml.load(tree.read_bytes("dvc.lock").decode()) or {}
fs = get_object_fs()
stage_statuses = compute_stage_statuses(
dvc_yaml=dvc_pipeline,
dvc_lock=dvc_lock,
tree=tree,
owner_name=project.owner_account_name,
project_name=project.name,
fs=fs,
)
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When ref is provided, tree = get_repo_tree_for_ref(repo, ref) represents that historical ref, but dvc_pipeline / mermaid / dvc_content were read from the working tree checkout (which get_repo(..., ref=...) intentionally does not checkout). This means compute_stage_statuses() can end up diffing a dvc.yaml from HEAD against a dvc.lock/tree from ref, producing incorrect staleness results. Consider loading dvc.yaml (and params/calkit.yaml if needed) from tree when ref is set, or otherwise document/disable ref for this endpoint.

Copilot uses AI. Check for mistakes.
Comment on lines 3561 to +3566
return Pipeline(
dvc_stages=dvc_pipeline["stages"],
mermaid=mermaid,
dvc_yaml=dvc_content,
calkit_yaml=calkit_content,
stage_statuses=stage_statuses,
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pipeline.stage_statuses is typed as dict[str, models.core.StageStatus], but here you pass through the dict returned by compute_stage_statuses, whose values are app.pipeline.StageStatus (a different Pydantic model). With Pydantic v2 this is likely to raise a validation error (Input should be a valid dictionary or instance of StageStatus). Consider converting to the API model before constructing Pipeline (e.g., dump each status to a dict / validate into models.core.StageStatus), or unify the StageStatus model so both layers use the same type.

Suggested change
return Pipeline(
dvc_stages=dvc_pipeline["stages"],
mermaid=mermaid,
dvc_yaml=dvc_content,
calkit_yaml=calkit_content,
stage_statuses=stage_statuses,
pipeline_stage_statuses = {
stage_name: (
stage_status.model_dump()
if isinstance(stage_status, BaseModel)
else stage_status
)
for stage_name, stage_status in stage_statuses.items()
}
return Pipeline(
dvc_stages=dvc_pipeline["stages"],
mermaid=mermaid,
dvc_yaml=dvc_content,
calkit_yaml=calkit_content,
stage_statuses=pipeline_stage_statuses,

Copilot uses AI. Check for mistakes.
if auto_stage is not None:
pub["stage"] = auto_stage
if pub.get("stage"):
pub["stage_info"] = pipeline.get("stages", {}).get(pub["stage"])
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When stage is auto-detected via find_stage_for_path, it may include @... expansions from dvc.lock, but pipeline.get('stages', ...) is keyed by base stage names from dvc.yaml. That can make stage_info unexpectedly None. Consider looking up stage info with the base name (e.g., pub['stage'].split('@')[0]) while still keeping the full stage name for stage_status lookup.

Suggested change
pub["stage_info"] = pipeline.get("stages", {}).get(pub["stage"])
base_stage = pub["stage"].split("@", 1)[0]
pub["stage_info"] = pipeline.get("stages", {}).get(base_stage)

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: In progress

Development

Successfully merging this pull request may close these issues.

2 participants