Skip to content

feat: add workflow stage resume#747

Open
andreatgretel wants to merge 4 commits into
mainfrom
andreatgretel/feat/stage-level-resume
Open

feat: add workflow stage resume#747
andreatgretel wants to merge 4 commits into
mainfrom
andreatgretel/feat/stage-level-resume

Conversation

@andreatgretel

@andreatgretel andreatgretel commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

📋 Summary

Adds stage-level resume support for chained workflows so compatible completed stages can be reused, matching partial stages can continue through the existing single-stage resume path, and downstream stages rerun when upstream outputs change.

🔗 Related Issue

N/A

🔄 Changes

  • Add CompositeWorkflow.run(resume=...) with completed-stage reuse and partial-stage delegation.
  • Invalidate downstream stages when an upstream stage reruns, changes, or has missing selected/callback output.
  • Harden workflow metadata writes and let ResumeMode.IF_POSSIBLE fall back to fresh runs when prior metadata is unusable.
  • Add public workflow resume tests for skip, rerun, partial/failed-stage resume, callback-output, output-processor, completed-empty, corrupt metadata, and strict resume behavior.
  • Document workflow resume behavior in MkDocs and Fern docs.
  • Update the workflow chaining plan with the completed stage-level resume slice.

🧪 Testing

  • make test passes
  • Unit tests added/updated
  • E2E tests added/updated (if applicable)

Ran:

  • .venv/bin/ruff format .
  • .venv/bin/ruff check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py
  • .venv/bin/ruff format --check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py
  • .venv/bin/pytest packages/data-designer/tests/interface/test_composite_workflow.py -q - 55 passed, 2 warnings
  • .venv/bin/pytest /home/ubuntu/Code/reviews/DataDesigner-747/smoke_test.py -q -s - 2 passed against NVIDIA Build (nvidia/nemotron-3-nano-30b-a3b) and NVIDIA Inference (openai/openai/gpt-5.4-nano) using /home/ubuntu/Code/.env

Note: full .venv/bin/ruff check --fix . currently hits an unrelated existing generated-notebook lint in docs/colab_notebooks/7-nemotron-personas.ipynb (F404).

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated (if applicable)

@andreatgretel andreatgretel marked this pull request as ready for review June 11, 2026 19:58
@andreatgretel andreatgretel requested a review from a team as a code owner June 11, 2026 19:58
@github-actions

github-actions Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

MkDocs preview: https://55b9c747.dd-docs-preview.pages.dev

Fern preview: https://nvidia-preview-pr-747.docs.buildwithfern.com/nemo/datadesigner

Fern previews include the docs-website version archive with PR changes synced into latest. Notebook tutorials are rendered without execution outputs in previews.

@greptile-apps

greptile-apps Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR adds stage-level resume support to CompositeWorkflow.run(), allowing completed stages to be reused, partially-run stages to be delegated through the existing single-stage resume path, and downstream stages to be invalidated when an upstream stage reruns or its outputs change.

  • Resume logic: Fingerprint-based skip/resume decision with force_rerun_downstream correctly preventing downstream stages from being skipped after any stage runs; ResumeMode.ALWAYS raises for non-resumable stages unless a prior stage already ran and set the force flag.
  • Metadata hardening: _write_workflow_metadata now uses PID+UUID temp file with os.fsync + atomic os.replace to prevent corrupt-on-crash scenarios.
  • Tests: 11 new tests covering skip, rerun, partial/failed-stage resume, callback-output invalidation, output-processor skip, moved-artifact (relative paths), corrupt metadata, and strict ALWAYS rejection — passing 55 tests in total.

Confidence Score: 5/5

Safe to merge; the resume logic is well-reasoned and the test suite covers the critical paths including skip, partial-stage delegation, downstream invalidation, corrupt-metadata fallback, and strict rejection.

The fingerprint-chain logic, force_rerun_downstream flag, and _can_skip_prior_stage guard all behave correctly across the scenarios exercised. The atomic metadata write (PID+UUID temp + os.replace) is a solid improvement. No logic errors or data-correctness issues were found in the changed code.

No files require special attention.

Important Files Changed

Filename Overview
packages/data-designer/src/data_designer/interface/composite_workflow.py Core implementation of stage-level resume; adds ~155 lines of new logic including fingerprint-matched skip/resume, downstream invalidation, relative-path metadata, atomic metadata writes, and result reconstruction from metadata.
packages/data-designer/tests/interface/test_composite_workflow.py Adds 11 new resume tests covering the main happy-path and failure scenarios; good breadth of coverage for skip, partial-stage delegation, callback invalidation, corrupt metadata, and strict mode rejection.
docs/concepts/workflow-chaining.md Adds a Resume section describing IF_POSSIBLE and ALWAYS modes; removes the 'not implemented yet' bullet from Current limits.
fern/versions/latest/pages/concepts/workflow-chaining.mdx Mirror of the MkDocs resume section update for Fern docs.
plans/workflow-chaining/workflow-chaining.md Minor status update marking stage-level resume as implemented and listing what remains deferred.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["workflow.run(resume=...)"] --> B[_read_prior_workflow_metadata]
    B --> C{NEVER mode?}
    C -- Yes --> D[prior_metadata = None]
    C -- No --> E{metadata valid?}
    E -- corrupt/missing --> F{ALWAYS mode?}
    F -- Yes --> RAISE1[raise WorkflowError]
    F -- No --> D
    E -- Yes --> G[prior_metadata loaded]

    D & G --> LOOP[For each stage]
    LOOP --> H{skipped_upstream set?}
    H -- Yes --> SKIP_EMPTY["status=skipped_empty_upstream, continue"]

    H -- No --> I[compute stage_fingerprint]
    I --> J{"prior_matches?\nfingerprint == prior\n&& !force_rerun_downstream"}

    J -- Yes --> K{"_can_skip_prior_stage?\nstatus=completed*\noutput parquet exists"}
    K -- Yes --> L["Copy prior metadata\nReconstruct result from disk\ncontinue"]
    L -- completed_empty --> M[set skipped_upstream_stage]
    L --> LOOP

    K -- No --> N{"prior status\nrunning/failed &\nstage_path exists?"}
    N -- Yes --> O[stage_resume = ALWAYS]
    N -- No --> P{"ALWAYS mode &\n!force_rerun_downstream?"}
    P -- Yes --> RAISE2[raise WorkflowError: not reusable]

    J -- No --> P
    P -- No --> Q[stage_resume = NEVER]

    O & Q --> R{"stage_resume == NEVER\n& stage_path exists?"}
    R -- Yes --> S[rmtree stage_path]
    R -- No --> T[keep stage dir]

    S & T --> U["Write status=running to metadata"]
    U --> V["data_designer.create(..., resume=stage_resume)"]
    V --> W{output_processors?}
    W -- Yes --> X["rmtree output-processors\nre-run processors"]
    W -- No --> Y[use main result]
    X & Y --> Z{on_success callback?}
    Z -- Yes --> AA[output_seed_path = callback output]
    Z -- No --> AB[output_seed_path = stage output]
    AA & AB --> AC{output_records == 0?}
    AC -- "Yes & allow_empty" --> AD["status=completed_empty\nset skipped_upstream_stage"]
    AC -- No --> AE[status=completed]
    AD & AE --> AF["force_rerun_downstream = True\nwrite metadata, continue"]
    AF --> LOOP
    LOOP --> DONE[return CompositeWorkflowResults]
Loading

Reviews (3): Last reviewed commit: "fix: address workflow resume review feed..." | Re-trigger Greptile

Comment thread packages/data-designer/src/data_designer/interface/composite_workflow.py Outdated
Comment thread packages/data-designer/tests/interface/test_composite_workflow.py Outdated
@github-actions

Copy link
Copy Markdown
Contributor

Review: PR #747feat: add workflow stage resume

Summary

Adds stage-level resume to CompositeWorkflow.run(resume=...):

  • Reuses compatible completed stages (fingerprint match + output verifiable).
  • Delegates partial stages (running / failed) to the existing single-stage resume path via DataDesigner.create(..., resume=ResumeMode.ALWAYS).
  • Invalidates downstream stages on first re-run, change, or missing output.
  • Adds atomic, fsync'd workflow-metadata.json writes via tmp+os.replace.
  • Falls back to fresh runs in ResumeMode.IF_POSSIBLE when prior metadata is missing/corrupt; raises in ResumeMode.ALWAYS.
  • Tests for skip / rerun / partial-resume / callback-output / output-processor / completed-empty / corrupt metadata / strict-resume.
  • Docs added in both mkdocs and Fern; phase-3 plan updated.

Diff is well-scoped (5 files, +503/−10) and lives entirely in the interface layer — no engine/config touch, no import-direction violations.

Findings

Correctness — Medium

  1. output_seed_path is persisted and reused as an absolute path (composite_workflow.py:400, consumed at composite_workflow.py:309). If a user moves their artifact root (rsync, archive, container mount), prior_metadata still matches by name/stage_dir, but output_seed_path points at the old location while _stage_result_from_metadata builds an ArtifactStorage rooted at the new workflow_path. The result is a mixed-state read (old data, new storage object) or a _count_parquet_records raise — silently invalidating reuse in the best case, confusing behavior in the worst. Storing this as a path relative to workflow_path would be more robust. (Same applies to callback_output_path.)

  2. ResumeMode.ALWAYS semantics drift once any stage is re-run. The added doc says: "If a stage changed or its selected output is missing, the workflow raises instead of starting fresh." But once an ALWAYS resume successfully resumes a partial upstream stage from checkpoint (stage_resume == ALWAYS, line 328), force_rerun_downstream flips to True and the downstream elif resume == ResumeMode.ALWAYS and not force_rerun_downstream: guard (line 330) is bypassed — so a downstream stage whose fingerprint also differs from prior metadata silently runs fresh instead of raising. That may be the intended behavior, but it deviates from the doc and from the strict reading of the test_composite_workflow_resume_always_rejects_changed_stage contract. Consider tightening the doc and adding a test for "ALWAYS, upstream resumed-from-checkpoint, downstream fingerprint differs".

Correctness — Low

  1. _load_stage_analysis swallows every exception (composite_workflow.py:552):

    try:
        return DatasetProfilerResults.model_validate({...})
    except Exception:
        return None

    Bare except Exception masks both ValidationError (the only one you'd plausibly recover from) and unrelated bugs introduced by future schema changes. Narrow it to pydantic.ValidationError so a real bug doesn't decay into a silent analysis=None.

  2. stage_metadata.update(prior_stage_metadata) on the skip path (line 308) wholesale-imports every key the prior run wrote, including ones not produced by the current run (config, seed_path, seeded_from_stage, num_records_requested, duration_sec). That's by design here, but means the new metadata file's seeded_from_stage for a re-skipped stage may name a stage from the prior run that no longer exists in the current workflow definition. Fine for inspection; worth a note if later phases lean on those fields.

  3. _stage_result_from_metadata returns DatasetMetadata() — empty (line 528). The original run may have collected real DatasetMetadata. Reusing the cached stage exposes a stripped-down DatasetCreationResults to user code. Document or persist+rehydrate.

Style / Nits

  1. Unused import in TYPE_CHECKING removal: DatasetProfilerResults was lifted out of TYPE_CHECKING (line 18) because it's now used at module scope by _load_stage_analysis. That's correct, but DatasetProfilerResults is heavy; the project lazy-loads pandas already (lazy.pd). Confirm that data_designer.config.analysis.dataset_profiler doesn't drag in numpy/pandas at import — if it does, this regresses the import-time profile (make perf-import would catch it).

  2. tmp_path = path.with_name(f"{path.name}.tmp.{os.getpid()}") (line 706): if two threads in the same process both write workflow metadata for the same workflow (uncommon but possible during async cleanup), they collide on the temp filename. Adding a uuid suffix would be safer; PID alone is not unique within a process. Probably out of scope.

  3. force_rerun_downstream is largely redundant with the upstream-fingerprint chain (since each stage's fingerprint folds in upstream_fingerprint). Its real job is to short-circuit the ResumeMode.ALWAYS raise on line 330. A short comment to that effect would help future readers — right now the flag's necessity is non-obvious.

Tests

Coverage is strong. Suggestions only:

  • Add a test pinning the ALWAYS-after-partial-upstream-resume behavior (see finding DataDesigner.make_seed_reference_from_file doesn't support paths with multiple parquet partition #2) — whichever way the project decides on, lock it in.
  • test_composite_workflow_resume_if_possible_skips_stage_with_output_processors checks main-batch mtime; consider also asserting the output-processors directory wasn't touched (its mtime is the actual cache-hit signal for the output-processor work).
  • Worth a regression test for prior metadata whose stages length exceeds the current workflow's stage list — _get_prior_stage_metadata handles index >= len(stages), but the inverse case (current workflow has fewer stages than prior) isn't explicitly covered.

Security

No new attack surface. Metadata is JSON-validated on read and treated as data. The os.fsync + os.replace pattern is the right defense against torn writes from crashes mid-run.

Performance

  • One extra _count_parquet_records call per skippable stage (validation in _can_skip_prior_stage + the actual count in run). Cheap (parquet metadata, not row read), but doubling it is unnecessary — could cache the count or skip the validation since the count immediately follows.
  • _stage_result_from_metadata instantiates ArtifactStorage with resume=ResumeMode.ALWAYS, which triggers the resolved_dataset_name check. Fine for hits; on misses you'll get an ArtifactStorageError from the validator instead of going through DataDesignerWorkflowError. Consider catching and re-wrapping at the boundary so callers see a single error type per the project's "Errors normalize at boundaries" invariant.

Docs

  • docs/concepts/workflow-chaining.md and the Fern mirror are updated symmetrically — good.
  • Phase-3 plan update accurately describes the slice and remaining deferred items.
  • One precision nit: the ResumeMode.ALWAYS blurb should clarify the "first changed stage raises; downstream of a checkpoint-resumed stage runs fresh" behavior (or close the gap).

Verdict

Solid, well-tested addition. The core logic is correct and the new behavior is opt-in through resume=, so risk to existing callers is minimal. Address #1 (relative paths) before this is relied on for portable artifacts; #2 (ALWAYS doc/test) and #3 (narrow the except) are easy follow-ups. Everything else is taste.

Recommendation: approve with the path-portability and except Exception narrowing addressed (or filed as follow-ups). Other findings are non-blocking polish.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant