Commit 1457cc9

docs: address review feedback - data contract, resume safety, seed controls, edge cases
1 parent 9f640ca commit 1457cc9

1 file changed

Lines changed: 37 additions & 18 deletions

plans/workflow-chaining/workflow-chaining.md

@@ -23,11 +23,13 @@ Add **workflow chaining**: a thin orchestration layer that sequences multiple ge
2323

2424
As a secondary benefit, chaining also enables the removal of `allow_resize` and simplification of the engine's resize handling.
2525

26-
### Secondary benefit: `allow_resize` removal
26+
### Secondary benefit: `allow_resize` removal and sync/async convergence
2727

2828
The `allow_resize` flag on column configs lets a generator change the row count mid-generation. This works in the sync engine but is fundamentally incompatible with the async engine's fixed-size `CompletionTracker` grid (currently rejected with a validation error). Pre-batch processors that resize have a similar problem.
2929

30-
With chaining in place, resize becomes a between-stage concern rather than a mid-generation concern. This lets us remove `allow_resize` and the associated engine complexity, and disallow row-count changes in pre-batch processors. Users who need resize use a pipeline with a stage boundary at the resize point.
30+
`allow_resize` is one of the remaining divergences between sync and async. Since the long-term direction is to remove the sync engine entirely, maintaining a sync-only feature is counterproductive. With chaining in place, resize becomes a between-stage concern rather than a mid-generation concern. This lets us remove `allow_resize` and the associated engine complexity, and disallow row-count changes in pre-batch processors. Users who need resize use a pipeline with a stage boundary at the resize point.
31+
32+
Note: `allow_resize` is currently documented in custom columns, plugin examples, and agent rollout ingestion docs. Removal requires a deprecation cycle and doc updates.
3133

3234
### Why chaining instead of fixing async resize
3335

@@ -72,21 +74,28 @@ config_convos = (
7274
result_2 = dd.create(config_convos, num_records=1000)
7375
```
7476

75-
This is a thin wrapper: loads the dataset, optionally filters columns, wraps in `DataFrameSeedSource`, returns a new config builder. No tracking, no provenance, no callbacks - just a quick bridge for iteration.
77+
This is a thin wrapper: it loads the dataset into memory, optionally filters columns, wraps the result in `DataFrameSeedSource`, and returns a new config builder. No tracking, no provenance, no callbacks - just a quick bridge for iteration. It is not suitable for large datasets (the full DataFrame is loaded into memory) or for configs that must be serialized (`DataFrameSeedSource` can't be written to YAML). For production pipelines, use the `Pipeline` class.
7678

7779
**Auto-chaining from a single config (future):**
7880

7981
The engine detects columns that were previously `allow_resize=True` (or a new marker like `stage_boundary=True`) and auto-splits the DAG into stages. This is a convenience layer on top of the explicit API - not required for v1.
8082

83+
#### Stage data contract
84+
85+
Each stage seeds from the **previous stage's final dataset** - the post-processor output with dropped columns excluded. This is the same DataFrame returned by `DatasetCreationResults.load_dataset()`.
86+
87+
Processor outputs (named processor artifacts) and media assets (images stored on disk with relative paths in the DataFrame) are NOT automatically forwarded. If a downstream stage needs image columns from an upstream stage, the pipeline must resolve image paths relative to the upstream stage's artifact directory. This needs explicit handling - TBD in implementation.
88+
8189
#### Between-stage callbacks
8290

8391
Users may need to transform data between stages. The pipeline supports an optional callback:
8492

8593
```python
8694
def filter_high_quality(stage_output_path: Path) -> Path:
87-
    df = pd.read_parquet(stage_output_path / "data")
95+
    df = pd.read_parquet(stage_output_path / "parquet-files")
8896
    df = df[df["quality_score"] > 0.8]
8997
    out = stage_output_path.parent / "filtered"
98+
    out.mkdir(exist_ok=True)
9099
    df.to_parquet(out / "data.parquet")
91100
    return out
92101

@@ -98,52 +107,58 @@ pipeline.add_stage(
98107
)
99108
```
100109

101-
The callback receives the path to the completed stage's artifacts and returns a path to the (possibly modified) artifacts. This keeps large DataFrames on disk and gives users full control.
110+
The callback receives the path to the completed stage's artifact directory (containing `parquet-files/`, `metadata.json`, etc.) and returns a path that the next stage will seed from. This keeps large DataFrames on disk and gives users full control.
102111

103-
The callback signature is `(Path) -> Path`. If the user returns the same path, no copy is made. If they return a new path, the next stage seeds from that.
112+
**Empty stage policy**: If a callback filters all rows (or a stage produces zero rows), the pipeline raises `DataDesignerPipelineError` by default. Stages can opt in to empty output with `allow_empty=True` on `add_stage()`, in which case the pipeline short-circuits and skips subsequent stages.
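
The empty stage policy above can be sketched as a small check that runs after each stage (and after any callback). This is a minimal illustration, not the planned implementation: `check_stage_output` is a hypothetical helper, and `DataDesignerPipelineError` is redefined here as a stand-in for the error type named in the plan.

```python
class DataDesignerPipelineError(Exception):
    """Stand-in for the pipeline error type named in the plan."""


def check_stage_output(stage_name: str, row_count: int, allow_empty: bool = False) -> bool:
    """Return True if the pipeline should continue to the next stage.

    Hypothetical helper: raises on empty output unless the stage opted in
    with allow_empty=True, in which case it signals a short-circuit.
    """
    if row_count > 0:
        return True
    if allow_empty:
        # Empty output was explicitly allowed: skip all subsequent stages.
        return False
    raise DataDesignerPipelineError(
        f"Stage {stage_name!r} produced zero rows; pass allow_empty=True to permit this."
    )
```

Returning a boolean keeps the "continue or short-circuit" decision in one place, so the run loop only has to break when the helper returns `False`.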
104113

105-
#### `num_records` behavior
114+
#### `num_records` and seed behavior
106115

107116
- If `num_records` is explicitly set on a stage, that value is used.
108117
- If omitted, defaults to the previous stage's output row count (after any between-stage callback).
109118
- The seed reader's existing cycling behavior handles the explode case: requesting 1000 records from a 100-row seed cycles through the seed 10 times.
119+
- `add_stage()` accepts optional `sampling_strategy` (ordered/shuffle) and `selection_strategy` (IndexRange/PartitionBlock) to control how the previous stage's output is sampled. Defaults to ordered.
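
As a rough illustration of the rules above, the sketch below resolves a stage's `num_records` and cycles an ordered seed to reach the requested count. Both function names are hypothetical, and the real seed reader may implement cycling differently; this only shows the arithmetic of the "explode" case.

```python
from itertools import cycle, islice
from typing import List, Optional, Sequence


def resolve_num_records(explicit: Optional[int], previous_row_count: int) -> int:
    """Use the explicit value if given, else the previous stage's row count."""
    return explicit if explicit is not None else previous_row_count


def cycle_seed_rows(seed_rows: Sequence[dict], num_records: int) -> List[dict]:
    """Repeat the seed in order until num_records rows have been produced."""
    if not seed_rows:
        raise ValueError("cannot cycle an empty seed")
    return list(islice(cycle(seed_rows), num_records))


# 100 seed rows, 1000 requested: each seed row is used 10 times.
seed = [{"persona_id": i} for i in range(100)]
rows = cycle_seed_rows(seed, resolve_num_records(1000, len(seed)))
```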
110120

111121
#### Artifact management
112122

113-
Each stage writes to its own subdirectory under the pipeline's artifact path:
123+
The pipeline owns its directory layout directly, bypassing `ArtifactStorage`'s default auto-rename behavior (which appends timestamps to non-empty directories). Stage directories use stable, deterministic names based on stage index and name:
114124

115125
```
116126
artifacts/
117127
  pipeline-name/
118-
    stage-1-personas/
128+
    stage-0-personas/
119129
      parquet-files/
120130
      metadata.json
121-
    stage-2-conversations/
131+
    stage-1-conversations/
122132
      parquet-files/
123133
      metadata.json
124-
    stage-3-judged/
134+
    stage-2-judged/
125135
      parquet-files/
126136
      metadata.json
127-
    pipeline-metadata.json # stage order, configs, lineage
137+
    pipeline-metadata.json
128138
```
129139

140+
The pipeline creates each stage's `ArtifactStorage` with the stage directory as `dataset_name`, ensuring stable paths across reruns.
141+
130142
#### Checkpointing and resume
131143

132144
Each stage produces durable parquet output before the next stage starts. This provides natural checkpoint boundaries:
133145

134146
- If stage 3 of 4 fails, stages 1 and 2 are already on disk.
135-
- A `resume=True` flag on `pipeline.run()` skips completed stages (detected via `pipeline-metadata.json`).
147+
- A `resume=True` flag on `pipeline.run()` skips completed stages.
136148
- Within a stage, batch-level resume (#525) can further reduce re-work.
137149

150+
**Resume safety**: Naive "skip if directory exists" is not sufficient. Configs, model settings, callbacks, or DD version may have changed between runs. Resume must compare a fingerprint of each stage's inputs (config hash, num_records, DD version, upstream stage fingerprint) against what's recorded in `pipeline-metadata.json`. If any input changed, that stage and all downstream stages must re-run. This is a phase 3 concern but the metadata format in phase 1 should record enough information to support it.
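
One way the fingerprint comparison could work is sketched below. It assumes stage configs can be represented as JSON-serializable dicts and uses SHA-256 over a canonical JSON payload; `stage_fingerprint` and `first_stale_stage` are hypothetical names, and the actual metadata format is not decided here. Callback changes are not covered by this sketch.

```python
import hashlib
import json
from typing import List, Optional


def stage_fingerprint(
    config: dict,
    num_records: Optional[int],
    dd_version: str,
    upstream_fingerprint: Optional[str],
) -> str:
    """Hash a stage's inputs; chaining the upstream fingerprint propagates changes."""
    payload = json.dumps(
        {
            "config": config,
            "num_records": num_records,
            "dd_version": dd_version,
            "upstream": upstream_fingerprint,
        },
        sort_keys=True,  # canonical key order so equal configs hash equally
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def first_stale_stage(current: List[str], recorded: List[str]) -> Optional[int]:
    """Return the index of the first stage that must re-run, or None if all match."""
    for index, fingerprint in enumerate(current):
        if index >= len(recorded) or recorded[index] != fingerprint:
            return index
    return None
```

Because each fingerprint includes the upstream one, a change in an early stage changes every downstream fingerprint, so re-running from the first stale index covers the "that stage and all downstream stages" rule.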
151+
138152
The connection to #525: chaining gives coarse (stage-level) checkpointing for free. #525 gives fine (batch-level) checkpointing within a stage. They are complementary.
139153

140154
#### Provenance
141155

142156
`pipeline-metadata.json` records:
143157
- Stage order, names, and configs used
158+
- Config fingerprint (hash) per stage for resume invalidation
144159
- `num_records` requested vs actual per stage
145160
- Which stage's output seeded the next
146-
- Timestamp and duration per stage
161+
- Timestamp, duration, and DD version per stage
147162

148163
### Part 2: Remove `allow_resize`
149164

@@ -167,9 +182,7 @@ With the pipeline in place, `allow_resize` is no longer needed as an engine-inte
167182

168183
In `ProcessorRunner.run_pre_batch()` and `run_pre_batch_on_df()`, raise `DatasetProcessingError` if the returned DataFrame has a different row count than the input.
169184

170-
This applies to both sync and async paths. Users who need to filter or expand between seeds and generation use the pipeline's between-stage callback instead.
171-
172-
For users who need programmatic filtering at the seed boundary, a seed reader plugin is the escape hatch (the seed reader can filter/transform before the engine ever sees the data).
185+
This applies to both sync and async paths. Users who need to filter or expand between seeds and generation use the pipeline's between-stage callback instead. Note that a seed reader plugin is NOT an equivalent escape hatch: seed readers run before any columns are generated (including samplers), so they can't filter on generated column values.
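
The row-count rule can be illustrated with a small wrapper. This is a sketch under simplifying assumptions: rows are modeled as a list of dicts rather than a DataFrame, `run_checked` is a hypothetical helper, and `DatasetProcessingError` is redefined as a stand-in for the error type named above.

```python
from typing import Callable, List


class DatasetProcessingError(Exception):
    """Stand-in for the processing error type named in the plan."""


def run_checked(
    processor: Callable[[List[dict]], List[dict]], rows: List[dict]
) -> List[dict]:
    """Run a pre-batch processor and reject any change in row count."""
    result = processor(rows)
    if len(result) != len(rows):
        raise DatasetProcessingError(
            f"pre-batch processor changed row count from {len(rows)} to {len(result)}; "
            "use a between-stage callback to filter or expand rows"
        )
    return result
```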
173186

174187
### Where it fits in the architecture
175188

@@ -305,14 +318,20 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200
305318

306319
## Open questions
307320

308-
1. **In-memory vs on-disk handoff between stages**: For small datasets, `DataFrameSeedSource` avoids disk I/O. For large datasets, writing parquet between stages is safer. Should the pipeline auto-detect based on row count, or always go through disk for consistency?
321+
1. **In-memory vs on-disk handoff between stages**: For small datasets, `DataFrameSeedSource` avoids disk I/O. For large datasets, writing parquet between stages is safer. Should the pipeline auto-detect based on row count, or always go through disk for consistency? (Leaning toward always-on-disk for simplicity and resume support.)
309322

310323
2. **Preview support**: Should `pipeline.preview()` run all stages with small `num_records`? Or just preview the last stage seeded from a prior full run?
311324

312325
3. **Config serialization**: A pipeline config can't be serialized to YAML if stages use `DataFrameSeedSource`. For persistence, stages would need symbolic references ("seed from stage X's output"). This is needed for auto-chaining (phase 4) but not for the explicit API (phases 1-3).
313326

314327
4. **Naming**: `Pipeline` vs `Chain` vs `WorkflowChain`. `Pipeline` is the most intuitive and aligns with ML pipeline terminology.
315328

329+
5. **Image/media column forwarding**: Images in create mode are stored as relative file paths. If a downstream stage seeds from an upstream stage that produced images, the relative paths break. Options: (a) resolve to absolute paths at stage boundary, (b) copy media assets into downstream stage's directory, (c) document as unsupported in v1.
330+
331+
6. **Branch/fan-out semantics**: Linear chaining covers the common cases. But "generate once, judge several ways" (fan-out) currently requires building multiple pipelines that repeat stage 1. Should the pipeline support DAG-shaped stage graphs, or is that future work?
332+
333+
7. **Downstream seeding scope**: Should downstream stages only seed from the final dataset, or should they also be able to access dropped columns or named processor outputs from upstream stages?
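
For open question 5, option (a) could look roughly like the sketch below, which rewrites relative media paths against the upstream stage's artifact directory at the stage boundary. `resolve_media_paths` is a hypothetical helper, and the sketch assumes image columns hold plain path strings.

```python
from pathlib import Path
from typing import List


def resolve_media_paths(relative_paths: List[str], upstream_stage_dir: str) -> List[str]:
    """Make upstream-relative media paths absolute so downstream stages can read them."""
    base = Path(upstream_stage_dir).resolve()
    resolved = []
    for raw in relative_paths:
        path = Path(raw)
        # Paths that are already absolute are left untouched.
        resolved.append(str(path if path.is_absolute() else base / path))
    return resolved
```

Absolute paths keep the downstream DataFrame readable without copying assets, at the cost of making stage output less portable across machines, which is the trade-off against option (b).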
334+
316335
## Related issues
317336

318337
- #447 - AsyncRunController refactor (partially superseded: pre-batch resize handling moves to pipeline level instead of controller level)
