Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 77 additions & 1 deletion backend/app/api/routes/projects/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@
make_mermaid_diagram,
output_from_pipeline,
)
from app.pipeline import (
color_mermaid_by_status,
compute_stage_statuses,
find_stage_for_path,
overall_pipeline_status,
)
from app.git import (
get_ck_info,
get_ck_info_from_repo,
Expand Down Expand Up @@ -1454,6 +1460,24 @@ def _maybe_add_figure(path: str) -> None:
).all()
)
# Get the figure content and base64 encode it.
dvc_lock: dict = {}
if tree.is_file("dvc.lock"):
dvc_lock = ryaml.load(tree.read_bytes("dvc.lock").decode()) or {}
dvc_yaml: dict = {}
if tree.is_file("dvc.yaml"):
dvc_yaml = ryaml.load(tree.read_bytes("dvc.yaml").decode()) or {}
stage_statuses = {}
try:
stage_statuses = compute_stage_statuses(
dvc_yaml=dvc_yaml,
dvc_lock=dvc_lock,
tree=tree,
owner_name=project.owner_account_name,
project_name=project.name,
fs=get_object_fs(),
)
except Exception as e:
logger.warning(f"Failed to compute pipeline status for figures: {e}")
for fig in figures:
item = app.projects.get_contents_from_tree(
project=project,
Expand All @@ -1466,6 +1490,12 @@ def _maybe_add_figure(path: str) -> None:
fig["content"] = item.content
fig["url"] = item.url
fig["comment_count"] = comment_counts.get(fig["path"], 0)
if not fig.get("stage"):
auto_stage = find_stage_for_path(fig["path"], dvc_lock)
if auto_stage is not None:
fig["stage"] = auto_stage
if fig.get("stage") and fig["stage"] in stage_statuses:
fig["stage_status"] = stage_statuses[fig["stage"]].model_dump()
fig["storage"] = item.storage
return [Figure.model_validate(fig) for fig in figures]

Expand Down Expand Up @@ -2641,9 +2671,32 @@ def get_project_publications(
ck_info_full, dvc_lock_outs, zip_path_map = (
app.projects.get_ck_info_and_dvc_outs_from_tree(project, tree)
)
dvc_lock: dict = {}
if tree.is_file("dvc.lock"):
dvc_lock = ryaml.load(tree.read_bytes("dvc.lock").decode()) or {}
stage_statuses = {}
try:
stage_statuses = compute_stage_statuses(
dvc_yaml=pipeline,
dvc_lock=dvc_lock,
tree=tree,
owner_name=project.owner_account_name,
project_name=project.name,
fs=get_object_fs(),
)
except Exception as e:
logger.warning(
f"Failed to compute pipeline status for publications: {e}"
)
for pub in publications:
if "stage" in pub:
if not pub.get("stage") and pub.get("path"):
auto_stage = find_stage_for_path(pub["path"], dvc_lock)
if auto_stage is not None:
pub["stage"] = auto_stage
if pub.get("stage"):
pub["stage_info"] = pipeline.get("stages", {}).get(pub["stage"])
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When stage is auto-detected via find_stage_for_path, it may include @... expansions from dvc.lock, but pipeline.get('stages', ...) is keyed by base stage names from dvc.yaml. That can make stage_info unexpectedly None. Consider looking up stage info with the base name (e.g., pub['stage'].split('@')[0]) while still keeping the full stage name for stage_status lookup.

Suggested change
pub["stage_info"] = pipeline.get("stages", {}).get(pub["stage"])
base_stage = pub["stage"].split("@", 1)[0]
pub["stage_info"] = pipeline.get("stages", {}).get(base_stage)

Copilot uses AI. Check for mistakes.
if pub["stage"] in stage_statuses:
pub["stage_status"] = stage_statuses[pub["stage"]].model_dump()
# See if we can fetch the content for this publication
if "path" in pub:
try:
Expand Down Expand Up @@ -3358,11 +3411,34 @@ def get_project_pipeline(
stream = io.StringIO()
ryaml.dump({"pipeline": ck_info["pipeline"]}, stream)
calkit_content = stream.getvalue()
# Compute per-stage staleness against the committed dvc.lock
stage_statuses: dict = {}
overall_status = "unknown"
try:
tree = app.projects.get_repo_tree_for_ref(repo, ref)
dvc_lock: dict = {}
if tree.is_file("dvc.lock"):
dvc_lock = ryaml.load(tree.read_bytes("dvc.lock").decode()) or {}
fs = get_object_fs()
stage_statuses = compute_stage_statuses(
dvc_yaml=dvc_pipeline,
dvc_lock=dvc_lock,
tree=tree,
owner_name=project.owner_account_name,
project_name=project.name,
fs=fs,
)
Comment on lines +3418 to +3430
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When ref is provided, tree = get_repo_tree_for_ref(repo, ref) represents that historical ref, but dvc_pipeline / mermaid / dvc_content were read from the working tree checkout (which get_repo(..., ref=...) intentionally does not checkout). This means compute_stage_statuses() can end up diffing a dvc.yaml from HEAD against a dvc.lock/tree from ref, producing incorrect staleness results. Consider loading dvc.yaml (and params/calkit.yaml if needed) from tree when ref is set, or otherwise document/disable ref for this endpoint.

Copilot uses AI. Check for mistakes.
overall_status = overall_pipeline_status(stage_statuses)
mermaid = color_mermaid_by_status(mermaid, stage_statuses)
except Exception as e:
logger.warning(f"Failed to compute pipeline status: {e}")
return Pipeline(
dvc_stages=dvc_pipeline["stages"],
mermaid=mermaid,
dvc_yaml=dvc_content,
calkit_yaml=calkit_content,
stage_statuses=stage_statuses,
Comment on lines 3435 to +3440
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pipeline.stage_statuses is typed as dict[str, models.core.StageStatus], but here you pass through the dict returned by compute_stage_statuses, whose values are app.pipeline.StageStatus (a different Pydantic model). With Pydantic v2 this is likely to raise a validation error (Input should be a valid dictionary or instance of StageStatus). Consider converting to the API model before constructing Pipeline (e.g., dump each status to a dict / validate into models.core.StageStatus), or unify the StageStatus model so both layers use the same type.

Suggested change
return Pipeline(
dvc_stages=dvc_pipeline["stages"],
mermaid=mermaid,
dvc_yaml=dvc_content,
calkit_yaml=calkit_content,
stage_statuses=stage_statuses,
pipeline_stage_statuses = {
stage_name: (
stage_status.model_dump()
if isinstance(stage_status, BaseModel)
else stage_status
)
for stage_name, stage_status in stage_statuses.items()
}
return Pipeline(
dvc_stages=dvc_pipeline["stages"],
mermaid=mermaid,
dvc_yaml=dvc_content,
calkit_yaml=calkit_content,
stage_statuses=pipeline_stage_statuses,

Copilot uses AI. Check for mistakes.
status=overall_status,
)


Expand Down
12 changes: 12 additions & 0 deletions backend/app/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,11 +595,21 @@ class DvcForeachStage(SQLModel):
do: DvcPipelineStage


class StageStatus(SQLModel):
status: Literal["up-to-date", "stale", "not-run", "unknown"]
modified_command: bool = False
modified_inputs: list[str] = Field(default_factory=list)
modified_outputs: list[str] = Field(default_factory=list)
missing_outputs: list[str] = Field(default_factory=list)


class Pipeline(SQLModel):
mermaid: str
dvc_stages: dict[str, DvcPipelineStage | DvcForeachStage]
dvc_yaml: str
calkit_yaml: str | None
stage_statuses: dict[str, StageStatus] = Field(default_factory=dict)
status: Literal["up-to-date", "stale", "unknown"] = "unknown"


class Question(SQLModel, table=True):
Expand All @@ -616,6 +626,7 @@ class Figure(SQLModel):
title: str
description: str | None = None
stage: str | None = None
stage_status: "StageStatus | None" = None
dataset: str | None = None
content: str | None = None # Base64 encoded
url: str | None = None
Expand Down Expand Up @@ -889,6 +900,7 @@ class Publication(BaseModel):
| None
) = None
stage: str | None = None
stage_status: "StageStatus | None" = None
content: str | None = None
stage_info: DvcPipelineStage | None = None
url: str | None = None
Expand Down
Loading
Loading