Skip to content

Commit 4950ed5

Browse files
committed
docs: update workflow chaining plan
1 parent 2d38c88 commit 4950ed5

1 file changed

Lines changed: 84 additions & 5 deletions

File tree

plans/workflow-chaining/workflow-chaining.md

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,30 @@ This is the *only* place in the chaining surface that uses an in-memory handoff.
9191

9292
Each stage seeds from the **previous stage's final dataset** - the post-processor output with dropped columns excluded. This is the same DataFrame returned by `DatasetCreationResults.load_dataset()`.
9393

94-
Processor outputs (named processor artifacts), schema-transform artifacts, dropped columns, and media assets (images stored on disk with relative paths in the DataFrame) are NOT automatically forwarded in v1. If a downstream stage needs one of these artifacts, use an `on_success` callback to read the upstream stage artifact directory and write a `LocalFileSeedSource`-compatible dataset for the next stage. First-class seeding from named processor outputs is a future extension.
94+
Processor outputs (named processor artifacts), schema-transform artifacts, dropped columns, and media assets (images stored on disk with relative paths in the DataFrame) are NOT automatically forwarded in v1. If a downstream stage needs one of these artifacts, use an `on_success` callback to read the upstream stage artifact directory and write a `LocalFileSeedSource`-compatible dataset for the next stage. First-class seeding from named processor outputs is a future extension. Related near-term extensions are seeded processor-only configs and structured stage `postprocessors`, so processor-expressible transforms can remain normal Data Designer processor configs rather than workflow-specific callback code.
9595

96-
#### Between-stage callbacks
96+
#### Between-stage postprocessors and callbacks
9797

98-
Users may need to transform data between stages. `CompositeWorkflow.add_stage()` supports an optional `on_success` callback that transforms a completed stage's output before child stages seed from it:
98+
Users may need to transform data between stages. The preferred future API is a `postprocessors` argument on `CompositeWorkflow.add_stage()`:
99+
100+
```python
101+
workflow.add_stage(
102+
"generated",
103+
config_gen,
104+
num_records=1000,
105+
postprocessors=[
106+
DropColumnsProcessorConfig(
107+
name="drop_scratch",
108+
column_names=["scratch_*"],
109+
)
110+
],
111+
)
112+
workflow.add_stage("enriched", config_enrich)
113+
```
114+
115+
Postprocessors run after the stage's `DataDesigner.create()` completes and before child stages seed from that stage. They operate on the stage's final dataset, write a deterministic postprocessed artifact, and make that artifact the stage output for downstream seeding. They should accept existing built-in or plugin `ProcessorConfig` objects and be fingerprinted from those configs, so no callback version string is needed.
116+
117+
Arbitrary Python data transforms should come through the `CustomProcessor` work tracked in #159, rather than a workflow-only callable type. Until that exists, `on_success` remains the escape hatch for arbitrary data-changing code:
99118

100119
```python
101120
def filter_high_quality(stage_output_path: Path) -> Path:
@@ -117,12 +136,33 @@ workflow.add_stage(
117136
workflow.add_stage("enriched", config_enrich)
118137
```
119138

120-
The callback receives the path to the completed stage's artifact directory (containing `parquet-files/`, `metadata.json`, etc.) and returns a path that child stages will seed from. This keeps large DataFrames on disk and gives users full control. Callback outputs should live under a managed subdirectory such as `<stage-dir>/callback-outputs/<callback-name>/` so they stay deterministic and easy to clean.
139+
The callback receives the path to the completed stage's artifact directory (containing `parquet-files/`, `metadata.json`, etc.) and returns a path that child stages will seed from. This keeps large DataFrames on disk and gives users full control. Callback outputs should live under a managed subdirectory such as `<stage-dir>/callback-outputs/<callback-name>/` so they stay deterministic and easy to clean. Once postprocessors and seeded processor-only configs are valid, processor-expressible transforms should use those structured APIs instead of raw callbacks:
121140

122-
**Callback resume policy**: The composite workflow does not hash arbitrary Python source or bytecode in v1. `on_success_version` is the explicit callback identity recorded in `workflow-metadata.json` and included in the stage's exposed output fingerprint. If `on_success` is set without `on_success_version`, that stage's transformed output is treated as dirty on every resume so a changed callback cannot silently reuse stale transformed data. The resolved path returned by the callback is also recorded as the dependent stage's seed path; a stage seeded from callback output is skippable only if that recorded path still exists and is readable by `LocalFileSeedSource`.
141+
```python
142+
filter_config = DataDesignerConfigBuilder().add_processor(
143+
DropColumnsProcessorConfig(
144+
name="drop_scratch",
145+
column_names=["scratch_*"],
146+
),
147+
)
148+
workflow.add_stage("generated", config_gen, num_records=1000)
149+
workflow.add_stage("filtered", filter_config)
150+
workflow.add_stage("enriched", config_enrich)
151+
```
152+
153+
**Callback resume policy**: The composite workflow does not hash arbitrary Python source or bytecode in v1. `on_success_version` is the explicit callback identity recorded in `workflow-metadata.json` and included in the stage's exposed output fingerprint. If `on_success` is set without `on_success_version`, that stage's transformed output is treated as dirty on every resume so a changed callback cannot silently reuse stale transformed data. The resolved path returned by the callback is also recorded as the dependent stage's seed path; a stage seeded from callback output is skippable only if that recorded path still exists and is readable by `LocalFileSeedSource`. Structured postprocessors should use their processor config fingerprint instead.
123154

124155
**Empty stage policy**: If a callback filters all rows (or a stage produces zero rows), the composite workflow raises `DataDesignerWorkflowError` by default. Stages can opt in to empty output with `allow_empty=True` on `add_stage()`, in which case the workflow marks that stage as `completed_empty` and all downstream stages as `skipped_empty_upstream`. `CompositeWorkflowResults` still contains every declared stage name: executed stages map to `DatasetCreationResults`, while skipped downstream stages map to `SkippedStageResult` with `status="skipped_empty_upstream"` and `upstream_stage=<stage_name>`. This avoids `KeyError`/`None` ambiguity and gives resume a durable state distinct from normal completion.
125156

157+
**Stage output selection**: The default stage output is the final dataset returned by `DatasetCreationResults.load_dataset()`. A future additive option can select a named processor artifact as the stage output:
158+
159+
```python
160+
workflow.add_stage("drafts", config_drafts, num_records=1000)
161+
workflow.add_stage("sft", sft_config, output="processor:sft")
162+
```
163+
164+
When `output="processor:<name>"` is set, child stages seed from `processors-files/<name>/` instead of `parquet-files/`, and workflow metadata records the selected output source. This covers processors such as `SchemaTransformProcessorConfig`, which write side datasets while leaving the main dataset unchanged.
165+
126166
**Export/push policy**: `CompositeWorkflowResults` export and push helpers default to the final stage's dataset. Exporting selected stages or the full composite workflow artifact bundle is explicit future work.
127167

128168
#### `num_records` and seed behavior
@@ -361,6 +401,8 @@ result_2 = data_designer.create(config_2, num_records=200) # explode: 50 -> 200
361401
- Add `compose_workflow(name: str)` factory method on `DataDesigner`.
362402
- Tests: multi-stage runs, explode/filter via callbacks, num_records defaulting, duplicate stage-name rejection, artifact layout, throttle reuse across stages.
363403

404+
**Status after PR #636:** Implemented `CompositeWorkflow`, `compose_workflow()`, `to_config_builder()`, disk handoff, stage metadata, `acreate()`, shared throttle manager reuse, explicit stage artifact roots, cloned stage builders, and concurrent-safe seed reader/resource-provider handling. Still deferred: stage-level resume, DAG branches, `allow_resize` removal, seeded processor-only configs, postprocessors, stage output selection, config bundles, and first-class artifact seeding.
405+
364406
### Sidecar: `acreate()` on `DataDesigner` (independent of chaining v1)
365407

366408
- Add `async def acreate(...)` mirroring `create()` but returning the awaitable instead of blocking.
@@ -377,6 +419,7 @@ result_2 = data_designer.create(config_2, num_records=200) # explode: 50 -> 200
377419
- Remove the `allow_resize` field from the config schema.
378420
- Add fail-fast guard in `ProcessorRunner` for pre-batch row-count changes.
379421
- Tests: verify rejection, migration path examples.
422+
- Migration examples should prefer `postprocessors` for inline processor-expressible transforms, or seeded processor-only configs when the transform deserves its own named stage. Raw `on_success` remains the escape hatch for arbitrary custom transforms.
380423

381424
### Phase 3: Stage-level resume
382425

@@ -413,6 +456,39 @@ Items not on the current roadmap but worth flagging so they don't get accidental
413456

414457
**Failure hooks.** v1 should raise on failure by default. A future `on_failure` callback could support cleanup, custom recovery, or explicit handling of all-rows-filtered cases, but it should not obscure the default failure semantics.
415458

459+
**Seeded processor-only configs.** Do not introduce a separate public `stages` wrapper module or a workflow-specific `processors=[...]` stage argument. Keep `CompositeWorkflow.add_stage()` as the orchestration surface, and make processor-only work expressible as a normal seeded config:
460+
461+
```python
462+
workflow.add_stage("drafts", config_drafts, num_records=1000)
463+
workflow.add_stage("filtered", DataDesignerConfigBuilder().add_processor(...))
464+
workflow.add_stage("enriched", config_enrich)
465+
```
466+
467+
The same config should also work outside workflows when the user supplies a seed directly:
468+
469+
```python
470+
config = (
471+
DataDesignerConfigBuilder()
472+
.with_seed_dataset(LocalFileSeedSource(path="data/*.parquet"))
473+
.add_processor(...)
474+
)
475+
result = data_designer.create(config)
476+
```
477+
478+
This requires validation to allow a seed-only config when it has processors and a seed dataset, while still rejecting processor-only configs with no seed and configs that drop all output columns.
479+
480+
**Stage postprocessors.** Add `postprocessors=[ProcessorConfig, ...]` to `CompositeWorkflow.add_stage()` for compact inline transforms at a stage boundary. Internally this can reuse the seeded processor-only config path above: seed a temporary processor config from the completed stage output, run it under a deterministic subdirectory, and use its final dataset as the stage output. This keeps row-count-changing 1:N and N:1 operations out of the async generation batch path while making the common case concise. Custom arbitrary transforms should use `CustomProcessor` when #159 lands; until then, keep raw `on_success` as the escape hatch.
481+
482+
**Config bundles.** Built-in or plugin-provided bundles should be reusable config fragments/factories, not execution stages. A bundle can return or extend a `DataDesignerConfigBuilder`, and `add_stage()` can accept either a builder or a bundle by normalizing the bundle to a builder:
483+
484+
```python
485+
workflow.add_stage("draft_qa", bundles.document_qa(model_alias="fast"))
486+
workflow.add_stage("quality", bundles.quality_judge(model_alias="judge"))
487+
workflow.add_stage("sft", bundles.sft_export(), output="processor:sft")
488+
```
489+
490+
Bundles can cover DataArc-like document QA, refinement, distillation, SFT export, or GRPO export shapes without introducing another workflow abstraction. Plugins can contribute bundles alongside existing column, seed-reader, and processor plugins.
491+
416492
**First-class artifact seeding.** v1 can bridge named processor outputs or schema-transform artifacts through `on_success` callbacks. A later API could support `seed_from_artifact(...)` or named processor output references directly.
417493

418494
**Auto-composition from a single config.** Auto-detecting stage boundaries in one config is possible, but likely overkill for the initial roadmap. If it returns later, it should be a convenience layer on top of `CompositeWorkflow`, not a separate execution model.
@@ -433,6 +509,8 @@ These were open in earlier drafts; recording the resolutions here so the design
433509

434510
6. **Export and push semantics** -> v1 export/push defaults to the final stage's dataset only. Exporting selected stages or a full composite workflow artifact bundle is future work.
435511

512+
7. **Stage extension model** -> The public surface remains `CompositeWorkflow`, not a separate `stages` module. Near-term extensibility is seeded processor-only configs, postprocessors, and stage output selection. Shared built-in/plugin units are config bundles, not a new executor.
513+
436514
## Open questions
437515

438516
1. **Preview support**: Should `workflow.preview()` run all stages with small `num_records`? Or just preview the last stage seeded from a prior full run?
@@ -445,5 +523,6 @@ These were open in earlier drafts; recording the resolutions here so the design
445523

446524
- #447 - AsyncRunController refactor (partially superseded: pre-batch resize handling moves to composite workflow level instead of controller level)
447525
- #526 / #525 - Resume interrupted runs (single-stage batch/row-group resume primitive used by composite workflow stage resume)
526+
- #159 - Custom processors (needed for arbitrary Python postprocessors without workflow-only callback types)
448527
- #462 - Progress bar and scheduler polish (independent)
449528
- #464 - Custom column retryable errors (independent)

0 commit comments

Comments
 (0)