feat: add workflow stage resume#747
Conversation
|
MkDocs preview: https://55b9c747.dd-docs-preview.pages.dev Fern preview: https://nvidia-preview-pr-747.docs.buildwithfern.com/nemo/datadesigner
|
Greptile SummaryThis PR adds stage-level resume support to
|
| Filename | Overview |
|---|---|
| packages/data-designer/src/data_designer/interface/composite_workflow.py | Core implementation of stage-level resume; adds ~155 lines of new logic including fingerprint-matched skip/resume, downstream invalidation, relative-path metadata, atomic metadata writes, and result reconstruction from metadata. |
| packages/data-designer/tests/interface/test_composite_workflow.py | Adds 11 new resume tests covering the main happy-path and failure scenarios; good breadth of coverage for skip, partial-stage delegation, callback invalidation, corrupt metadata, and strict mode rejection. |
| docs/concepts/workflow-chaining.md | Adds a Resume section describing IF_POSSIBLE and ALWAYS modes; removes the 'not implemented yet' bullet from Current limits. |
| fern/versions/latest/pages/concepts/workflow-chaining.mdx | Mirror of the MkDocs resume section update for Fern docs. |
| plans/workflow-chaining/workflow-chaining.md | Minor status update marking stage-level resume as implemented and listing what remains deferred. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["workflow.run(resume=...)"] --> B[_read_prior_workflow_metadata]
B --> C{NEVER mode?}
C -- Yes --> D[prior_metadata = None]
C -- No --> E{metadata valid?}
E -- corrupt/missing --> F{ALWAYS mode?}
F -- Yes --> RAISE1[raise WorkflowError]
F -- No --> D
E -- Yes --> G[prior_metadata loaded]
D & G --> LOOP[For each stage]
LOOP --> H{skipped_upstream set?}
H -- Yes --> SKIP_EMPTY["status=skipped_empty_upstream, continue"]
H -- No --> I[compute stage_fingerprint]
I --> J{"prior_matches?\nfingerprint == prior\n&& !force_rerun_downstream"}
J -- Yes --> K{"_can_skip_prior_stage?\nstatus=completed*\noutput parquet exists"}
K -- Yes --> L["Copy prior metadata\nReconstruct result from disk\ncontinue"]
L -- completed_empty --> M[set skipped_upstream_stage]
L --> LOOP
K -- No --> N{"prior status\nrunning/failed &\nstage_path exists?"}
N -- Yes --> O[stage_resume = ALWAYS]
N -- No --> P{"ALWAYS mode &\n!force_rerun_downstream?"}
P -- Yes --> RAISE2[raise WorkflowError: not reusable]
J -- No --> P
P -- No --> Q[stage_resume = NEVER]
O & Q --> R{"stage_resume == NEVER\n& stage_path exists?"}
R -- Yes --> S[rmtree stage_path]
R -- No --> T[keep stage dir]
S & T --> U["Write status=running to metadata"]
U --> V["data_designer.create(..., resume=stage_resume)"]
V --> W{output_processors?}
W -- Yes --> X["rmtree output-processors\nre-run processors"]
W -- No --> Y[use main result]
X & Y --> Z{on_success callback?}
Z -- Yes --> AA[output_seed_path = callback output]
Z -- No --> AB[output_seed_path = stage output]
AA & AB --> AC{output_records == 0?}
AC -- "Yes & allow_empty" --> AD["status=completed_empty\nset skipped_upstream_stage"]
AC -- No --> AE[status=completed]
AD & AE --> AF["force_rerun_downstream = True\nwrite metadata, continue"]
AF --> LOOP
LOOP --> DONE[return CompositeWorkflowResults]
Reviews (3): Last reviewed commit: "fix: address workflow resume review feed..." | Re-trigger Greptile
Review: PR #747 —
|
📋 Summary
Adds stage-level resume support for chained workflows so compatible completed stages can be reused, matching partial stages can continue through the existing single-stage resume path, and downstream stages rerun when upstream outputs change.
🔗 Related Issue
N/A
🔄 Changes
CompositeWorkflow.run(resume=...)with completed-stage reuse and partial-stage delegation.ResumeMode.IF_POSSIBLEfall back to fresh runs when prior metadata is unusable.🧪 Testing
make testpassesRan:
.venv/bin/ruff format ..venv/bin/ruff check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py.venv/bin/ruff format --check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py.venv/bin/pytest packages/data-designer/tests/interface/test_composite_workflow.py -q- 55 passed, 2 warnings.venv/bin/pytest /home/ubuntu/Code/reviews/DataDesigner-747/smoke_test.py -q -s- 2 passed against NVIDIA Build (nvidia/nemotron-3-nano-30b-a3b) and NVIDIA Inference (openai/openai/gpt-5.4-nano) using/home/ubuntu/Code/.envNote: full
.venv/bin/ruff check --fix .currently hits an unrelated existing generated-notebook lint indocs/colab_notebooks/7-nemotron-personas.ipynb(F404).✅ Checklist