Skip to content

Commit 35f466a

Browse files
committed
fix: validate workflow rerun inputs
1 parent 8334456 commit 35f466a

3 files changed

Lines changed: 48 additions & 1 deletion

File tree

fern/versions/latest/pages/concepts/workflow-chaining.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ results = workflow.run(
129129
)
130130
```
131131

132-
If the reviewed data replaces a stage's selected output in place, use `rerun_from="expanded"` to force that stage and its descendants to rebuild from the current boundary output.
132+
If the reviewed data replaces a stage's selected output in place, run with `resume=ResumeMode.IF_POSSIBLE` and `rerun_from="expanded"` to rebuild that stage and its descendants from the current boundary output.
133133

134134
## Current limits
135135

packages/data-designer/src/data_designer/interface/composite_workflow.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,10 @@ def run(
258258
"""
259259
if not self._stages:
260260
raise DataDesignerWorkflowError(f"Workflow {self.name!r} has no stages.")
261+
if rerun_from is not None and resume == ResumeMode.NEVER:
262+
raise DataDesignerWorkflowError(
263+
"rerun_from requires resume to be ResumeMode.IF_POSSIBLE or ResumeMode.ALWAYS."
264+
)
261265

262266
stage_indices = _stage_indices_by_name(self._stages)
263267
target_stage_names = _normalize_stage_names(targets, stage_indices, "target")
@@ -537,6 +541,13 @@ def _validate_stage_output_overrides(
537541
raise DataDesignerWorkflowError(
538542
f"Stage output override(s) must be ancestors of a target stage: {', '.join(non_ancestors)}."
539543
)
544+
for name in sorted(stage_output_overrides):
545+
override_path = _stage_output_override(name, stage_output_overrides)
546+
if override_path is not None:
547+
try:
548+
_count_parquet_records(override_path)
549+
except DataDesignerWorkflowError as exc:
550+
raise DataDesignerWorkflowError(f"Invalid stage output override for stage {name!r}: {exc}") from exc
540551

541552

542553
def _stage_output_override(

packages/data-designer/tests/interface/test_composite_workflow.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,24 @@ def test_composite_workflow_rerun_from_forces_stage_and_descendants(
493493
assert [call.kwargs["resume"] for call in create_mock.call_args_list] == [ResumeMode.NEVER, ResumeMode.NEVER]
494494

495495

496+
def test_composite_workflow_rerun_from_requires_resume(
497+
stub_artifact_path: Path,
498+
stub_model_providers: list[ModelProvider],
499+
stub_model_configs: list[ModelConfig],
500+
stub_dataset_profiler_results,
501+
) -> None:
502+
data_designer = _data_designer(stub_artifact_path, stub_model_providers)
503+
create_mock = _patch_create(data_designer, stub_dataset_profiler_results)
504+
workflow = data_designer.compose_workflow(name="rerun-from-no-resume")
505+
workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2)
506+
workflow.add_stage("copy", _copy_builder(stub_model_configs))
507+
508+
with pytest.raises(DataDesignerWorkflowError, match="rerun_from requires resume"):
509+
workflow.run(rerun_from="copy")
510+
511+
assert create_mock.call_count == 0
512+
513+
496514
def test_composite_workflow_stage_output_override_seeds_descendants(
497515
tmp_path: Path,
498516
stub_model_providers: list[ModelProvider],
@@ -532,6 +550,24 @@ def test_composite_workflow_stage_output_override_seeds_descendants(
532550
assert metadata["stages"][1]["seed_path"] == str(approved_path.resolve())
533551

534552

553+
def test_composite_workflow_stage_output_override_path_must_exist(
554+
stub_artifact_path: Path,
555+
stub_model_providers: list[ModelProvider],
556+
stub_model_configs: list[ModelConfig],
557+
stub_dataset_profiler_results,
558+
) -> None:
559+
data_designer = _data_designer(stub_artifact_path, stub_model_providers)
560+
create_mock = _patch_create(data_designer, stub_dataset_profiler_results)
561+
workflow = data_designer.compose_workflow(name="missing-override")
562+
workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2)
563+
workflow.add_stage("copy", _copy_builder(stub_model_configs))
564+
565+
with pytest.raises(DataDesignerWorkflowError, match="Invalid stage output override"):
566+
workflow.run(targets="copy", stage_output_overrides={"base": stub_artifact_path / "missing.parquet"})
567+
568+
assert create_mock.call_count == 0
569+
570+
535571
def test_composite_workflow_resume_if_possible_skips_completed_stages(
536572
stub_artifact_path: Path,
537573
stub_model_providers: list[ModelProvider],

0 commit comments

Comments
 (0)