From 53113ff026a4186b3ad1d753561389fd7870822c Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 15 Apr 2026 20:14:38 +0000 Subject: [PATCH 01/13] docs: add plan for workflow chaining and allow_resize removal Proposes replacing the in-place allow_resize mechanism with a Pipeline class that chains multiple generation stages. Each stage gets a fresh fixed-size tracker, and resize becomes a between-stage concern. --- plans/workflow-chaining/workflow-chaining.md | 204 +++++++++++++++++++ 1 file changed, 204 insertions(+) create mode 100644 plans/workflow-chaining/workflow-chaining.md diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md new file mode 100644 index 000000000..9a498a6f8 --- /dev/null +++ b/plans/workflow-chaining/workflow-chaining.md @@ -0,0 +1,204 @@ +--- +date: 2026-04-15 +authors: + - amanoel +--- + +# Plan: Workflow chaining and `allow_resize` removal + +## Problem + +DataDesigner workflows are self-contained: one config, one `create()` call, one output. There is no first-class way to combine workflows in sequence, where the output of one feeds the input of the next. Users who need this must manually wire `DataFrameSeedSource` between calls. + +Separately, the `allow_resize` flag on column configs lets a generator change the row count mid-generation. This works in the sync engine via in-place buffer replacement, but is fundamentally incompatible with the async engine's fixed-size `CompletionTracker` grid. The async engine currently rejects `allow_resize=True` with a validation error. Pre-batch processors that resize have a similar problem: the async path handles shrinking accidentally (via drop-marking), but expansion is silently ignored. + +These are the same problem viewed from different angles: the need to change row counts between generation steps. 
+ +## Proposed solution + +Replace the in-place resize mechanism with **workflow chaining**: a thin orchestration layer that sequences multiple generation stages, passing each stage's output as the next stage's seed dataset. + +This is a three-part change: + +1. **Remove `allow_resize`** from the column config and all engine code that supports it. +2. **Disallow row-count changes in pre-batch processors** (fail-fast if the processor returns a different number of rows). +3. **Add a `Pipeline` class** in the interface layer that auto-chains stages, with support for explicit multi-stage configs. + +### Why chaining instead of fixing async resize + +The async scheduler's `CompletionTracker` pre-allocates a (row_group x row_index x column) task grid. Supporting mid-run resize requires either rebuilding the tracker (complex, error-prone) or pausing execution at resize boundaries (loses parallelism). Chaining sidesteps this entirely: each stage gets a fresh tracker sized to its actual input. The engine stays simple - always fixed-size - and resize becomes a between-stage concern. + +## Design + +### Part 1: Remove `allow_resize` + +**Config changes** (`data-designer-config`): + +- Remove `allow_resize: bool = False` from `SingleColumnConfig` (or its base class `ColumnConfigBase`). +- Deprecation: keep the field for one release cycle with a deprecation warning, then remove. + +**Engine changes** (`data-designer-engine`): + +- Remove `_cell_resize_mode`, `_cell_resize_results`, and the resize branch in `_finalize_fan_out()` from `DatasetBuilder`. +- Remove `allow_resize` parameter from `DatasetBatchManager.replace_buffer()`. +- Remove `_validate_async_compatibility()` (no longer needed - nothing to reject). +- Simplify `_run_full_column_generator()` to always enforce row-count invariance. + +**Migration path**: Users with `allow_resize=True` columns split their config into a pipeline with a stage boundary at the resize column. 
The resize column becomes the last column of its stage, and downstream columns move to the next stage. + +### Part 2: Fail-fast on pre-batch processor resize + +In `ProcessorRunner.run_pre_batch()` and `run_pre_batch_on_df()`, raise `DatasetProcessingError` if the returned DataFrame has a different row count than the input. + +This applies to both sync and async paths. Users who need to filter or expand between seeds and generation use the pipeline's between-stage callback instead. + +For users who need programmatic filtering at the seed boundary, a seed reader plugin is the escape hatch (the seed reader can filter/transform before the engine ever sees the data). + +### Part 3: Pipeline class + +A new `Pipeline` class in `data_designer.interface` that orchestrates multi-stage generation. + +#### User-facing API + +**Explicit multi-stage pipeline:** + +```python +pipeline = dd.pipeline() +pipeline.add_stage("personas", config_personas, num_records=100) +pipeline.add_stage("conversations", config_convos, num_records=1000) # explode: 100 -> 1000 +pipeline.add_stage("judged", config_judge) # defaults to previous stage's output size + +results = pipeline.run() + +results["personas"].load_dataset() # stage 1 output +results["conversations"].load_dataset() # stage 2 output +results["judged"].load_dataset() # final output +``` + +**Auto-chaining from a single config (future):** + +The engine detects columns that were previously `allow_resize=True` (or a new marker like `stage_boundary=True`) and auto-splits the DAG into stages. This is a convenience layer on top of the explicit API - not required for v1. + +#### Between-stage callbacks + +Users may need to transform data between stages. 
The pipeline supports an optional callback: + +```python +def filter_high_quality(stage_output_path: Path) -> Path: + df = pd.read_parquet(stage_output_path / "data") + df = df[df["quality_score"] > 0.8] + out = stage_output_path.parent / "filtered" + df.to_parquet(out / "data.parquet") + return out + +pipeline.add_stage("generated", config_gen, num_records=1000) +pipeline.add_stage( + "enriched", + config_enrich, + after=filter_high_quality, # runs on stage output before next stage seeds from it +) +``` + +The callback receives the path to the completed stage's artifacts and returns a path to the (possibly modified) artifacts. This keeps large DataFrames on disk and gives users full control. + +The callback signature is `(Path) -> Path`. If the user returns the same path, no copy is made. If they return a new path, the next stage seeds from that. + +#### `num_records` behavior + +- If `num_records` is explicitly set on a stage, that value is used. +- If omitted, defaults to the previous stage's output row count (after any between-stage callback). +- The seed reader's existing cycling behavior handles the explode case: requesting 1000 records from a 100-row seed cycles through the seed 10 times. + +#### Artifact management + +Each stage writes to its own subdirectory under the pipeline's artifact path: + +``` +artifacts/ + pipeline-name/ + stage-1-personas/ + parquet-files/ + metadata.json + stage-2-conversations/ + parquet-files/ + metadata.json + stage-3-judged/ + parquet-files/ + metadata.json + pipeline-metadata.json # stage order, configs, lineage +``` + +#### Checkpointing and resume + +Each stage produces durable parquet output before the next stage starts. This provides natural checkpoint boundaries: + +- If stage 3 of 4 fails, stages 1 and 2 are already on disk. +- A `resume=True` flag on `pipeline.run()` skips completed stages (detected via `pipeline-metadata.json`). +- Within a stage, batch-level resume (#525) can further reduce re-work. 
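
As a rough illustration of stage-level resume, the sketch below decides which stages to skip by comparing a hash of each stage's inputs against values recorded by a previous run. Everything here is an assumption made for illustration: the function names, the dict-based stage description, and the idea that `pipeline-metadata.json` stores one hash per stage are not part of any existing DataDesigner API. Chaining the upstream hash into each stage's hash means a change in an early stage forces every later stage to re-run.

```python
import hashlib
import json
from typing import Optional


def stage_fingerprint(config: dict, num_records: Optional[int], upstream: Optional[str]) -> str:
    """Hash the inputs that determine a stage's output (hypothetical scheme)."""
    payload = json.dumps(
        {"config": config, "num_records": num_records, "upstream": upstream},
        sort_keys=True,  # stable key order so equal configs hash equally
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def chain_fingerprints(stages: list) -> dict:
    """Map stage name -> fingerprint, folding each upstream hash into the next."""
    fingerprints = {}
    upstream = None
    for stage in stages:
        upstream = stage_fingerprint(stage["config"], stage.get("num_records"), upstream)
        fingerprints[stage["name"]] = upstream
    return fingerprints


def plan_resume(stages: list, recorded: dict) -> list:
    """Return (stage_name, must_run) pairs for an ordered list of stages.

    `recorded` stands in for the per-stage hashes a previous run might have
    written to pipeline-metadata.json (assumed format).
    """
    current = chain_fingerprints(stages)
    plan = []
    must_run = False
    for stage in stages:
        name = stage["name"]
        # Once one stage needs to run, every downstream stage runs too.
        if recorded.get(name) != current[name]:
            must_run = True
        plan.append((name, must_run))
    return plan


stages = [
    {"name": "personas", "config": {"columns": ["name"]}, "num_records": 100},
    {"name": "judged", "config": {"columns": ["score"]}},
]
recorded = chain_fingerprints(stages)  # pretend a first run stored these
print(plan_resume(stages, recorded))
```

With unchanged inputs every stage is marked as skippable. Editing only the `judged` config re-runs just that stage, while editing `personas` re-runs both.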
+ +The connection to #525: chaining gives coarse (stage-level) checkpointing for free. #525 gives fine (batch-level) checkpointing within a stage. They are complementary. + +#### Provenance + +`pipeline-metadata.json` records: +- Stage order, names, and configs used +- `num_records` requested vs actual per stage +- Which stage's output seeded the next +- Timestamp and duration per stage + +### Where it fits in the architecture + +| Layer | Changes | +|-------|---------| +| `data-designer-config` | Remove `allow_resize` field. No new config models needed for v1 (pipeline is imperative, not declarative). | +| `data-designer-engine` | Remove resize code paths. Add fail-fast guard in `ProcessorRunner`. No new engine features. | +| `data-designer` (interface) | New `Pipeline` class. Thin orchestration: calls `DataDesigner.create()` per stage, wires `DataFrameSeedSource` between stages for in-memory handoff or `LocalFileSeedSource` for on-disk handoff. | + +The engine does not know about pipelines. Each stage is a regular `DatasetBuilder.build()` call. + +## Implementation phases + +### Phase 1: Pipeline class (can ship independently) + +- Add `Pipeline` class with `add_stage()`, `run()`, between-stage callbacks. +- Add `pipeline-metadata.json` writing. +- Add `dd.pipeline()` factory method on `DataDesigner`. +- Tests: multi-stage runs, explode/filter via callbacks, num_records defaulting, artifact layout. + +### Phase 2: Remove `allow_resize` + +- Deprecate `allow_resize` with a warning pointing to pipelines. +- Remove resize code from sync engine (`_cell_resize_mode`, `_finalize_fan_out` resize branch, `replace_buffer` `allow_resize` param). +- Remove `_validate_async_compatibility()` from async engine. +- Add fail-fast guard in `ProcessorRunner` for pre-batch row-count changes. +- Tests: verify rejection, migration path examples. + +### Phase 3: Stage-level resume + +- Add `resume=True` to `pipeline.run()`. +- Read `pipeline-metadata.json` to detect completed stages. 
+- Skip completed stages, seed next stage from last completed output. +- Depends on artifact layout from phase 1. + +### Phase 4 (future): Auto-chaining from single config + +- Detect stage boundaries in the DAG (via a new config marker or heuristic). +- Auto-split into pipeline stages internally. +- User sees a single `dd.create(config)` call but gets multi-stage execution. + +## Open questions + +1. **In-memory vs on-disk handoff between stages**: For small datasets, `DataFrameSeedSource` avoids disk I/O. For large datasets, writing parquet between stages is safer. Should the pipeline auto-detect based on row count, or always go through disk for consistency? + +2. **Preview support**: Should `pipeline.preview()` run all stages with small `num_records`? Or just preview the last stage seeded from a prior full run? + +3. **Config serialization**: A pipeline config can't be serialized to YAML if stages use `DataFrameSeedSource`. For persistence, stages would need symbolic references ("seed from stage X's output"). This is needed for auto-chaining (phase 4) but not for the explicit API (phases 1-3). + +4. **Naming**: `Pipeline` vs `Chain` vs `WorkflowChain`. `Pipeline` is the most intuitive and aligns with ML pipeline terminology. 
+ +## Related issues + +- #447 - AsyncRunController refactor (partially superseded: pre-batch resize handling moves to pipeline level instead of controller level) +- #525 - Resume interrupted runs (complementary: stage-level resume from pipeline, batch-level resume from #525) +- #462 - Progress bar and scheduler polish (independent) +- #464 - Custom column retryable errors (independent) From d1c6c64e200d9675b6639ef9d59f521d97e73707 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 15 Apr 2026 20:21:22 +0000 Subject: [PATCH 02/13] docs: reframe plan - chaining is the primary goal, allow_resize removal is secondary --- plans/workflow-chaining/workflow-chaining.md | 73 +++++++++++--------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index 9a498a6f8..9394e8af1 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -4,25 +4,30 @@ authors: - amanoel --- -# Plan: Workflow chaining and `allow_resize` removal +# Plan: Workflow chaining ## Problem DataDesigner workflows are self-contained: one config, one `create()` call, one output. There is no first-class way to combine workflows in sequence, where the output of one feeds the input of the next. Users who need this must manually wire `DataFrameSeedSource` between calls. -Separately, the `allow_resize` flag on column configs lets a generator change the row count mid-generation. This works in the sync engine via in-place buffer replacement, but is fundamentally incompatible with the async engine's fixed-size `CompletionTracker` grid. The async engine currently rejects `allow_resize=True` with a validation error. Pre-batch processors that resize have a similar problem: the async path handles shrinking accidentally (via drop-marking), but expansion is silently ignored. 
+This matters for several use cases: -These are the same problem viewed from different angles: the need to change row counts between generation steps. +- **Filter-then-enrich**: Generate candidates, filter to high-quality rows, then generate detailed content from survivors. The second stage's row count depends on the first stage's filter output. +- **Explode**: Generate a small set of seed entities (e.g., 100 personas), then generate many records from each (e.g., 1000 conversations). The seed reader's cycling handles the expansion, but the user must manually wire stages. +- **Generate-then-judge**: Generate a dataset, then run a separate LLM-as-judge pass with different models or stricter prompts. Iterating on the judging config shouldn't require re-generating the base data. +- **Multi-turn construction**: Each conversation turn has a different prompt structure and possibly a different model. Composing these as sequential stages is more natural than a single flat config. ## Proposed solution -Replace the in-place resize mechanism with **workflow chaining**: a thin orchestration layer that sequences multiple generation stages, passing each stage's output as the next stage's seed dataset. +Add **workflow chaining**: a thin orchestration layer that sequences multiple generation stages, passing each stage's output as the next stage's seed dataset. This is the primary deliverable. -This is a three-part change: +As a secondary benefit, chaining also enables the removal of `allow_resize` and simplification of the engine's resize handling. -1. **Remove `allow_resize`** from the column config and all engine code that supports it. -2. **Disallow row-count changes in pre-batch processors** (fail-fast if the processor returns a different number of rows). -3. **Add a `Pipeline` class** in the interface layer that auto-chains stages, with support for explicit multi-stage configs. 
+### Secondary benefit: `allow_resize` removal + +The `allow_resize` flag on column configs lets a generator change the row count mid-generation. This works in the sync engine but is fundamentally incompatible with the async engine's fixed-size `CompletionTracker` grid (currently rejected with a validation error). Pre-batch processors that resize have a similar problem. + +With chaining in place, resize becomes a between-stage concern rather than a mid-generation concern. This lets us remove `allow_resize` and the associated engine complexity, and disallow row-count changes in pre-batch processors. Users who need resize use a pipeline with a stage boundary at the resize point. ### Why chaining instead of fixing async resize @@ -30,31 +35,7 @@ The async scheduler's `CompletionTracker` pre-allocates a (row_group x row_index ## Design -### Part 1: Remove `allow_resize` - -**Config changes** (`data-designer-config`): - -- Remove `allow_resize: bool = False` from `SingleColumnConfig` (or its base class `ColumnConfigBase`). -- Deprecation: keep the field for one release cycle with a deprecation warning, then remove. - -**Engine changes** (`data-designer-engine`): - -- Remove `_cell_resize_mode`, `_cell_resize_results`, and the resize branch in `_finalize_fan_out()` from `DatasetBuilder`. -- Remove `allow_resize` parameter from `DatasetBatchManager.replace_buffer()`. -- Remove `_validate_async_compatibility()` (no longer needed - nothing to reject). -- Simplify `_run_full_column_generator()` to always enforce row-count invariance. - -**Migration path**: Users with `allow_resize=True` columns split their config into a pipeline with a stage boundary at the resize column. The resize column becomes the last column of its stage, and downstream columns move to the next stage. 
- -### Part 2: Fail-fast on pre-batch processor resize - -In `ProcessorRunner.run_pre_batch()` and `run_pre_batch_on_df()`, raise `DatasetProcessingError` if the returned DataFrame has a different row count than the input. - -This applies to both sync and async paths. Users who need to filter or expand between seeds and generation use the pipeline's between-stage callback instead. - -For users who need programmatic filtering at the seed boundary, a seed reader plugin is the escape hatch (the seed reader can filter/transform before the engine ever sees the data). - -### Part 3: Pipeline class +### Part 1: Pipeline class A new `Pipeline` class in `data_designer.interface` that orchestrates multi-stage generation. @@ -146,6 +127,32 @@ The connection to #525: chaining gives coarse (stage-level) checkpointing for fr - Which stage's output seeded the next - Timestamp and duration per stage +### Part 2: Remove `allow_resize` + +With the pipeline in place, `allow_resize` is no longer needed as an engine-internal mechanism. Resize becomes a between-stage concern. + +**Config changes** (`data-designer-config`): + +- Remove `allow_resize: bool = False` from `SingleColumnConfig` (or its base class `ColumnConfigBase`). +- Deprecation: keep the field for one release cycle with a deprecation warning, then remove. + +**Engine changes** (`data-designer-engine`): + +- Remove `_cell_resize_mode`, `_cell_resize_results`, and the resize branch in `_finalize_fan_out()` from `DatasetBuilder`. +- Remove `allow_resize` parameter from `DatasetBatchManager.replace_buffer()`. +- Remove `_validate_async_compatibility()` (no longer needed - nothing to reject). +- Simplify `_run_full_column_generator()` to always enforce row-count invariance. + +**Migration path**: Users with `allow_resize=True` columns split their config into a pipeline with a stage boundary at the resize column. The resize column becomes the last column of its stage, and downstream columns move to the next stage. 
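
The migration rule can be sketched as a small helper that splits an ordered column list into stages, closing a stage right after each resize column. This is only an illustration under stated assumptions: `split_at_resize_columns` is a hypothetical name, columns are plain strings, and the input list is assumed to already be in dependency order. It is not an existing DataDesigner utility.

```python
def split_at_resize_columns(columns: list, resize_columns: set) -> list:
    """Group columns into stages so each resize column ends its stage.

    `columns` is assumed to be in dependency order; `resize_columns` holds
    the names that previously carried allow_resize=True.
    """
    stages = []
    current = []
    for name in columns:
        current.append(name)
        if name in resize_columns:
            # The resize column is the last column of its stage.
            stages.append(current)
            current = []
    if current:
        # Columns after the final resize column form the last stage.
        stages.append(current)
    return stages


print(split_at_resize_columns(["topic", "expanded", "summary", "score"], {"expanded"}))
```

Each inner list would then become the column set of one `add_stage()` call, with the later stage seeding from the earlier stage's output.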
+ +### Part 3: Fail-fast on pre-batch processor resize + +In `ProcessorRunner.run_pre_batch()` and `run_pre_batch_on_df()`, raise `DatasetProcessingError` if the returned DataFrame has a different row count than the input. + +This applies to both sync and async paths. Users who need to filter or expand between seeds and generation use the pipeline's between-stage callback instead. + +For users who need programmatic filtering at the seed boundary, a seed reader plugin is the escape hatch (the seed reader can filter/transform before the engine ever sees the data). + ### Where it fits in the architecture | Layer | Changes | From 9f640ca8f8a69b86e38bcb90d8dffba1d7f59455 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 15 Apr 2026 20:31:42 +0000 Subject: [PATCH 03/13] docs: add to_config_builder convenience method and concrete use cases --- plans/workflow-chaining/workflow-chaining.md | 112 ++++++++++++++++++- 1 file changed, 111 insertions(+), 1 deletion(-) diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index 9394e8af1..f5a8b618e 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -56,6 +56,24 @@ results["conversations"].load_dataset() # stage 2 output results["judged"].load_dataset() # final output ``` +**Convenience method on results (lightweight, for notebooks):** + +For interactive use where a full pipeline is overkill, a `to_config_builder()` method on `DatasetCreationResults` returns a pre-seeded `DataDesignerConfigBuilder`: + +```python +# Stage 1 +result = dd.create(config_personas, num_records=100) + +# Stage 2 - just grab the result and keep going +config_convos = ( + result.to_config_builder(columns=["name", "age", "background"]) # optional column selection + .add_column(name="conversation", column_type="llm_text", prompt="...") +) +result_2 = dd.create(config_convos, num_records=1000) +``` + +This is a thin wrapper: loads the dataset, 
optionally filters columns, wraps in `DataFrameSeedSource`, returns a new config builder. No tracking, no provenance, no callbacks - just a quick bridge for iteration. + **Auto-chaining from a single config (future):** The engine detects columns that were previously `allow_resize=True` (or a new marker like `stage_boundary=True`) and auto-splits the DAG into stages. This is a convenience layer on top of the explicit API - not required for v1. @@ -163,10 +181,102 @@ For users who need programmatic filtering at the seed boundary, a seed reader pl The engine does not know about pipelines. Each stage is a regular `DatasetBuilder.build()` call. +## Use cases for implementation and testing + +These should guide the implementation and serve as the basis for tutorial notebooks. + +### 1. Explode: personas to conversations + +Generate a small, high-quality set of personas, then produce many conversations from each. + +```python +# Stage 1: 100 diverse personas +config_personas = ( + DataDesignerConfigBuilder() + .add_column(name="name", column_type="sampler", sampler_type="person_name") + .add_column(name="age", column_type="sampler", sampler_type="uniform_int", params=...) + .add_column(name="background", column_type="llm_text", prompt="Write a short background for {{ name }}, age {{ age }}.") +) + +# Stage 2: 1000 conversations (each persona used ~10 times via seed cycling) +config_convos = ( + DataDesignerConfigBuilder() + .add_column(name="topic", column_type="llm_text", prompt="Generate a conversation topic for {{ name }}...") + .add_column(name="conversation", column_type="llm_text", prompt="Write a conversation between {{ name }} and an assistant about {{ topic }}...") +) + +pipeline = dd.pipeline() +pipeline.add_stage("personas", config_personas, num_records=100) +pipeline.add_stage("conversations", config_convos, num_records=1000) +results = pipeline.run() +``` + +### 2. 
Filter-then-enrich + +Generate candidates, use a between-stage callback to filter, then enrich survivors. + +```python +config_gen = ... # generates rows with a quality_score column +config_enrich = ... # adds detailed analysis columns + +def keep_high_quality(stage_output_path: Path) -> Path: + df = pd.read_parquet(stage_output_path / "parquet-files") + df = df[df["quality_score"] > 0.8] + out = stage_output_path.parent / "filtered" + out.mkdir(exist_ok=True) + df.to_parquet(out / "data.parquet") + return out + +pipeline = dd.pipeline() +pipeline.add_stage("candidates", config_gen, num_records=5000) +pipeline.add_stage("enriched", config_enrich, after=keep_high_quality) +results = pipeline.run() +``` + +### 3. Generate-then-judge with different models + +Iterate on the judging config without re-generating the base data. + +```python +# Stage 1: generate with a fast model +config_gen = DataDesignerConfigBuilder(model_configs=[fast_model])... + +# Stage 2: judge with a stronger model +config_judge = DataDesignerConfigBuilder(model_configs=[strong_model])... + +pipeline = dd.pipeline() +pipeline.add_stage("generated", config_gen, num_records=1000) +pipeline.add_stage("judged", config_judge) +results = pipeline.run() + +# Later: tweak judging config, resume from stage 1 output +pipeline_v2 = dd.pipeline() +pipeline_v2.add_stage("generated", config_gen, num_records=1000) +pipeline_v2.add_stage("judged", config_judge_v2) +results_v2 = pipeline_v2.run(resume=True) # skips stage 1 +``` + +### 4. 
Interactive notebook chaining (lightweight, no pipeline) + +Quick iteration using `to_config_builder()`: + +```python +result = dd.create(config_personas, num_records=50) +result.load_dataset() # inspect, looks good + +# Chain into next step +config_2 = ( + result.to_config_builder(columns=["name", "background"]) + .add_column(name="question", column_type="llm_text", prompt="...") +) +result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 +``` + ## Implementation phases -### Phase 1: Pipeline class (can ship independently) +### Phase 1: Pipeline class and `to_config_builder()` (can ship independently) +- Add `to_config_builder()` on `DatasetCreationResults` and `PreviewResults`. - Add `Pipeline` class with `add_stage()`, `run()`, between-stage callbacks. - Add `pipeline-metadata.json` writing. - Add `dd.pipeline()` factory method on `DataDesigner`. From 1457cc96db8fed6310400957bc2f92638d9b8c95 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 15 Apr 2026 20:50:10 +0000 Subject: [PATCH 04/13] docs: address review feedback - data contract, resume safety, seed controls, edge cases --- plans/workflow-chaining/workflow-chaining.md | 55 +++++++++++++------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index f5a8b618e..1b7e797ae 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -23,11 +23,13 @@ Add **workflow chaining**: a thin orchestration layer that sequences multiple ge As a secondary benefit, chaining also enables the removal of `allow_resize` and simplification of the engine's resize handling. -### Secondary benefit: `allow_resize` removal +### Secondary benefit: `allow_resize` removal and sync/async convergence The `allow_resize` flag on column configs lets a generator change the row count mid-generation. 
This works in the sync engine but is fundamentally incompatible with the async engine's fixed-size `CompletionTracker` grid (currently rejected with a validation error). Pre-batch processors that resize have a similar problem. -With chaining in place, resize becomes a between-stage concern rather than a mid-generation concern. This lets us remove `allow_resize` and the associated engine complexity, and disallow row-count changes in pre-batch processors. Users who need resize use a pipeline with a stage boundary at the resize point. +`allow_resize` is one of the remaining divergences between sync and async. Since the long-term direction is to remove the sync engine entirely, maintaining a sync-only feature is counterproductive. With chaining in place, resize becomes a between-stage concern rather than a mid-generation concern. This lets us remove `allow_resize` and the associated engine complexity, and disallow row-count changes in pre-batch processors. Users who need resize use a pipeline with a stage boundary at the resize point. + +Note: `allow_resize` is currently documented in custom columns, plugin examples, and agent rollout ingestion docs. Removal requires a deprecation cycle and doc updates. ### Why chaining instead of fixing async resize @@ -72,21 +74,28 @@ config_convos = ( result_2 = dd.create(config_convos, num_records=1000) ``` -This is a thin wrapper: loads the dataset, optionally filters columns, wraps in `DataFrameSeedSource`, returns a new config builder. No tracking, no provenance, no callbacks - just a quick bridge for iteration. +This is a thin wrapper: loads the dataset into memory, optionally filters columns, wraps in `DataFrameSeedSource`, returns a new config builder. No tracking, no provenance, no callbacks - just a quick bridge for iteration. Not suitable for large datasets (loads full DataFrame into memory) or serializable configs (`DataFrameSeedSource` can't be written to YAML). For production pipelines, use the `Pipeline` class. 
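
The `num_records=1000` request against a 100-row result relies on the seed reader cycling through its input. The sketch below shows what ordered cycling implies for row reuse, with a plain list standing in for the seed dataset. `cycle_seed_rows` is a hypothetical helper for illustration only; the real seed reader may order or sample rows differently.

```python
from collections import Counter
from itertools import cycle, islice


def cycle_seed_rows(seed_rows: list, num_records: int) -> list:
    """Repeat seed rows in order until `num_records` rows have been produced."""
    if not seed_rows:
        raise ValueError("cannot cycle an empty seed dataset")
    return list(islice(cycle(seed_rows), num_records))


personas = [f"persona-{i}" for i in range(100)]
records = cycle_seed_rows(personas, 1000)

# Count how many generated records each persona would seed.
usage = Counter(records)
print(len(records), min(usage.values()), max(usage.values()))
```

When `num_records` is not a multiple of the seed size, the earliest seed rows are used one extra time under this ordered scheme.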
**Auto-chaining from a single config (future):** The engine detects columns that were previously `allow_resize=True` (or a new marker like `stage_boundary=True`) and auto-splits the DAG into stages. This is a convenience layer on top of the explicit API - not required for v1. +#### Stage data contract + +Each stage seeds from the **previous stage's final dataset** - the post-processor output with dropped columns excluded. This is the same DataFrame returned by `DatasetCreationResults.load_dataset()`. + +Processor outputs (named processor artifacts) and media assets (images stored on disk with relative paths in the DataFrame) are NOT automatically forwarded. If a downstream stage needs image columns from an upstream stage, the pipeline must resolve image paths relative to the upstream stage's artifact directory. This needs explicit handling - TBD in implementation. + #### Between-stage callbacks Users may need to transform data between stages. The pipeline supports an optional callback: ```python def filter_high_quality(stage_output_path: Path) -> Path: - df = pd.read_parquet(stage_output_path / "data") + df = pd.read_parquet(stage_output_path / "parquet-files") df = df[df["quality_score"] > 0.8] out = stage_output_path.parent / "filtered" + out.mkdir(exist_ok=True) df.to_parquet(out / "data.parquet") return out @@ -98,52 +107,58 @@ pipeline.add_stage( ) ``` -The callback receives the path to the completed stage's artifacts and returns a path to the (possibly modified) artifacts. This keeps large DataFrames on disk and gives users full control. +The callback receives the path to the completed stage's artifact directory (containing `parquet-files/`, `metadata.json`, etc.) and returns a path that the next stage will seed from. This keeps large DataFrames on disk and gives users full control. -The callback signature is `(Path) -> Path`. If the user returns the same path, no copy is made. If they return a new path, the next stage seeds from that. 
+**Empty stage policy**: If a callback filters all rows (or a stage produces zero rows), the pipeline raises `DataDesignerPipelineError` by default. Stages can opt in to empty output with `allow_empty=True` on `add_stage()`, in which case the pipeline short-circuits and skips subsequent stages. -#### `num_records` behavior +#### `num_records` and seed behavior - If `num_records` is explicitly set on a stage, that value is used. - If omitted, defaults to the previous stage's output row count (after any between-stage callback). - The seed reader's existing cycling behavior handles the explode case: requesting 1000 records from a 100-row seed cycles through the seed 10 times. +- `add_stage()` accepts optional `sampling_strategy` (ordered/shuffle) and `selection_strategy` (IndexRange/PartitionBlock) to control how the previous stage's output is sampled. Defaults to ordered. #### Artifact management -Each stage writes to its own subdirectory under the pipeline's artifact path: +The pipeline owns its directory layout directly, bypassing `ArtifactStorage`'s default auto-rename behavior (which appends timestamps to non-empty directories). Stage directories use stable, deterministic names based on stage index and name: ``` artifacts/ pipeline-name/ - stage-1-personas/ + stage-0-personas/ parquet-files/ metadata.json - stage-2-conversations/ + stage-1-conversations/ parquet-files/ metadata.json - stage-3-judged/ + stage-2-judged/ parquet-files/ metadata.json - pipeline-metadata.json # stage order, configs, lineage + pipeline-metadata.json ``` +The pipeline creates each stage's `ArtifactStorage` with the stage directory as `dataset_name`, ensuring stable paths across reruns. + #### Checkpointing and resume Each stage produces durable parquet output before the next stage starts. This provides natural checkpoint boundaries: - If stage 3 of 4 fails, stages 1 and 2 are already on disk. 
-- A `resume=True` flag on `pipeline.run()` skips completed stages (detected via `pipeline-metadata.json`). +- A `resume=True` flag on `pipeline.run()` skips completed stages. - Within a stage, batch-level resume (#525) can further reduce re-work. +**Resume safety**: Naive "skip if directory exists" is not sufficient. Configs, model settings, callbacks, or DD version may have changed between runs. Resume must compare a fingerprint of each stage's inputs (config hash, num_records, DD version, upstream stage fingerprint) against what's recorded in `pipeline-metadata.json`. If any input changed, that stage and all downstream stages must re-run. This is a phase 3 concern but the metadata format in phase 1 should record enough information to support it. + The connection to #525: chaining gives coarse (stage-level) checkpointing for free. #525 gives fine (batch-level) checkpointing within a stage. They are complementary. #### Provenance `pipeline-metadata.json` records: - Stage order, names, and configs used +- Config fingerprint (hash) per stage for resume invalidation - `num_records` requested vs actual per stage - Which stage's output seeded the next -- Timestamp and duration per stage +- Timestamp, duration, and DD version per stage ### Part 2: Remove `allow_resize` @@ -167,9 +182,7 @@ With the pipeline in place, `allow_resize` is no longer needed as an engine-inte In `ProcessorRunner.run_pre_batch()` and `run_pre_batch_on_df()`, raise `DatasetProcessingError` if the returned DataFrame has a different row count than the input. -This applies to both sync and async paths. Users who need to filter or expand between seeds and generation use the pipeline's between-stage callback instead. - -For users who need programmatic filtering at the seed boundary, a seed reader plugin is the escape hatch (the seed reader can filter/transform before the engine ever sees the data). +This applies to both sync and async paths. 
Users who need to filter or expand between seeds and generation use the pipeline's between-stage callback instead. Note that a seed reader plugin is NOT an equivalent escape hatch: seed readers run before any columns are generated (including samplers), so they can't filter on generated column values. ### Where it fits in the architecture @@ -305,7 +318,7 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 ## Open questions -1. **In-memory vs on-disk handoff between stages**: For small datasets, `DataFrameSeedSource` avoids disk I/O. For large datasets, writing parquet between stages is safer. Should the pipeline auto-detect based on row count, or always go through disk for consistency? +1. **In-memory vs on-disk handoff between stages**: For small datasets, `DataFrameSeedSource` avoids disk I/O. For large datasets, writing parquet between stages is safer. Should the pipeline auto-detect based on row count, or always go through disk for consistency? (Leaning toward always-on-disk for simplicity and resume support.) 2. **Preview support**: Should `pipeline.preview()` run all stages with small `num_records`? Or just preview the last stage seeded from a prior full run? @@ -313,6 +326,12 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 4. **Naming**: `Pipeline` vs `Chain` vs `WorkflowChain`. `Pipeline` is the most intuitive and aligns with ML pipeline terminology. +5. **Image/media column forwarding**: Images in create mode are stored as relative file paths. If a downstream stage seeds from an upstream stage that produced images, the relative paths break. Options: (a) resolve to absolute paths at stage boundary, (b) copy media assets into downstream stage's directory, (c) document as unsupported in v1. + +6. **Branch/fan-out semantics**: Linear chaining covers the common cases. But "generate once, judge several ways" (fan-out) currently requires building multiple pipelines that repeat stage 1. 
Should the pipeline support DAG-shaped stage graphs, or is that future work? + +7. **Downstream seeding scope**: Should downstream stages only seed from the final dataset, or should they also be able to access dropped columns or named processor outputs from upstream stages? + ## Related issues - #447 - AsyncRunController refactor (partially superseded: pre-batch resize handling moves to pipeline level instead of controller level) From af44492fe15dfb27b80d1a0ca38792898c2aeb29 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Thu, 7 May 2026 20:05:10 +0000 Subject: [PATCH 05/13] docs: refresh plan against current main - deprecation already shipped, fingerprint feature available - Update allow_resize framing: now logs DeprecationWarning and falls back to sync (#553), no longer hard-rejected. Async is default as of #592. - Reference DataDesignerConfig.fingerprint() (#587) as the per-stage hash for resume invalidation. - Rename _validate_async_compatibility() to _resolve_async_compatibility() to match current code. - Mark Phase 2 step 1 as done; list the concrete docs that still need updates. --- plans/workflow-chaining/workflow-chaining.md | 30 +++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index 1b7e797ae..f97a043b8 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -25,11 +25,11 @@ As a secondary benefit, chaining also enables the removal of `allow_resize` and ### Secondary benefit: `allow_resize` removal and sync/async convergence -The `allow_resize` flag on column configs lets a generator change the row count mid-generation. This works in the sync engine but is fundamentally incompatible with the async engine's fixed-size `CompletionTracker` grid (currently rejected with a validation error). Pre-batch processors that resize have a similar problem. 
+The `allow_resize` flag on column configs lets a generator change the row count mid-generation. This works in the sync engine but is fundamentally incompatible with the async engine's fixed-size `CompletionTracker` grid. As of #553, an `allow_resize=True` config in async mode logs a `DeprecationWarning` and silently falls back to the sync engine for that run; it is no longer hard-rejected. -`allow_resize` is one of the remaining divergences between sync and async. Since the long-term direction is to remove the sync engine entirely, maintaining a sync-only feature is counterproductive. With chaining in place, resize becomes a between-stage concern rather than a mid-generation concern. This lets us remove `allow_resize` and the associated engine complexity, and disallow row-count changes in pre-batch processors. Users who need resize use a pipeline with a stage boundary at the resize point. +`allow_resize` is one of the remaining divergences between sync and async. The async engine is the default execution path as of #592; sync remains only as a fallback for `allow_resize` runs. Maintaining a sync-only feature to keep one fallback path alive is counterproductive. With chaining in place, resize becomes a between-stage concern rather than a mid-generation concern. This lets us remove `allow_resize` and the associated engine complexity, and disallow row-count changes in pre-batch processors. Users who need resize use a pipeline with a stage boundary at the resize point. -Note: `allow_resize` is currently documented in custom columns, plugin examples, and agent rollout ingestion docs. Removal requires a deprecation cycle and doc updates. +Note: `allow_resize` is documented in custom columns, plugin examples, and agent rollout ingestion docs (verified post-Fern migration in #581). The deprecation warning has shipped in #553; full removal still requires doc updates and the migration of any in-tree usage. 
### Why chaining instead of fixing async resize @@ -147,7 +147,14 @@ Each stage produces durable parquet output before the next stage starts. This pr - A `resume=True` flag on `pipeline.run()` skips completed stages. - Within a stage, batch-level resume (#525) can further reduce re-work. -**Resume safety**: Naive "skip if directory exists" is not sufficient. Configs, model settings, callbacks, or DD version may have changed between runs. Resume must compare a fingerprint of each stage's inputs (config hash, num_records, DD version, upstream stage fingerprint) against what's recorded in `pipeline-metadata.json`. If any input changed, that stage and all downstream stages must re-run. This is a phase 3 concern but the metadata format in phase 1 should record enough information to support it. +**Resume safety**: Naive "skip if directory exists" is not sufficient. Configs, model settings, callbacks, or DD version may have changed between runs. Resume must compare a fingerprint of each stage's inputs against what's recorded in `pipeline-metadata.json`. The per-stage fingerprint composes: + +- `DataDesignerConfig.fingerprint()` (introduced in #587) — content-addressable sha256 over the data-relevant portion of the config +- `num_records` (requested) +- DD version +- Upstream stage fingerprint (the directly preceding stage's recorded fingerprint, so a change anywhere in the chain invalidates downstream stages) + +If any component changed, that stage and all downstream stages must re-run. This is a phase 3 concern but the metadata format in phase 1 should record enough information to support it. The connection to #525: chaining gives coarse (stage-level) checkpointing for free. #525 gives fine (batch-level) checkpointing within a stage. They are complementary. 
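The per-stage fingerprint composition listed under "Resume safety" can be sketched roughly as follows. This is a minimal illustration, not the plan's implementation: it treats the output of `DataDesignerConfig.fingerprint()` as an opaque string, and the helper names `stage_fingerprint` and `first_stale_stage` are hypothetical.

```python
import hashlib
import json
from typing import List, Optional


def stage_fingerprint(
    config_fingerprint: str,
    num_records: int,
    dd_version: str,
    upstream_fingerprint: Optional[str],
) -> str:
    """Combine the four components into one sha256 hex digest (hypothetical helper)."""
    payload = json.dumps(
        {
            "config": config_fingerprint,  # assumed: value of DataDesignerConfig.fingerprint()
            "num_records": num_records,
            "dd_version": dd_version,
            "upstream": upstream_fingerprint,  # None for the first stage
        },
        sort_keys=True,  # stable key order so equal inputs hash equally
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def first_stale_stage(recorded: List[str], current: List[str]) -> int:
    """Return the index of the first stage that must re-run, or len(current) if none."""
    for index, fingerprint in enumerate(current):
        if index >= len(recorded) or recorded[index] != fingerprint:
            return index
    return len(current)
```

Because each stage hashes its upstream fingerprint, a change in an early stage changes every later fingerprint as well, so "this stage and all downstream stages re-run" falls out of a single left-to-right comparison against the recorded values.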
@@ -155,7 +162,7 @@ The connection to #525: chaining gives coarse (stage-level) checkpointing for fr `pipeline-metadata.json` records: - Stage order, names, and configs used -- Config fingerprint (hash) per stage for resume invalidation +- Per-stage fingerprint for resume invalidation: `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, DD version, and the upstream stage fingerprint - `num_records` requested vs actual per stage - Which stage's output seeded the next - Timestamp, duration, and DD version per stage @@ -167,13 +174,13 @@ With the pipeline in place, `allow_resize` is no longer needed as an engine-inte **Config changes** (`data-designer-config`): - Remove `allow_resize: bool = False` from `SingleColumnConfig` (or its base class `ColumnConfigBase`). -- Deprecation: keep the field for one release cycle with a deprecation warning, then remove. +- The deprecation warning has already shipped in #553. After one release cycle from that point, remove the field. **Engine changes** (`data-designer-engine`): - Remove `_cell_resize_mode`, `_cell_resize_results`, and the resize branch in `_finalize_fan_out()` from `DatasetBuilder`. - Remove `allow_resize` parameter from `DatasetBatchManager.replace_buffer()`. -- Remove `_validate_async_compatibility()` (no longer needed - nothing to reject). +- Remove `_resolve_async_compatibility()` and the sync-fallback branch in `_build_async()` (no longer needed - nothing to fall back for). - Simplify `_run_full_column_generator()` to always enforce row-count invariance. **Migration path**: Users with `allow_resize=True` columns split their config into a pipeline with a stage boundary at the resize column. The resize column becomes the last column of its stage, and downstream columns move to the next stage. @@ -297,9 +304,11 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 ### Phase 2: Remove `allow_resize` -- Deprecate `allow_resize` with a warning pointing to pipelines. 
+- (Done in #553) `allow_resize=True` in async mode emits a `DeprecationWarning` and falls back to sync. +- Update docs that still reference `allow_resize` (`docs/concepts/custom_columns.md`, `docs/plugins/example.md`, `docs/concepts/agent-rollout-ingestion.md`) to point at pipelines. - Remove resize code from sync engine (`_cell_resize_mode`, `_finalize_fan_out` resize branch, `replace_buffer` `allow_resize` param). -- Remove `_validate_async_compatibility()` from async engine. +- Remove `_resolve_async_compatibility()` and its sync-fallback branch from `_build_async()`. +- Remove the `allow_resize` field from the config schema. - Add fail-fast guard in `ProcessorRunner` for pre-batch row-count changes. - Tests: verify rejection, migration path examples. @@ -307,7 +316,8 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 - Add `resume=True` to `pipeline.run()`. - Read `pipeline-metadata.json` to detect completed stages. -- Skip completed stages, seed next stage from last completed output. +- Compute each stage's fingerprint via `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, DD version, and upstream stage fingerprint; invalidate the stage and everything downstream on any mismatch. +- Skip stages whose fingerprints match, seed next stage from last completed output. - Depends on artifact layout from phase 1. ### Phase 4 (future): Auto-chaining from single config From 85dba6879f79a5646c95268d1afa51c7b3b1e213 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Thu, 7 May 2026 21:03:17 +0000 Subject: [PATCH 06/13] docs: bake parallel-async carefulness into the plan - throttle invariant, on-disk handoffs, DAG-ready, acreate sidecar - Resolve in-memory vs on-disk handoff to always-on-disk inside Pipeline; reserve in-memory for to_config_builder() notebook ergonomic. - Add Composability section: parent DataDesigner reuse is a load-bearing API contract for throttle coordination across stages and parallel branches. 
- Add Engine API surface section: acreate() as a small additive sidecar, independent of chaining v1 but a hard dependency for Phase 4. - Promote DAG semantics from "future work" to "designed-in"; add Phase 4 (parallel branches via asyncio.gather over acreate); demote auto-chaining to Phase 5. - New Resolved decisions section captures the three load-bearing API decisions; trim the Open questions list accordingly. - Mention possible future external orchestration only as a vague composability constraint, no commitment. --- plans/workflow-chaining/workflow-chaining.md | 77 ++++++++++++++++---- 1 file changed, 64 insertions(+), 13 deletions(-) diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index f97a043b8..3ea040bd8 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -74,7 +74,9 @@ config_convos = ( result_2 = dd.create(config_convos, num_records=1000) ``` -This is a thin wrapper: loads the dataset into memory, optionally filters columns, wraps in `DataFrameSeedSource`, returns a new config builder. No tracking, no provenance, no callbacks - just a quick bridge for iteration. Not suitable for large datasets (loads full DataFrame into memory) or serializable configs (`DataFrameSeedSource` can't be written to YAML). For production pipelines, use the `Pipeline` class. +This is a thin wrapper: loads the dataset into memory, optionally filters columns, wraps in `DataFrameSeedSource`, returns a new config builder. No tracking, no provenance, no callbacks - just a quick bridge for iteration. Not suitable for large datasets (loads full DataFrame into memory) or serializable configs (`DataFrameSeedSource` can't be written to YAML). + +This is the *only* place in the chaining surface that uses an in-memory handoff. `Pipeline` itself always hands off between stages on disk - see "Composability and the throttle invariant" below. 
For production pipelines, use the `Pipeline` class. **Auto-chaining from a single config (future):** @@ -167,6 +169,31 @@ The connection to #525: chaining gives coarse (stage-level) checkpointing for fr - Which stage's output seeded the next - Timestamp, duration, and DD version per stage +#### Composability and the throttle invariant + +The `Pipeline` is constructed via `dd.pipeline()` and holds a reference to the parent `DataDesigner`. Every stage runs `dd.create()` (or `dd.acreate()` once available - see Engine API surface below) on that same instance. This is a load-bearing API contract for two reasons. + +**Throttle coordination across stages.** A `DataDesigner` owns one `ModelRegistry`, which owns one `ThrottleManager`. AIMD rate-limit state is per-instance. If the pipeline constructed a fresh `DataDesigner` per stage, each stage would adapt independently and the aggregate request rate against a provider could exceed the configured cap by a multiple of the stage count. The same hazard applies to parallel branches in Phase 4: branches sharing one `DataDesigner` automatically share throttling; branches each holding their own `DataDesigner` silently fragment it. Reusing one instance is the simple, correct default. + +**Door open for external orchestration.** If cross-process or distributed execution is ever introduced, the natural seam is the throttle backend (today an in-memory `ThrottleManager`; potentially a coordinator-backed implementation later). By keeping ownership of throttling *explicit* on the parent `DataDesigner` rather than *implicit* per stage, the pipeline's shape does not preclude swapping in such a backend. v1 does not need this and will not implement it; v1 only needs to avoid encoding assumptions that would prevent it. + +**On-disk handoffs for the same reason.** Stage handoffs go through parquet on disk via `LocalFileSeedSource`, never through an in-memory `DataFrameSeedSource`. 
This composes with any future orchestration model (in-process, cross-process, distributed) without per-environment branching. The cost is one parquet round-trip per stage boundary, which is negligible compared to LLM call time at any realistic scale. The notebook ergonomic `to_config_builder()` is the in-memory escape hatch and is explicitly not a Pipeline. + +**Internal stage model is a graph, not a list.** v1 exposes a linear `add_stage()` API and runs stages sequentially. Internally the pipeline represents stages as a DAG with the linear case being the default chain. This lets Phase 4 add parallel branches as an additive API change without restructuring orchestration. + +#### Engine API surface: `acreate()` + +`Pipeline` v1 calls `DataDesigner.create()` synchronously per stage and runs them in order. Sequential execution doesn't need an async API. Parallel execution does, and the engine doesn't expose one today. + +Adding `async def acreate(...)` on `DataDesigner` is a small, additive change. The underlying `_build_async` already runs on a singleton background event loop and submits work via a `concurrent.futures.Future`; `acreate()` bridges it into the caller's loop via `asyncio.wrap_future`. The sync `create()` becomes a one-line wrapper. No breaking changes. + +`acreate()` enables two things without touching `Pipeline`: + +- **Parallel-independent workflows.** Users can `asyncio.gather(dd.acreate(c1), dd.acreate(c2))` for unrelated configs and get coordinated throttling automatically through the shared `ThrottleManager`. +- **Pipeline DAG branches (Phase 4).** When the pipeline graduates to a DAG, parallel branches are a pure orchestration change - `asyncio.gather` over `acreate()` calls inside `pipeline.run()` - with no further engine work required. + +`acreate()` is *not* part of chaining v1. It ships as its own small piece of work that can land before, alongside, or after Phase 1; the dependency only becomes hard for Phase 4. 
Listed as a sidecar under Implementation phases. + ### Part 2: Remove `allow_resize` With the pipeline in place, `allow_resize` is no longer needed as an engine-internal mechanism. Resize becomes a between-stage concern. @@ -197,7 +224,7 @@ This applies to both sync and async paths. Users who need to filter or expand be |-------|---------| | `data-designer-config` | Remove `allow_resize` field. No new config models needed for v1 (pipeline is imperative, not declarative). | | `data-designer-engine` | Remove resize code paths. Add fail-fast guard in `ProcessorRunner`. No new engine features. | -| `data-designer` (interface) | New `Pipeline` class. Thin orchestration: calls `DataDesigner.create()` per stage, wires `DataFrameSeedSource` between stages for in-memory handoff or `LocalFileSeedSource` for on-disk handoff. | +| `data-designer` (interface) | New `Pipeline` class. Thin orchestration: holds a reference to the parent `DataDesigner`, calls `DataDesigner.create()` per stage, hands off between stages on disk via `LocalFileSeedSource`. All stages share the same `ModelRegistry` and `ThrottleManager`. Optionally consumes `DataDesigner.acreate()` (sidecar) once available, for Phase 4 parallel branches. | The engine does not know about pipelines. Each stage is a regular `DatasetBuilder.build()` call. @@ -297,10 +324,19 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 ### Phase 1: Pipeline class and `to_config_builder()` (can ship independently) - Add `to_config_builder()` on `DatasetCreationResults` and `PreviewResults`. -- Add `Pipeline` class with `add_stage()`, `run()`, between-stage callbacks. +- Add `Pipeline` class with `add_stage()`, `run()`, between-stage callbacks. Pipeline holds a reference to the parent `DataDesigner` and reuses it across stages. +- Stage handoff is always on disk via `LocalFileSeedSource`; no in-memory handoff path inside `Pipeline`. +- Internal stage representation is a DAG (linear-only inputs in v1). 
- Add `pipeline-metadata.json` writing. - Add `dd.pipeline()` factory method on `DataDesigner`. -- Tests: multi-stage runs, explode/filter via callbacks, num_records defaulting, artifact layout. +- Tests: multi-stage runs, explode/filter via callbacks, num_records defaulting, artifact layout, throttle reuse across stages. + +### Sidecar: `acreate()` on `DataDesigner` (independent of chaining v1) + +- Add `async def acreate(...)` mirroring `create()` but returning the awaitable instead of blocking. +- `create()` becomes a one-line wrapper around `acreate()` (or both share a common builder helper). +- Tests: parallel-independent workflows via `asyncio.gather`; verify shared `ThrottleManager` keeps aggregate request rate within configured caps. +- Can ship before, alongside, or after Phase 1. Hard dependency for Phase 4. ### Phase 2: Remove `allow_resize` @@ -320,27 +356,42 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 - Skip stages whose fingerprints match, seed next stage from last completed output. - Depends on artifact layout from phase 1. -### Phase 4 (future): Auto-chaining from single config +### Phase 4: DAG-shaped stages with parallel branches + +- Extend `add_stage()` with an optional `depends_on=[stage_name, ...]` argument; default keeps the linear behavior. +- `pipeline.run()` walks the resulting DAG, gathering independent branches via `asyncio.gather` over `dd.acreate()` calls. +- Per-stage fingerprint composition (Phase 3) generalizes naturally: a stage's upstream fingerprint becomes the hash of all its parents' fingerprints. +- Throttle coordination relies on the existing invariant: all branches run on the same parent `DataDesigner`, so `ThrottleManager` is shared. +- Hard dependency on the `acreate()` sidecar. +- Tests: fan-out (one upstream, multiple parallel children); join (multiple upstreams, one child); resume invalidation when one branch's fingerprint changes; throttle behavior under N parallel branches. 
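As a rough sketch of the Phase 4 orchestration described above, the following walks a stage graph and gathers independent stages with `asyncio.gather`. The `run_dag` helper and the `run_stage` callable are hypothetical stand-ins (the latter for a `dd.acreate()` call on the shared parent `DataDesigner`); nothing here is the planned `Pipeline` API.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def run_dag(
    depends_on: Dict[str, List[str]],
    run_stage: Callable[[str], Awaitable[str]],
) -> Dict[str, str]:
    """Run stages in dependency order, gathering all currently-ready stages together."""
    done: Dict[str, str] = {}
    pending = dict(depends_on)
    while pending:
        # A stage is ready once every one of its parents has finished.
        ready = [name for name, deps in pending.items() if all(d in done for d in deps)]
        if not ready:
            raise ValueError("cycle or unknown dependency in stage graph")
        outputs = await asyncio.gather(*(run_stage(name) for name in ready))
        for name, output in zip(ready, outputs):
            done[name] = output
            del pending[name]
    return done
```

This version runs in waves, which is stricter than necessary: a stage waits for the whole previous wave rather than only its own parents. That keeps the sketch short; a real scheduler would likely start each stage as soon as its own dependencies complete.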
+ +### Phase 5 (future): Auto-chaining from single config - Detect stage boundaries in the DAG (via a new config marker or heuristic). - Auto-split into pipeline stages internally. - User sees a single `dd.create(config)` call but gets multi-stage execution. -## Open questions +## Resolved decisions + +These were open in earlier drafts; recording the resolutions here so the design is unambiguous. + +1. **In-memory vs on-disk handoff between stages** -> Always on-disk inside `Pipeline`. The in-memory `DataFrameSeedSource` mode is reserved for the lightweight `to_config_builder()` notebook ergonomic, which is explicitly *not* a `Pipeline`. Reasons: single execution model, simpler resume story, and composability with any future external orchestration that can't share an in-memory DataFrame across process boundaries. Cost is one parquet round-trip per stage, negligible relative to LLM call time. -1. **In-memory vs on-disk handoff between stages**: For small datasets, `DataFrameSeedSource` avoids disk I/O. For large datasets, writing parquet between stages is safer. Should the pipeline auto-detect based on row count, or always go through disk for consistency? (Leaning toward always-on-disk for simplicity and resume support.) +2. **Branch/fan-out semantics (DAG)** -> Designed-in but not v1. The internal stage representation is a DAG; v1 only accepts linear inputs through `add_stage()`. Phase 4 ships parallel branches via `asyncio.gather` over `acreate()`. v1 stays sequential. -2. **Preview support**: Should `pipeline.preview()` run all stages with small `num_records`? Or just preview the last stage seeded from a prior full run? +3. **Pipeline construction** -> `Pipeline` is created via `dd.pipeline()` and reuses the parent `DataDesigner`'s `ModelRegistry` and `ThrottleManager` across all stages. The pipeline does not construct its own `DataDesigner` instances. This is the throttle-coordination invariant (see Composability section). + +## Open questions -3. 
**Config serialization**: A pipeline config can't be serialized to YAML if stages use `DataFrameSeedSource`. For persistence, stages would need symbolic references ("seed from stage X's output"). This is needed for auto-chaining (phase 4) but not for the explicit API (phases 1-3). +1. **Preview support**: Should `pipeline.preview()` run all stages with small `num_records`? Or just preview the last stage seeded from a prior full run? -4. **Naming**: `Pipeline` vs `Chain` vs `WorkflowChain`. `Pipeline` is the most intuitive and aligns with ML pipeline terminology. +2. **Config serialization**: For persistence, pipeline configs would need symbolic stage references ("seed from stage X's output"). With the on-disk handoff decision above, the `DataFrameSeedSource` blocker is no longer relevant; the remaining question is how to encode stage dependencies in YAML. Needed for auto-chaining (Phase 5) but not for the explicit API (phases 1-4). -5. **Image/media column forwarding**: Images in create mode are stored as relative file paths. If a downstream stage seeds from an upstream stage that produced images, the relative paths break. Options: (a) resolve to absolute paths at stage boundary, (b) copy media assets into downstream stage's directory, (c) document as unsupported in v1. +3. **Naming**: `Pipeline` vs `Chain` vs `WorkflowChain`. `Pipeline` is the most intuitive and aligns with ML pipeline terminology. -6. **Branch/fan-out semantics**: Linear chaining covers the common cases. But "generate once, judge several ways" (fan-out) currently requires building multiple pipelines that repeat stage 1. Should the pipeline support DAG-shaped stage graphs, or is that future work? +4. **Image/media column forwarding**: Images in create mode are stored as relative file paths. If a downstream stage seeds from an upstream stage that produced images, the relative paths break. 
Options: (a) resolve to absolute paths at stage boundary, (b) copy media assets into downstream stage's directory, (c) document as unsupported in v1. -7. **Downstream seeding scope**: Should downstream stages only seed from the final dataset, or should they also be able to access dropped columns or named processor outputs from upstream stages? +5. **Downstream seeding scope**: Should downstream stages only seed from the final dataset, or should they also be able to access dropped columns or named processor outputs from upstream stages? ## Related issues From 17d534362c6c41f264845fb4c1a4df4e24b116cf Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Thu, 7 May 2026 22:05:03 +0000 Subject: [PATCH 07/13] docs: align plan framing with cross-process orchestration discussion - Soften "Door open for external orchestration" - drop throttle-backend-as-seam framing; cross-reference Future considerations. - Make acreate() scope explicit (in-process); cross-process orchestration is not the same problem. - Add Phase 4 scope clarifier - branch parallelism, not stage pipelining. - New Future considerations section: external orchestration (vague, uncommitted) and pipelined execution of dependent stages. --- plans/workflow-chaining/workflow-chaining.md | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index 3ea040bd8..791884aaf 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -175,7 +175,7 @@ The `Pipeline` is constructed via `dd.pipeline()` and holds a reference to the p **Throttle coordination across stages.** A `DataDesigner` owns one `ModelRegistry`, which owns one `ThrottleManager`. AIMD rate-limit state is per-instance. 
If the pipeline constructed a fresh `DataDesigner` per stage, each stage would adapt independently and the aggregate request rate against a provider could exceed the configured cap by a multiple of the stage count. The same hazard applies to parallel branches in Phase 4: branches sharing one `DataDesigner` automatically share throttling; branches each holding their own `DataDesigner` silently fragment it. Reusing one instance is the simple, correct default. -**Door open for external orchestration.** If cross-process or distributed execution is ever introduced, the natural seam is the throttle backend (today an in-memory `ThrottleManager`; potentially a coordinator-backed implementation later). By keeping ownership of throttling *explicit* on the parent `DataDesigner` rather than *implicit* per stage, the pipeline's shape does not preclude swapping in such a backend. v1 does not need this and will not implement it; v1 only needs to avoid encoding assumptions that would prevent it. +**Door open for external orchestration.** The pipeline's choice to reuse one `DataDesigner` is the in-process strategy: shared throttling across stages, branches gathered in the orchestrator process. A cross-process strategy is a separate but compatible model - see Future considerations. v1 only needs to avoid encoding assumptions that would prevent it. **On-disk handoffs for the same reason.** Stage handoffs go through parquet on disk via `LocalFileSeedSource`, never through an in-memory `DataFrameSeedSource`. This composes with any future orchestration model (in-process, cross-process, distributed) without per-environment branching. The cost is one parquet round-trip per stage boundary, which is negligible compared to LLM call time at any realistic scale. The notebook ergonomic `to_config_builder()` is the in-memory escape hatch and is explicitly not a Pipeline. 
@@ -183,7 +183,7 @@ The `Pipeline` is constructed via `dd.pipeline()` and holds a reference to the p #### Engine API surface: `acreate()` -`Pipeline` v1 calls `DataDesigner.create()` synchronously per stage and runs them in order. Sequential execution doesn't need an async API. Parallel execution does, and the engine doesn't expose one today. +`Pipeline` v1 calls `DataDesigner.create()` synchronously per stage and runs them in order. Sequential execution doesn't need an async API. *In-process* parallel execution does, and the engine doesn't expose one today. Cross-process orchestration is not the same problem: each worker runs sync `create()` in its own process and doesn't need an async surface. Adding `async def acreate(...)` on `DataDesigner` is a small, additive change. The underlying `_build_async` already runs on a singleton background event loop and submits work via a `concurrent.futures.Future`; `acreate()` bridges it into the caller's loop via `asyncio.wrap_future`. The sync `create()` becomes a one-line wrapper. No breaking changes. @@ -363,6 +363,7 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 - Per-stage fingerprint composition (Phase 3) generalizes naturally: a stage's upstream fingerprint becomes the hash of all its parents' fingerprints. - Throttle coordination relies on the existing invariant: all branches run on the same parent `DataDesigner`, so `ThrottleManager` is shared. - Hard dependency on the `acreate()` sidecar. +- **Scope: branch parallelism, not stage pipelining.** Stages still wait for their dependencies to fully complete before starting; pipelined execution of dependent stages is a separate direction sketched in Future considerations. - Tests: fan-out (one upstream, multiple parallel children); join (multiple upstreams, one child); resume invalidation when one branch's fingerprint changes; throttle behavior under N parallel branches. 
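Phase 4 depends on the `acreate()` sidecar, so it may help to see the bridging that the "Engine API surface" section describes: work submitted to a background event loop yields a `concurrent.futures.Future`, which `asyncio.wrap_future` turns into an awaitable on the caller's loop. The sketch below is illustrative only. `_BackgroundLoop`, `_build_async_stub`, and the module-level `create`/`acreate` functions are hypothetical stand-ins, not the engine's actual code.

```python
import asyncio
import concurrent.futures
import threading
from typing import Coroutine, List


class _BackgroundLoop:
    """A single event loop running forever on a daemon thread (hypothetical)."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        threading.Thread(target=self.loop.run_forever, daemon=True).start()

    def submit(self, coro: Coroutine) -> concurrent.futures.Future:
        # run_coroutine_threadsafe returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)


_BACKGROUND = _BackgroundLoop()


async def _build_async_stub(num_records: int) -> List[int]:
    """Stand-in for the real async build; just yields once and returns rows."""
    await asyncio.sleep(0)
    return list(range(num_records))


def create(num_records: int) -> List[int]:
    # Sync path: block the calling thread until the background work finishes.
    return _BACKGROUND.submit(_build_async_stub(num_records)).result()


async def acreate(num_records: int) -> List[int]:
    # Async path: bridge the concurrent future into the caller's running loop.
    return await asyncio.wrap_future(_BACKGROUND.submit(_build_async_stub(num_records)))
```

In this shape both entry points share one submit helper, which matches the "or both share a common builder helper" option in the sidecar section. A caller could then write `await asyncio.gather(acreate(2), acreate(3))` and have both builds run on the same background loop.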
### Phase 5 (future): Auto-chaining from single config @@ -371,6 +372,14 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 - Auto-split into pipeline stages internally. - User sees a single `dd.create(config)` call but gets multi-stage execution. +## Future considerations + +Items not on the current roadmap but worth flagging so they don't get accidentally precluded by v1-v5 design choices. + +**External orchestration for cross-process / distributed execution.** There is interest in eventually running DataDesigner workloads across processes or nodes - self-hosted serving, multi-host fan-out, scheduling against external clusters. The specific shape of that orchestration is still under discussion and is not committed to here. The chaining plan's design choices (parent `DataDesigner` reuse, on-disk handoffs, no new engine surface) compose naturally with such a system: an external orchestrator could dispatch independent `DataDesigner.create()` calls against partitioned slices and per-replica endpoints without the pipeline class needing to change. v1-v5 do not depend on this materializing. + +**Pipelined execution of dependent stages.** Today the stage data contract is "final dataset" - a downstream stage waits for its upstream to fully complete. A future direction is to let downstream stages consume upstream batches as they're produced, overlapping execution across the dependency edge. Required changes: streaming seed sources, an explicit "stage done" sentinel rather than file-completion checks, and resume semantics for partially-consumed upstreams. Most useful when stage bottlenecks are heterogeneous (LLM-bound stage feeding a CPU-bound validator); little gain when both stages are LLM-bound since they share provider capacity. Not designed here; flagged so the stage contract isn't quietly closed off. + ## Resolved decisions These were open in earlier drafts; recording the resolutions here so the design is unambiguous. 
From 3de41007ec3147d4625179d95e93dc1922d01d6b Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Fri, 8 May 2026 20:20:50 +0000 Subject: [PATCH 08/13] docs: address workflow chaining review comments --- plans/workflow-chaining/workflow-chaining.md | 29 ++++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index 791884aaf..cb204e5ec 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -46,7 +46,7 @@ A new `Pipeline` class in `data_designer.interface` that orchestrates multi-stag **Explicit multi-stage pipeline:** ```python -pipeline = dd.pipeline() +pipeline = dd.pipeline(name="persona-conversations") pipeline.add_stage("personas", config_personas, num_records=100) pipeline.add_stage("conversations", config_convos, num_records=1000) # explode: 100 -> 1000 pipeline.add_stage("judged", config_judge) # defaults to previous stage's output size @@ -58,6 +58,8 @@ results["conversations"].load_dataset() # stage 2 output results["judged"].load_dataset() # final output ``` +`name` is required and is the durable identity for artifact lookup and resume. Reusing the same name across Python sessions lets `pipeline.run(resume=True)` find the previous `pipeline-metadata.json`. 
+ **Convenience method on results (lightweight, for notebooks):** For interactive use where a full pipeline is overkill, a `to_config_builder()` method on `DatasetCreationResults` returns a pre-seeded `DataDesignerConfigBuilder`: @@ -101,6 +103,7 @@ def filter_high_quality(stage_output_path: Path) -> Path: df.to_parquet(out / "data.parquet") return out +pipeline = dd.pipeline(name="filter-enrich") pipeline.add_stage("generated", config_gen, num_records=1000) pipeline.add_stage( "enriched", @@ -122,11 +125,11 @@ The callback receives the path to the completed stage's artifact directory (cont #### Artifact management -The pipeline owns its directory layout directly, bypassing `ArtifactStorage`'s default auto-rename behavior (which appends timestamps to non-empty directories). Stage directories use stable, deterministic names based on stage index and name: +The pipeline owns its directory layout directly, bypassing `ArtifactStorage`'s default auto-rename behavior (which appends timestamps to non-empty directories). `dd.pipeline(name=...)` maps to `artifacts//`; no timestamp, UUID, or object-derived default is used for resumable pipelines. Stage directories use stable, deterministic names based on stage index and name: ``` artifacts/ - pipeline-name/ + / stage-0-personas/ parquet-files/ metadata.json @@ -139,7 +142,7 @@ artifacts/ pipeline-metadata.json ``` -The pipeline creates each stage's `ArtifactStorage` with the stage directory as `dataset_name`, ensuring stable paths across reruns. +The pipeline creates each stage's `ArtifactStorage` with the stage directory as `dataset_name`, ensuring stable paths across reruns. A fresh `dd.pipeline(name="gen-judge")` finds the same `artifacts/gen-judge/pipeline-metadata.json` path as the original run. 
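The deterministic layout above can be sketched as two small path helpers. The function names and the `artifact_root` parameter are hypothetical; only the `stage-<index>-<name>` pattern and the `pipeline-metadata.json` filename come from the plan. The stage index is assumed to be zero-based, following the `stage-0-personas/` example.

```python
from pathlib import Path


def stage_dir(artifact_root: Path, pipeline_name: str, index: int, stage_name: str) -> Path:
    """Stable stage directory: derived only from pipeline name, index, and stage name."""
    return artifact_root / pipeline_name / f"stage-{index}-{stage_name}"


def metadata_path(artifact_root: Path, pipeline_name: str) -> Path:
    """Location a fresh pipeline object with the same name would look up on resume."""
    return artifact_root / pipeline_name / "pipeline-metadata.json"
```

Because neither helper reads a clock or generates an identifier, two Python sessions that use the same pipeline name resolve to the same paths, which is what resume relies on.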
#### Checkpointing and resume @@ -163,6 +166,7 @@ The connection to #525: chaining gives coarse (stage-level) checkpointing for fr #### Provenance `pipeline-metadata.json` records: +- Pipeline name - Stage order, names, and configs used - Per-stage fingerprint for resume invalidation: `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, DD version, and the upstream stage fingerprint - `num_records` requested vs actual per stage @@ -171,7 +175,7 @@ The connection to #525: chaining gives coarse (stage-level) checkpointing for fr #### Composability and the throttle invariant -The `Pipeline` is constructed via `dd.pipeline()` and holds a reference to the parent `DataDesigner`. Every stage runs `dd.create()` (or `dd.acreate()` once available - see Engine API surface below) on that same instance. This is a load-bearing API contract for two reasons. +The `Pipeline` is constructed via `dd.pipeline(name=...)` and holds a reference to the parent `DataDesigner`. Every stage runs `dd.create()` (or `dd.acreate()` once available - see Engine API surface below) on that same instance. This is a load-bearing API contract for two reasons. **Throttle coordination across stages.** A `DataDesigner` owns one `ModelRegistry`, which owns one `ThrottleManager`. AIMD rate-limit state is per-instance. If the pipeline constructed a fresh `DataDesigner` per stage, each stage would adapt independently and the aggregate request rate against a provider could exceed the configured cap by a multiple of the stage count. The same hazard applies to parallel branches in Phase 4: branches sharing one `DataDesigner` automatically share throttling; branches each holding their own `DataDesigner` silently fragment it. Reusing one instance is the simple, correct default. 
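The fragmentation hazard can be illustrated with a toy model. This is a deliberate simplification, not the `ThrottleManager` implementation: a plain `asyncio.Semaphore` stands in for AIMD rate-limit state, the stages run concurrently (the parallel-branch case), and all names below are hypothetical.

```python
import asyncio


class PeakTracker:
    """Records the highest number of simultaneously in-flight requests."""

    def __init__(self) -> None:
        self.active = 0
        self.peak = 0


async def fake_request(limiter: asyncio.Semaphore, tracker: PeakTracker) -> None:
    async with limiter:
        tracker.active += 1
        tracker.peak = max(tracker.peak, tracker.active)
        await asyncio.sleep(0.05)  # simulated provider latency
        tracker.active -= 1


async def peak_in_flight(share_limiter: bool, stages: int = 3, cap: int = 2, requests: int = 4) -> int:
    """Run `stages` concurrent branches and return the observed peak concurrency."""
    tracker = PeakTracker()
    shared = asyncio.Semaphore(cap)
    tasks = []
    for _ in range(stages):
        # One limiter for everyone, or a fresh limiter per stage (the fragmented case).
        limiter = shared if share_limiter else asyncio.Semaphore(cap)
        tasks.extend(fake_request(limiter, tracker) for _ in range(requests))
    await asyncio.gather(*tasks)
    return tracker.peak
```

With a shared limiter the peak stays at `cap`; with one limiter per stage it can reach `stages * cap`, which is the "multiple of the stage count" overshoot described above.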
@@ -252,7 +256,7 @@ config_convos = ( .add_column(name="conversation", column_type="llm_text", prompt="Write a conversation between {{ name }} and an assistant about {{ topic }}...") ) -pipeline = dd.pipeline() +pipeline = dd.pipeline(name="persona-conversations") pipeline.add_stage("personas", config_personas, num_records=100) pipeline.add_stage("conversations", config_convos, num_records=1000) results = pipeline.run() @@ -274,7 +278,7 @@ def keep_high_quality(stage_output_path: Path) -> Path: df.to_parquet(out / "data.parquet") return out -pipeline = dd.pipeline() +pipeline = dd.pipeline(name="filter-enrich") pipeline.add_stage("candidates", config_gen, num_records=5000) pipeline.add_stage("enriched", config_enrich, after=keep_high_quality) results = pipeline.run() @@ -291,13 +295,13 @@ config_gen = DataDesignerConfigBuilder(model_configs=[fast_model])... # Stage 2: judge with a stronger model config_judge = DataDesignerConfigBuilder(model_configs=[strong_model])... -pipeline = dd.pipeline() +pipeline = dd.pipeline(name="gen-judge") pipeline.add_stage("generated", config_gen, num_records=1000) pipeline.add_stage("judged", config_judge) results = pipeline.run() # Later: tweak judging config, resume from stage 1 output -pipeline_v2 = dd.pipeline() +pipeline_v2 = dd.pipeline(name="gen-judge") pipeline_v2.add_stage("generated", config_gen, num_records=1000) pipeline_v2.add_stage("judged", config_judge_v2) results_v2 = pipeline_v2.run(resume=True) # skips stage 1 @@ -328,7 +332,7 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 - Stage handoff is always on disk via `LocalFileSeedSource`; no in-memory handoff path inside `Pipeline`. - Internal stage representation is a DAG (linear-only inputs in v1). - Add `pipeline-metadata.json` writing. -- Add `dd.pipeline()` factory method on `DataDesigner`. +- Add `dd.pipeline(name: str)` factory method on `DataDesigner`. 
- Tests: multi-stage runs, explode/filter via callbacks, num_records defaulting, artifact layout, throttle reuse across stages. ### Sidecar: `acreate()` on `DataDesigner` (independent of chaining v1) @@ -352,6 +356,7 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 - Add `resume=True` to `pipeline.run()`. - Read `pipeline-metadata.json` to detect completed stages. +- Resolve the metadata path from the explicit pipeline name. - Compute each stage's fingerprint via `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, DD version, and upstream stage fingerprint; invalidate the stage and everything downstream on any mismatch. - Skip stages whose fingerprints match, seed next stage from last completed output. - Depends on artifact layout from phase 1. @@ -360,7 +365,7 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 - Extend `add_stage()` with an optional `depends_on=[stage_name, ...]` argument; default keeps the linear behavior. - `pipeline.run()` walks the resulting DAG, gathering independent branches via `asyncio.gather` over `dd.acreate()` calls. -- Per-stage fingerprint composition (Phase 3) generalizes naturally: a stage's upstream fingerprint becomes the hash of all its parents' fingerprints. +- Per-stage fingerprint composition (Phase 3) generalizes naturally: a stage's upstream fingerprint becomes the hash of all parent fingerprints sorted by stage name, making joins stable regardless of `depends_on` declaration order. - Throttle coordination relies on the existing invariant: all branches run on the same parent `DataDesigner`, so `ThrottleManager` is shared. - Hard dependency on the `acreate()` sidecar. - **Scope: branch parallelism, not stage pipelining.** Stages still wait for their dependencies to fully complete before starting; pipelined execution of dependent stages is a separate direction sketched in Future considerations. 
@@ -388,7 +393,7 @@ These were open in earlier drafts; recording the resolutions here so the design 2. **Branch/fan-out semantics (DAG)** -> Designed-in but not v1. The internal stage representation is a DAG; v1 only accepts linear inputs through `add_stage()`. Phase 4 ships parallel branches via `asyncio.gather` over `acreate()`. v1 stays sequential. -3. **Pipeline construction** -> `Pipeline` is created via `dd.pipeline()` and reuses the parent `DataDesigner`'s `ModelRegistry` and `ThrottleManager` across all stages. The pipeline does not construct its own `DataDesigner` instances. This is the throttle-coordination invariant (see Composability section). +3. **Pipeline construction** -> `Pipeline` is created via `dd.pipeline(name=...)` and reuses the parent `DataDesigner`'s `ModelRegistry` and `ThrottleManager` across all stages. The explicit name is the durable artifact identity used for resume, and the pipeline does not construct its own `DataDesigner` instances. This is the throttle-coordination invariant (see Composability section). ## Open questions From 246bfb33579bc2f144b5a6d3b724acc3f43693f7 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Mon, 11 May 2026 17:36:15 +0000 Subject: [PATCH 09/13] docs: tighten workflow chaining resume semantics --- plans/workflow-chaining/workflow-chaining.md | 34 +++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index cb204e5ec..0cdf5e897 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -58,7 +58,7 @@ results["conversations"].load_dataset() # stage 2 output results["judged"].load_dataset() # final output ``` -`name` is required and is the durable identity for artifact lookup and resume. Reusing the same name across Python sessions lets `pipeline.run(resume=True)` find the previous `pipeline-metadata.json`. 
+`name` is required and is the durable identity for artifact lookup and resume. Reusing the same name across Python sessions lets `pipeline.run(resume=ResumeMode.IF_POSSIBLE)` find the previous `pipeline-metadata.json`. **Convenience method on results (lightweight, for notebooks):** @@ -109,11 +109,14 @@ pipeline.add_stage( "enriched", config_enrich, after=filter_high_quality, # runs on stage output before next stage seeds from it + after_version="quality-filter-v1", ) ``` The callback receives the path to the completed stage's artifact directory (containing `parquet-files/`, `metadata.json`, etc.) and returns a path that the next stage will seed from. This keeps large DataFrames on disk and gives users full control. +**Callback resume policy**: The pipeline does not hash arbitrary Python source or bytecode in v1. `after_version` is the explicit callback identity recorded in `pipeline-metadata.json` and included in the next stage's fingerprint. If `after` is set without `after_version`, that stage is treated as dirty on every resume so a changed callback cannot silently reuse stale transformed data. + **Empty stage policy**: If a callback filters all rows (or a stage produces zero rows), the pipeline raises `DataDesignerPipelineError` by default. Stages can opt in to empty output with `allow_empty=True` on `add_stage()`, in which case the pipeline short-circuits and skips subsequent stages. #### `num_records` and seed behavior @@ -149,26 +152,32 @@ The pipeline creates each stage's `ArtifactStorage` with the stage directory as Each stage produces durable parquet output before the next stage starts. This provides natural checkpoint boundaries: - If stage 3 of 4 fails, stages 1 and 2 are already on disk. -- A `resume=True` flag on `pipeline.run()` skips completed stages. -- Within a stage, batch-level resume (#525) can further reduce re-work. +- `pipeline.run(resume=ResumeMode.IF_POSSIBLE)` skips compatible completed stages and resumes compatible partial stages. 
+- Within a stage, batch/row-group resume from #526 can further reduce re-work. + +**Relationship to #526**: #526 is the fine-grained single-stage resume primitive. It resumes one `DataDesigner.create()` call from completed batches (sync) or row groups (async), but its compatibility check applies to the whole `DataDesignerConfig`. Pipeline resume adds a coarser stage graph above that primitive. A downstream config change invalidates only that stage and its descendants, so upstream generation can be reused instead of failing the whole-config compatibility check. + +Pipeline resume should decide stage compatibility before calling `DataDesigner.create()`. If a stage fingerprint matches and the stage is partial, the pipeline delegates to `create(..., resume=ResumeMode.ALWAYS)` for that stage. If a stage fingerprint changed, the pipeline invalidates that stage directory and descendants, then starts them fresh. It should not blindly pass `ResumeMode.IF_POSSIBLE` through to stage `create()`, because pipeline stage directories must remain deterministic under `artifacts//`. **Resume safety**: Naive "skip if directory exists" is not sufficient. Configs, model settings, callbacks, or DD version may have changed between runs. Resume must compare a fingerprint of each stage's inputs against what's recorded in `pipeline-metadata.json`. 
The per-stage fingerprint composes: -- `DataDesignerConfig.fingerprint()` (introduced in #587) — content-addressable sha256 over the data-relevant portion of the config +- `DataDesignerConfig.fingerprint()` (introduced in #587) - content-addressable sha256 over the data-relevant portion of the config - `num_records` (requested) +- `sampling_strategy`, `selection_strategy`, and `allow_empty` +- `after_version` when `after` is configured; if omitted, the stage is always dirty on resume - DD version - Upstream stage fingerprint (the directly preceding stage's recorded fingerprint, so a change anywhere in the chain invalidates downstream stages) If any component changed, that stage and all downstream stages must re-run. This is a phase 3 concern but the metadata format in phase 1 should record enough information to support it. -The connection to #525: chaining gives coarse (stage-level) checkpointing for free. #525 gives fine (batch-level) checkpointing within a stage. They are complementary. +The connection to #526/#525: chaining gives workflow-level checkpointing and smaller invalidation boundaries. #526 gives fine-grained crash recovery within each stage. They are complementary, and Phase 3 should use #526 rather than inventing another intra-stage resume mechanism. 
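The composition rule can be sketched as follows. This is an assumption-laden illustration: the plan does not specify the hashing scheme or field encoding, so the JSON-plus-sha256 approach, the `StageInputs` dataclass, and the helper names are all hypothetical. `config_fingerprint` stands in for the value returned by `DataDesignerConfig.fingerprint()`.

```python
from __future__ import annotations

import hashlib
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class StageInputs:
    config_fingerprint: str  # stand-in for DataDesignerConfig.fingerprint()
    num_records: int | None
    sampling_strategy: str | None
    selection_strategy: str | None
    allow_empty: bool
    after_version: str | None
    dd_version: str
    upstream_fingerprint: str | None  # None for the first stage


def stage_fingerprint(inputs: StageInputs, has_after: bool) -> str | None:
    """Return a sha256 hex digest, or None when the stage is always dirty."""
    if has_after and inputs.after_version is None:
        # An unversioned callback cannot be trusted to be unchanged.
        return None
    encoded = json.dumps(asdict(inputs), sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def must_rerun(recorded: str | None, current: str | None) -> bool:
    """A stage re-runs if it is always dirty or its fingerprint changed."""
    return current is None or recorded != current
```

Because each stage's digest includes `upstream_fingerprint`, a change to any earlier stage changes every later digest, which gives the "invalidate this stage and everything downstream" behavior without walking the chain explicitly.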
#### Provenance `pipeline-metadata.json` records: - Pipeline name - Stage order, names, and configs used -- Per-stage fingerprint for resume invalidation: `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, DD version, and the upstream stage fingerprint +- Per-stage fingerprint for resume invalidation: `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, seed sampling/selection controls, `after_version`, DD version, and the upstream stage fingerprint - `num_records` requested vs actual per stage - Which stage's output seeded the next - Timestamp, duration, and DD version per stage @@ -280,7 +289,7 @@ def keep_high_quality(stage_output_path: Path) -> Path: pipeline = dd.pipeline(name="filter-enrich") pipeline.add_stage("candidates", config_gen, num_records=5000) -pipeline.add_stage("enriched", config_enrich, after=keep_high_quality) +pipeline.add_stage("enriched", config_enrich, after=keep_high_quality, after_version="quality-filter-v1") results = pipeline.run() ``` @@ -304,7 +313,7 @@ results = pipeline.run() pipeline_v2 = dd.pipeline(name="gen-judge") pipeline_v2.add_stage("generated", config_gen, num_records=1000) pipeline_v2.add_stage("judged", config_judge_v2) -results_v2 = pipeline_v2.run(resume=True) # skips stage 1 +results_v2 = pipeline_v2.run(resume=ResumeMode.IF_POSSIBLE) # skips stage 1 ``` ### 4. Interactive notebook chaining (lightweight, no pipeline) @@ -354,11 +363,12 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 ### Phase 3: Stage-level resume -- Add `resume=True` to `pipeline.run()`. +- Add `resume: ResumeMode` to `pipeline.run()`, reusing the enum introduced by #526. - Read `pipeline-metadata.json` to detect completed stages. - Resolve the metadata path from the explicit pipeline name. 
-- Compute each stage's fingerprint via `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, DD version, and upstream stage fingerprint; invalidate the stage and everything downstream on any mismatch. -- Skip stages whose fingerprints match, seed next stage from last completed output. +- Compute each stage's fingerprint via `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, seed sampling/selection controls, `after_version`, DD version, and upstream stage fingerprint; invalidate the stage and everything downstream on any mismatch. +- Skip stages whose fingerprints match and are complete; for matching partial stages, call `DataDesigner.create(..., resume=ResumeMode.ALWAYS)` to use #526's batch/row-group resume. +- For invalidated stages, clear or replace the deterministic stage directory before starting fresh so `ArtifactStorage` does not timestamp away from the pipeline layout. - Depends on artifact layout from phase 1. ### Phase 4: DAG-shaped stages with parallel branches @@ -410,6 +420,6 @@ These were open in earlier drafts; recording the resolutions here so the design ## Related issues - #447 - AsyncRunController refactor (partially superseded: pre-batch resize handling moves to pipeline level instead of controller level) -- #525 - Resume interrupted runs (complementary: stage-level resume from pipeline, batch-level resume from #525) +- #526 / #525 - Resume interrupted runs (single-stage batch/row-group resume primitive used by pipeline stage resume) - #462 - Progress bar and scheduler polish (independent) - #464 - Custom column retryable errors (independent) From b10d763eb7cdea2ecbdbb14bb04b929ad9dddfbd Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Mon, 11 May 2026 17:59:36 +0000 Subject: [PATCH 10/13] docs: validate callback seed paths on resume --- plans/workflow-chaining/workflow-chaining.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/plans/workflow-chaining/workflow-chaining.md 
b/plans/workflow-chaining/workflow-chaining.md index 0cdf5e897..870b7c6cf 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -115,7 +115,7 @@ pipeline.add_stage( The callback receives the path to the completed stage's artifact directory (containing `parquet-files/`, `metadata.json`, etc.) and returns a path that the next stage will seed from. This keeps large DataFrames on disk and gives users full control. -**Callback resume policy**: The pipeline does not hash arbitrary Python source or bytecode in v1. `after_version` is the explicit callback identity recorded in `pipeline-metadata.json` and included in the next stage's fingerprint. If `after` is set without `after_version`, that stage is treated as dirty on every resume so a changed callback cannot silently reuse stale transformed data. +**Callback resume policy**: The pipeline does not hash arbitrary Python source or bytecode in v1. `after_version` is the explicit callback identity recorded in `pipeline-metadata.json` and included in the next stage's fingerprint. If `after` is set without `after_version`, that stage is treated as dirty on every resume so a changed callback cannot silently reuse stale transformed data. The resolved path returned by the callback is also recorded as the dependent stage's seed path; a stage seeded from callback output is skippable only if that recorded path still exists and is readable by `LocalFileSeedSource`. **Empty stage policy**: If a callback filters all rows (or a stage produces zero rows), the pipeline raises `DataDesignerPipelineError` by default. Stages can opt in to empty output with `allow_empty=True` on `add_stage()`, in which case the pipeline short-circuits and skips subsequent stages. @@ -159,6 +159,8 @@ Each stage produces durable parquet output before the next stage starts. This pr Pipeline resume should decide stage compatibility before calling `DataDesigner.create()`. 
If a stage fingerprint matches and the stage is partial, the pipeline delegates to `create(..., resume=ResumeMode.ALWAYS)` for that stage. If a stage fingerprint changed, the pipeline invalidates that stage directory and descendants, then starts them fresh. It should not blindly pass `ResumeMode.IF_POSSIBLE` through to stage `create()`, because pipeline stage directories must remain deterministic under `artifacts//`. +If a stage's seed came from an `after` callback, a fingerprint match is necessary but not sufficient to skip it. Resume must also verify the recorded callback output path exists. If the path is missing, the dependent stage and its descendants are invalidated and the callback/stage boundary is re-run from the upstream stage output. + **Resume safety**: Naive "skip if directory exists" is not sufficient. Configs, model settings, callbacks, or DD version may have changed between runs. Resume must compare a fingerprint of each stage's inputs against what's recorded in `pipeline-metadata.json`. The per-stage fingerprint composes: - `DataDesignerConfig.fingerprint()` (introduced in #587) - content-addressable sha256 over the data-relevant portion of the config @@ -180,6 +182,7 @@ The connection to #526/#525: chaining gives workflow-level checkpointing and sma - Per-stage fingerprint for resume invalidation: `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, seed sampling/selection controls, `after_version`, DD version, and the upstream stage fingerprint - `num_records` requested vs actual per stage - Which stage's output seeded the next +- Resolved seed path per stage, including callback output paths returned by `after` - Timestamp, duration, and DD version per stage #### Composability and the throttle invariant @@ -368,6 +371,7 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 - Resolve the metadata path from the explicit pipeline name. 
- Compute each stage's fingerprint via `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, seed sampling/selection controls, `after_version`, DD version, and upstream stage fingerprint; invalidate the stage and everything downstream on any mismatch. - Skip stages whose fingerprints match and are complete; for matching partial stages, call `DataDesigner.create(..., resume=ResumeMode.ALWAYS)` to use #526's batch/row-group resume. +- Before skipping or resuming a stage seeded by `after`, validate the recorded callback output path exists and can seed `LocalFileSeedSource`; if missing, invalidate that stage and descendants. - For invalidated stages, clear or replace the deterministic stage directory before starting fresh so `ArtifactStorage` does not timestamp away from the pipeline layout. - Depends on artifact layout from phase 1. From 6986a75ce7c9dc7b63f9fe200817a41367718d00 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Mon, 11 May 2026 18:29:01 +0000 Subject: [PATCH 11/13] docs: define empty pipeline stage results --- plans/workflow-chaining/workflow-chaining.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index 870b7c6cf..56dff9d7b 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -117,7 +117,7 @@ The callback receives the path to the completed stage's artifact directory (cont **Callback resume policy**: The pipeline does not hash arbitrary Python source or bytecode in v1. `after_version` is the explicit callback identity recorded in `pipeline-metadata.json` and included in the next stage's fingerprint. If `after` is set without `after_version`, that stage is treated as dirty on every resume so a changed callback cannot silently reuse stale transformed data. 
The resolved path returned by the callback is also recorded as the dependent stage's seed path; a stage seeded from callback output is skippable only if that recorded path still exists and is readable by `LocalFileSeedSource`. -**Empty stage policy**: If a callback filters all rows (or a stage produces zero rows), the pipeline raises `DataDesignerPipelineError` by default. Stages can opt in to empty output with `allow_empty=True` on `add_stage()`, in which case the pipeline short-circuits and skips subsequent stages. +**Empty stage policy**: If a callback filters all rows (or a stage produces zero rows), the pipeline raises `DataDesignerPipelineError` by default. Stages can opt in to empty output with `allow_empty=True` on `add_stage()`, in which case the pipeline marks that stage as `completed_empty` and all downstream stages as `skipped_empty_upstream`. `PipelineResults` still contains every declared stage name: executed stages map to `DatasetCreationResults`, while skipped downstream stages map to `SkippedStageResult` with `status="skipped_empty_upstream"` and `upstream_stage=`. This avoids `KeyError`/`None` ambiguity and gives resume a durable state distinct from normal completion. #### `num_records` and seed behavior @@ -179,6 +179,7 @@ The connection to #526/#525: chaining gives workflow-level checkpointing and sma `pipeline-metadata.json` records: - Pipeline name - Stage order, names, and configs used +- Per-stage status: `completed`, `completed_empty`, `skipped_empty_upstream`, or `failed` - Per-stage fingerprint for resume invalidation: `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, seed sampling/selection controls, `after_version`, DD version, and the upstream stage fingerprint - `num_records` requested vs actual per stage - Which stage's output seeded the next @@ -371,6 +372,7 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 - Resolve the metadata path from the explicit pipeline name. 
- Compute each stage's fingerprint via `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, seed sampling/selection controls, `after_version`, DD version, and upstream stage fingerprint; invalidate the stage and everything downstream on any mismatch. - Skip stages whose fingerprints match and are complete; for matching partial stages, call `DataDesigner.create(..., resume=ResumeMode.ALWAYS)` to use #526's batch/row-group resume. +- Preserve `completed_empty` and `skipped_empty_upstream` statuses on resume when fingerprints still match; rerun from the first changed or missing upstream stage otherwise. - Before skipping or resuming a stage seeded by `after`, validate the recorded callback output path exists and can seed `LocalFileSeedSource`; if missing, invalidate that stage and descendants. - For invalidated stages, clear or replace the deterministic stage directory before starting fresh so `ArtifactStorage` does not timestamp away from the pipeline layout. - Depends on artifact layout from phase 1. From 8916703a5314fb81fb4fbfdc9eb2fe6830ad6001 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Mon, 11 May 2026 23:12:44 +0000 Subject: [PATCH 12/13] docs: clarify composite workflow plan --- plans/workflow-chaining/workflow-chaining.md | 248 ++++++++++--------- 1 file changed, 132 insertions(+), 116 deletions(-) diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index 56dff9d7b..9427ec06c 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -19,7 +19,9 @@ This matters for several use cases: ## Proposed solution -Add **workflow chaining**: a thin orchestration layer that sequences multiple generation stages, passing each stage's output as the next stage's seed dataset. This is the primary deliverable. 
+Add **composite workflows**: a thin orchestration layer that sequences multiple independent DataDesigner workflows, passing each stage's output as the next stage's seed dataset. This is the primary deliverable. + +A `CompositeWorkflow` composes independent `DataDesigner.create()` runs. Each stage is still a normal DataDesigner workflow with its own config; the composition layer manages ordering/dependencies, deterministic on-disk handoffs, shared throttling, provenance, and resume. v1 exposes linear stage composition, but the internal representation is DAG-shaped so future branches and joins do not require renaming the abstraction. As a secondary benefit, chaining also enables the removal of `allow_resize` and simplification of the engine's resize handling. @@ -27,7 +29,7 @@ As a secondary benefit, chaining also enables the removal of `allow_resize` and The `allow_resize` flag on column configs lets a generator change the row count mid-generation. This works in the sync engine but is fundamentally incompatible with the async engine's fixed-size `CompletionTracker` grid. As of #553, an `allow_resize=True` config in async mode logs a `DeprecationWarning` and silently falls back to the sync engine for that run; it is no longer hard-rejected. -`allow_resize` is one of the remaining divergences between sync and async. The async engine is the default execution path as of #592; sync remains only as a fallback for `allow_resize` runs. Maintaining a sync-only feature to keep one fallback path alive is counterproductive. With chaining in place, resize becomes a between-stage concern rather than a mid-generation concern. This lets us remove `allow_resize` and the associated engine complexity, and disallow row-count changes in pre-batch processors. Users who need resize use a pipeline with a stage boundary at the resize point. +`allow_resize` is one of the remaining divergences between sync and async. 
The async engine is the default execution path as of #592; sync remains only as a fallback for `allow_resize` runs. Maintaining a sync-only feature to keep one fallback path alive is counterproductive. With composite workflows in place, resize becomes a between-stage concern rather than a mid-generation concern. This lets us remove `allow_resize` and the associated engine complexity, and disallow row-count changes in pre-batch processors. Users who need resize use a composite workflow with a stage boundary at the resize point. Note: `allow_resize` is documented in custom columns, plugin examples, and agent rollout ingestion docs (verified post-Fern migration in #581). The deprecation warning has shipped in #553; full removal still requires doc updates and the migration of any in-tree usage. @@ -37,87 +39,89 @@ The async scheduler's `CompletionTracker` pre-allocates a (row_group x row_index ## Design -### Part 1: Pipeline class +### Part 1: `CompositeWorkflow` class -A new `Pipeline` class in `data_designer.interface` that orchestrates multi-stage generation. +A new `CompositeWorkflow` class in `data_designer.interface` that orchestrates multi-stage generation. It is created with `DataDesigner.compose_workflow(name=...)`: the factory name describes the action of composing workflows, while the returned object names the resulting abstraction. 
#### User-facing API -**Explicit multi-stage pipeline:** +**Explicit multi-stage composite workflow:** ```python -pipeline = dd.pipeline(name="persona-conversations") -pipeline.add_stage("personas", config_personas, num_records=100) -pipeline.add_stage("conversations", config_convos, num_records=1000) # explode: 100 -> 1000 -pipeline.add_stage("judged", config_judge) # defaults to previous stage's output size +data_designer = DataDesigner() +workflow = data_designer.compose_workflow(name="persona-conversations") +workflow.add_stage("personas", config_personas, num_records=100) +workflow.add_stage("conversations", config_convos, num_records=1000) # explode: 100 -> 1000 +workflow.add_stage("judged", config_judge) # defaults to previous stage's output size -results = pipeline.run() +results = workflow.run() results["personas"].load_dataset() # stage 1 output results["conversations"].load_dataset() # stage 2 output results["judged"].load_dataset() # final output ``` -`name` is required and is the durable identity for artifact lookup and resume. Reusing the same name across Python sessions lets `pipeline.run(resume=ResumeMode.IF_POSSIBLE)` find the previous `pipeline-metadata.json`. +`name` is required and is the durable identity for artifact lookup and resume. Reusing the same name across Python sessions lets `workflow.run(resume=ResumeMode.IF_POSSIBLE)` find the previous `workflow-metadata.json`. + +Each stage can use a different `DataDesignerConfig`, including different model configs. v1 still uses the parent `DataDesigner` instance and run settings for all stages; per-stage `RunConfig` and compute placement are future work. 
**Convenience method on results (lightweight, for notebooks):** -For interactive use where a full pipeline is overkill, a `to_config_builder()` method on `DatasetCreationResults` returns a pre-seeded `DataDesignerConfigBuilder`: +For interactive use where a full composite workflow is overkill, a `to_config_builder()` method on `DatasetCreationResults` returns a pre-seeded `DataDesignerConfigBuilder`: ```python # Stage 1 -result = dd.create(config_personas, num_records=100) +result = data_designer.create(config_personas, num_records=100) # Stage 2 - just grab the result and keep going config_convos = ( result.to_config_builder(columns=["name", "age", "background"]) # optional column selection .add_column(name="conversation", column_type="llm_text", prompt="...") ) -result_2 = dd.create(config_convos, num_records=1000) +result_2 = data_designer.create(config_convos, num_records=1000) ``` -This is a thin wrapper: loads the dataset into memory, optionally filters columns, wraps in `DataFrameSeedSource`, returns a new config builder. No tracking, no provenance, no callbacks - just a quick bridge for iteration. Not suitable for large datasets (loads full DataFrame into memory) or serializable configs (`DataFrameSeedSource` can't be written to YAML). - -This is the *only* place in the chaining surface that uses an in-memory handoff. `Pipeline` itself always hands off between stages on disk - see "Composability and the throttle invariant" below. For production pipelines, use the `Pipeline` class. - -**Auto-chaining from a single config (future):** +This is a thin wrapper: loads the dataset into memory, optionally filters columns, wraps in `DataFrameSeedSource`, returns a new config builder. It starts a new config seeded from existing results; it does not mutate, continue, or resume the original config. No tracking, no provenance, no callbacks - just a quick bridge for iteration. 
Not suitable for large datasets (loads full DataFrame into memory) or serializable configs (`DataFrameSeedSource` can't be written to YAML). -The engine detects columns that were previously `allow_resize=True` (or a new marker like `stage_boundary=True`) and auto-splits the DAG into stages. This is a convenience layer on top of the explicit API - not required for v1. +This is the *only* place in the chaining surface that uses an in-memory handoff. `CompositeWorkflow` itself always hands off between stages on disk - see "Composability and the throttle invariant" below. For production workflows, use the `CompositeWorkflow` class. #### Stage data contract Each stage seeds from the **previous stage's final dataset** - the post-processor output with dropped columns excluded. This is the same DataFrame returned by `DatasetCreationResults.load_dataset()`. -Processor outputs (named processor artifacts) and media assets (images stored on disk with relative paths in the DataFrame) are NOT automatically forwarded. If a downstream stage needs image columns from an upstream stage, the pipeline must resolve image paths relative to the upstream stage's artifact directory. This needs explicit handling - TBD in implementation. +Processor outputs (named processor artifacts), schema-transform artifacts, dropped columns, and media assets (images stored on disk with relative paths in the DataFrame) are NOT automatically forwarded in v1. If a downstream stage needs one of these artifacts, use an `on_success` callback to read the upstream stage artifact directory and write a `LocalFileSeedSource`-compatible dataset for the next stage. First-class seeding from named processor outputs is a future extension. #### Between-stage callbacks -Users may need to transform data between stages. The pipeline supports an optional callback: +Users may need to transform data between stages. 
`CompositeWorkflow.add_stage()` supports an optional `on_success` callback that transforms a completed stage's output before child stages seed from it: ```python def filter_high_quality(stage_output_path: Path) -> Path: df = pd.read_parquet(stage_output_path / "parquet-files") df = df[df["quality_score"] > 0.8] - out = stage_output_path.parent / "filtered" - out.mkdir(exist_ok=True) + out = stage_output_path / "callback-outputs" / "quality-filter-v1" + out.mkdir(parents=True, exist_ok=True) df.to_parquet(out / "data.parquet") return out -pipeline = dd.pipeline(name="filter-enrich") -pipeline.add_stage("generated", config_gen, num_records=1000) -pipeline.add_stage( - "enriched", - config_enrich, - after=filter_high_quality, # runs on stage output before next stage seeds from it - after_version="quality-filter-v1", +workflow = data_designer.compose_workflow(name="filter-enrich") +workflow.add_stage( + "generated", + config_gen, + num_records=1000, + on_success=filter_high_quality, + on_success_version="quality-filter-v1", ) +workflow.add_stage("enriched", config_enrich) ``` -The callback receives the path to the completed stage's artifact directory (containing `parquet-files/`, `metadata.json`, etc.) and returns a path that the next stage will seed from. This keeps large DataFrames on disk and gives users full control. +The callback receives the path to the completed stage's artifact directory (containing `parquet-files/`, `metadata.json`, etc.) and returns a path that child stages will seed from. This keeps large DataFrames on disk and gives users full control. Callback outputs should live under a managed subdirectory such as `/callback-outputs//` so they stay deterministic and easy to clean. -**Callback resume policy**: The pipeline does not hash arbitrary Python source or bytecode in v1. `after_version` is the explicit callback identity recorded in `pipeline-metadata.json` and included in the next stage's fingerprint. 
If `after` is set without `after_version`, that stage is treated as dirty on every resume so a changed callback cannot silently reuse stale transformed data. The resolved path returned by the callback is also recorded as the dependent stage's seed path; a stage seeded from callback output is skippable only if that recorded path still exists and is readable by `LocalFileSeedSource`. +**Callback resume policy**: The composite workflow does not hash arbitrary Python source or bytecode in v1. `on_success_version` is the explicit callback identity recorded in `workflow-metadata.json` and included in the stage's exposed output fingerprint. If `on_success` is set without `on_success_version`, that stage's transformed output is treated as dirty on every resume so a changed callback cannot silently reuse stale transformed data. The resolved path returned by the callback is also recorded as the dependent stage's seed path; a stage seeded from callback output is skippable only if that recorded path still exists and is readable by `LocalFileSeedSource`. -**Empty stage policy**: If a callback filters all rows (or a stage produces zero rows), the pipeline raises `DataDesignerPipelineError` by default. Stages can opt in to empty output with `allow_empty=True` on `add_stage()`, in which case the pipeline marks that stage as `completed_empty` and all downstream stages as `skipped_empty_upstream`. `PipelineResults` still contains every declared stage name: executed stages map to `DatasetCreationResults`, while skipped downstream stages map to `SkippedStageResult` with `status="skipped_empty_upstream"` and `upstream_stage=`. This avoids `KeyError`/`None` ambiguity and gives resume a durable state distinct from normal completion. +**Empty stage policy**: If a callback filters all rows (or a stage produces zero rows), the composite workflow raises `DataDesignerWorkflowError` by default. 
Stages can opt in to empty output with `allow_empty=True` on `add_stage()`, in which case the workflow marks that stage as `completed_empty` and all downstream stages as `skipped_empty_upstream`. `CompositeWorkflowResults` still contains every declared stage name: executed stages map to `DatasetCreationResults`, while skipped downstream stages map to `SkippedStageResult` with `status="skipped_empty_upstream"` and `upstream_stage=`. This avoids `KeyError`/`None` ambiguity and gives resume a durable state distinct from normal completion. + +**Export/push policy**: `CompositeWorkflowResults` export and push helpers default to the final stage's dataset. Exporting selected stages or the full composite workflow artifact bundle is explicit future work. #### `num_records` and seed behavior @@ -128,11 +132,11 @@ The callback receives the path to the completed stage's artifact directory (cont #### Artifact management -The pipeline owns its directory layout directly, bypassing `ArtifactStorage`'s default auto-rename behavior (which appends timestamps to non-empty directories). `dd.pipeline(name=...)` maps to `artifacts//`; no timestamp, UUID, or object-derived default is used for resumable pipelines. Stage directories use stable, deterministic names based on stage index and name: +The composite workflow owns its directory layout directly, bypassing `ArtifactStorage`'s default auto-rename behavior (which appends timestamps to non-empty directories). `data_designer.compose_workflow(name=...)` maps to `artifacts//`; no timestamp, UUID, or object-derived default is used for resumable workflows. 
Stage directories use stable, deterministic names based on stage index and name: ``` artifacts/ - / + / stage-0-personas/ parquet-files/ metadata.json @@ -142,31 +146,31 @@ artifacts/ stage-2-judged/ parquet-files/ metadata.json - pipeline-metadata.json + workflow-metadata.json ``` -The pipeline creates each stage's `ArtifactStorage` with the stage directory as `dataset_name`, ensuring stable paths across reruns. A fresh `dd.pipeline(name="gen-judge")` finds the same `artifacts/gen-judge/pipeline-metadata.json` path as the original run. +The composite workflow creates each stage's `ArtifactStorage` with the stage directory as `dataset_name`, ensuring stable paths across reruns. A fresh `data_designer.compose_workflow(name="gen-judge")` finds the same `artifacts/gen-judge/workflow-metadata.json` path as the original run. #### Checkpointing and resume Each stage produces durable parquet output before the next stage starts. This provides natural checkpoint boundaries: - If stage 3 of 4 fails, stages 1 and 2 are already on disk. -- `pipeline.run(resume=ResumeMode.IF_POSSIBLE)` skips compatible completed stages and resumes compatible partial stages. +- `workflow.run(resume=ResumeMode.IF_POSSIBLE)` skips compatible completed stages and resumes compatible partial stages. - Within a stage, batch/row-group resume from #526 can further reduce re-work. -**Relationship to #526**: #526 is the fine-grained single-stage resume primitive. It resumes one `DataDesigner.create()` call from completed batches (sync) or row groups (async), but its compatibility check applies to the whole `DataDesignerConfig`. Pipeline resume adds a coarser stage graph above that primitive. A downstream config change invalidates only that stage and its descendants, so upstream generation can be reused instead of failing the whole-config compatibility check. +**Relationship to #526**: #526 is the fine-grained single-stage resume primitive. 
It resumes one `DataDesigner.create()` call from completed batches (sync) or row groups (async), but its compatibility check applies to the whole `DataDesignerConfig`. Composite workflow resume adds a coarser stage graph above that primitive. A downstream config change invalidates only that stage and its descendants, so upstream generation can be reused instead of failing the whole-config compatibility check. -Pipeline resume should decide stage compatibility before calling `DataDesigner.create()`. If a stage fingerprint matches and the stage is partial, the pipeline delegates to `create(..., resume=ResumeMode.ALWAYS)` for that stage. If a stage fingerprint changed, the pipeline invalidates that stage directory and descendants, then starts them fresh. It should not blindly pass `ResumeMode.IF_POSSIBLE` through to stage `create()`, because pipeline stage directories must remain deterministic under `artifacts//`. +Composite workflow resume should decide stage compatibility before calling `DataDesigner.create()`. If a stage fingerprint matches and the stage is partial, the workflow delegates to `create(..., resume=ResumeMode.ALWAYS)` for that stage. If a stage fingerprint changed, the workflow invalidates that stage directory and descendants, then starts them fresh. It should not blindly pass `ResumeMode.IF_POSSIBLE` through to stage `create()`, because workflow stage directories must remain deterministic under `artifacts//`. -If a stage's seed came from an `after` callback, a fingerprint match is necessary but not sufficient to skip it. Resume must also verify the recorded callback output path exists. If the path is missing, the dependent stage and its descendants are invalidated and the callback/stage boundary is re-run from the upstream stage output. +If a stage's seed came from an `on_success` callback, a fingerprint match is necessary but not sufficient to skip it. Resume must also verify the recorded callback output path exists. 
If the path is missing, the dependent stage and its descendants are invalidated and the callback/stage boundary is re-run from the upstream stage output. -**Resume safety**: Naive "skip if directory exists" is not sufficient. Configs, model settings, callbacks, or DD version may have changed between runs. Resume must compare a fingerprint of each stage's inputs against what's recorded in `pipeline-metadata.json`. The per-stage fingerprint composes: +**Resume safety**: Naive "skip if directory exists" is not sufficient. Configs, model settings, callbacks, or DD version may have changed between runs. Resume must compare a fingerprint of each stage's inputs against what's recorded in `workflow-metadata.json`. The per-stage fingerprint composes: - `DataDesignerConfig.fingerprint()` (introduced in #587) - content-addressable sha256 over the data-relevant portion of the config - `num_records` (requested) - `sampling_strategy`, `selection_strategy`, and `allow_empty` -- `after_version` when `after` is configured; if omitted, the stage is always dirty on resume +- `on_success_version` when `on_success` is configured; if omitted, the stage's transformed output is always dirty on resume - DD version - Upstream stage fingerprint (the directly preceding stage's recorded fingerprint, so a change anywhere in the chain invalidates downstream stages) @@ -176,44 +180,44 @@ The connection to #526/#525: chaining gives workflow-level checkpointing and sma #### Provenance -`pipeline-metadata.json` records: -- Pipeline name +`workflow-metadata.json` records: +- Composite workflow name - Stage order, names, and configs used - Per-stage status: `completed`, `completed_empty`, `skipped_empty_upstream`, or `failed` -- Per-stage fingerprint for resume invalidation: `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, seed sampling/selection controls, `after_version`, DD version, and the upstream stage fingerprint +- Per-stage fingerprint for resume invalidation: 
`DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, seed sampling/selection controls, `on_success_version`, DD version, and the upstream stage fingerprint - `num_records` requested vs actual per stage - Which stage's output seeded the next -- Resolved seed path per stage, including callback output paths returned by `after` +- Resolved seed path per stage, including callback output paths returned by `on_success` - Timestamp, duration, and DD version per stage #### Composability and the throttle invariant -The `Pipeline` is constructed via `dd.pipeline(name=...)` and holds a reference to the parent `DataDesigner`. Every stage runs `dd.create()` (or `dd.acreate()` once available - see Engine API surface below) on that same instance. This is a load-bearing API contract for two reasons. +The `CompositeWorkflow` is constructed via `DataDesigner.compose_workflow(name=...)` and holds a reference to that parent `DataDesigner`. Every stage runs `data_designer.create()` (or `data_designer.acreate()` once available - see Engine API surface below) on that same instance. This is a load-bearing API contract for two reasons. -**Throttle coordination across stages.** A `DataDesigner` owns one `ModelRegistry`, which owns one `ThrottleManager`. AIMD rate-limit state is per-instance. If the pipeline constructed a fresh `DataDesigner` per stage, each stage would adapt independently and the aggregate request rate against a provider could exceed the configured cap by a multiple of the stage count. The same hazard applies to parallel branches in Phase 4: branches sharing one `DataDesigner` automatically share throttling; branches each holding their own `DataDesigner` silently fragment it. Reusing one instance is the simple, correct default. +**Throttle coordination across stages.** A `DataDesigner` owns one `ModelRegistry`, which owns one `ThrottleManager`. AIMD rate-limit state is per-instance. 
If the workflow constructed a fresh `DataDesigner` per stage, each stage would adapt independently and the aggregate request rate against a provider could exceed the configured cap by a multiple of the stage count. The same hazard applies to parallel branches in Phase 4: branches sharing one `DataDesigner` automatically share throttling; branches each holding their own `DataDesigner` silently fragment it. Reusing one instance is the simple, correct default. -**Door open for external orchestration.** The pipeline's choice to reuse one `DataDesigner` is the in-process strategy: shared throttling across stages, branches gathered in the orchestrator process. A cross-process strategy is a separate but compatible model - see Future considerations. v1 only needs to avoid encoding assumptions that would prevent it. +**Door open for external orchestration.** The workflow's choice to reuse one `DataDesigner` is the in-process strategy: shared throttling across stages, branches gathered in the orchestrator process. A cross-process strategy is a separate but compatible model - see Future considerations. v1 only needs to avoid encoding assumptions that would prevent it. -**On-disk handoffs for the same reason.** Stage handoffs go through parquet on disk via `LocalFileSeedSource`, never through an in-memory `DataFrameSeedSource`. This composes with any future orchestration model (in-process, cross-process, distributed) without per-environment branching. The cost is one parquet round-trip per stage boundary, which is negligible compared to LLM call time at any realistic scale. The notebook ergonomic `to_config_builder()` is the in-memory escape hatch and is explicitly not a Pipeline. +**On-disk handoffs for the same reason.** Stage handoffs go through parquet on disk via `LocalFileSeedSource`, never through an in-memory `DataFrameSeedSource`. This composes with any future orchestration model (in-process, cross-process, distributed) without per-environment branching. 
The cost is one parquet round-trip per stage boundary, which is negligible compared to LLM call time at any realistic scale. The notebook convenience method `to_config_builder()` is the in-memory escape hatch and is explicitly not a `CompositeWorkflow`. -**Internal stage model is a graph, not a list.** v1 exposes a linear `add_stage()` API and runs stages sequentially. Internally the pipeline represents stages as a DAG with the linear case being the default chain. This lets Phase 4 add parallel branches as an additive API change without restructuring orchestration. +**Internal stage model is a graph, not a list.** v1 exposes a linear `add_stage()` API and runs stages sequentially. Internally the composite workflow represents stages as a DAG with the linear case being the default chain. This lets Phase 4 add parallel branches as an additive API change without restructuring orchestration. #### Engine API surface: `acreate()` -`Pipeline` v1 calls `DataDesigner.create()` synchronously per stage and runs them in order. Sequential execution doesn't need an async API. *In-process* parallel execution does, and the engine doesn't expose one today. Cross-process orchestration is not the same problem: each worker runs sync `create()` in its own process and doesn't need an async surface. +`CompositeWorkflow` v1 calls `DataDesigner.create()` synchronously per stage and runs them in order. Sequential execution doesn't need an async API. *In-process* parallel execution does, and the engine doesn't expose one today. Cross-process orchestration is not the same problem: each worker runs sync `create()` in its own process and doesn't need an async surface. Adding `async def acreate(...)` on `DataDesigner` is a small, additive change. The underlying `_build_async` already runs on a singleton background event loop and submits work via a `concurrent.futures.Future`; `acreate()` bridges it into the caller's loop via `asyncio.wrap_future`. The sync `create()` becomes a one-line wrapper.
No breaking changes. -`acreate()` enables two things without touching `Pipeline`: +`acreate()` enables two things without touching `CompositeWorkflow`: -- **Parallel-independent workflows.** Users can `asyncio.gather(dd.acreate(c1), dd.acreate(c2))` for unrelated configs and get coordinated throttling automatically through the shared `ThrottleManager`. -- **Pipeline DAG branches (Phase 4).** When the pipeline graduates to a DAG, parallel branches are a pure orchestration change - `asyncio.gather` over `acreate()` calls inside `pipeline.run()` - with no further engine work required. +- **Parallel-independent workflows.** Users can `asyncio.gather(data_designer.acreate(c1), data_designer.acreate(c2))` for unrelated configs and get coordinated throttling automatically through the shared `ThrottleManager`. +- **CompositeWorkflow DAG branches (Phase 4).** When the workflow accepts non-linear dependencies, parallel branches are a pure orchestration change - `asyncio.gather` over `acreate()` calls inside `workflow.run()` - with no further engine work required. `acreate()` is *not* part of chaining v1. It ships as its own small piece of work that can land before, alongside, or after Phase 1; the dependency only becomes hard for Phase 4. Listed as a sidecar under Implementation phases. ### Part 2: Remove `allow_resize` -With the pipeline in place, `allow_resize` is no longer needed as an engine-internal mechanism. Resize becomes a between-stage concern. +With composite workflows in place, `allow_resize` is no longer needed as an engine-internal mechanism. Resize becomes a between-stage concern. **Config changes** (`data-designer-config`): @@ -227,23 +231,23 @@ With the pipeline in place, `allow_resize` is no longer needed as an engine-inte - Remove `_resolve_async_compatibility()` and the sync-fallback branch in `_build_async()` (no longer needed - nothing to fall back for). - Simplify `_run_full_column_generator()` to always enforce row-count invariance. 
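A minimal sketch of the row-count invariance check mentioned in the last bullet above, under stated assumptions: `RowCountChangedError` and `enforce_row_count_invariance` are hypothetical names used only for illustration (they are not the engine's API), and plain lists of dicts stand in for DataFrames.

```python
from collections.abc import Callable, Sequence


class RowCountChangedError(ValueError):
    """Hypothetical error type; not the engine's actual exception class."""


def enforce_row_count_invariance(
    generate: Callable[[Sequence[dict]], Sequence[dict]],
    rows: Sequence[dict],
) -> Sequence[dict]:
    """Run a full-column generator and fail fast if it changes the row count."""
    result = generate(rows)
    if len(result) != len(rows):
        # With allow_resize removed, a resize is an error rather than a mode.
        raise RowCountChangedError(
            f"generator returned {len(result)} rows for {len(rows)} input rows; "
            "move the resize to a stage boundary instead"
        )
    return result


def add_greeting(rows: Sequence[dict]) -> list[dict]:
    # Same-size generator: adds one column to every row.
    return [{**row, "greeting": f"hello {row['name']}"} for row in rows]


def duplicate_rows(rows: Sequence[dict]) -> list[dict]:
    # Expanding generator: the kind of behavior allow_resize used to permit.
    return [row for row in rows for _ in range(2)]
```

Under this sketch, `add_greeting` passes through unchanged in size, while `duplicate_rows` raises immediately instead of silently replacing the buffer.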
-**Migration path**: Users with `allow_resize=True` columns split their config into a pipeline with a stage boundary at the resize column. The resize column becomes the last column of its stage, and downstream columns move to the next stage. +**Migration path**: Users with `allow_resize=True` columns split their config into a composite workflow with a stage boundary at the resize column. The resize column becomes the last column of its stage, and downstream columns move to the next stage. ### Part 3: Fail-fast on pre-batch processor resize In `ProcessorRunner.run_pre_batch()` and `run_pre_batch_on_df()`, raise `DatasetProcessingError` if the returned DataFrame has a different row count than the input. -This applies to both sync and async paths. Users who need to filter or expand between seeds and generation use the pipeline's between-stage callback instead. Note that a seed reader plugin is NOT an equivalent escape hatch: seed readers run before any columns are generated (including samplers), so they can't filter on generated column values. +This applies to both sync and async paths. Users who need to filter or expand between seeds and generation use the composite workflow's between-stage callback instead. Note that a seed reader plugin is NOT an equivalent escape hatch: seed readers run before any columns are generated (including samplers), so they can't filter on generated column values. ### Where it fits in the architecture | Layer | Changes | |-------|---------| -| `data-designer-config` | Remove `allow_resize` field. No new config models needed for v1 (pipeline is imperative, not declarative). | +| `data-designer-config` | Remove `allow_resize` field. No new config models needed for v1 (composite workflow is imperative, not declarative). | | `data-designer-engine` | Remove resize code paths. Add fail-fast guard in `ProcessorRunner`. No new engine features. | -| `data-designer` (interface) | New `Pipeline` class. 
Thin orchestration: holds a reference to the parent `DataDesigner`, calls `DataDesigner.create()` per stage, hands off between stages on disk via `LocalFileSeedSource`. All stages share the same `ModelRegistry` and `ThrottleManager`. Optionally consumes `DataDesigner.acreate()` (sidecar) once available, for Phase 4 parallel branches. | +| `data-designer` (interface) | New `CompositeWorkflow` class. Thin orchestration: holds a reference to the parent `DataDesigner`, calls `DataDesigner.create()` per stage, hands off between stages on disk via `LocalFileSeedSource`. All stages share the same `ModelRegistry` and `ThrottleManager`. Optionally consumes `DataDesigner.acreate()` (sidecar) once available, for Phase 4 parallel branches. | -The engine does not know about pipelines. Each stage is a regular `DatasetBuilder.build()` call. +The engine does not know about composite workflows. Each stage is a regular `DatasetBuilder.build()` call. ## Use cases for implementation and testing @@ -269,10 +273,11 @@ config_convos = ( .add_column(name="conversation", column_type="llm_text", prompt="Write a conversation between {{ name }} and an assistant about {{ topic }}...") ) -pipeline = dd.pipeline(name="persona-conversations") -pipeline.add_stage("personas", config_personas, num_records=100) -pipeline.add_stage("conversations", config_convos, num_records=1000) -results = pipeline.run() +data_designer = DataDesigner() +workflow = data_designer.compose_workflow(name="persona-conversations") +workflow.add_stage("personas", config_personas, num_records=100) +workflow.add_stage("conversations", config_convos, num_records=1000) +results = workflow.run() ``` ### 2. Filter-then-enrich @@ -286,15 +291,21 @@ config_enrich = ... 
# adds detailed analysis columns def keep_high_quality(stage_output_path: Path) -> Path: df = pd.read_parquet(stage_output_path / "parquet-files") df = df[df["quality_score"] > 0.8] - out = stage_output_path.parent / "filtered" - out.mkdir(exist_ok=True) + out = stage_output_path / "callback-outputs" / "quality-filter-v1" + out.mkdir(parents=True, exist_ok=True) df.to_parquet(out / "data.parquet") return out -pipeline = dd.pipeline(name="filter-enrich") -pipeline.add_stage("candidates", config_gen, num_records=5000) -pipeline.add_stage("enriched", config_enrich, after=keep_high_quality, after_version="quality-filter-v1") -results = pipeline.run() +workflow = data_designer.compose_workflow(name="filter-enrich") +workflow.add_stage( + "candidates", + config_gen, + num_records=5000, + on_success=keep_high_quality, + on_success_version="quality-filter-v1", +) +workflow.add_stage("enriched", config_enrich) +results = workflow.run() ``` ### 3. Generate-then-judge with different models @@ -308,24 +319,24 @@ config_gen = DataDesignerConfigBuilder(model_configs=[fast_model])... # Stage 2: judge with a stronger model config_judge = DataDesignerConfigBuilder(model_configs=[strong_model])... 
-pipeline = dd.pipeline(name="gen-judge") -pipeline.add_stage("generated", config_gen, num_records=1000) -pipeline.add_stage("judged", config_judge) -results = pipeline.run() +workflow = data_designer.compose_workflow(name="gen-judge") +workflow.add_stage("generated", config_gen, num_records=1000) +workflow.add_stage("judged", config_judge) +results = workflow.run() # Later: tweak judging config, resume from stage 1 output -pipeline_v2 = dd.pipeline(name="gen-judge") -pipeline_v2.add_stage("generated", config_gen, num_records=1000) -pipeline_v2.add_stage("judged", config_judge_v2) -results_v2 = pipeline_v2.run(resume=ResumeMode.IF_POSSIBLE) # skips stage 1 +workflow_v2 = data_designer.compose_workflow(name="gen-judge") +workflow_v2.add_stage("generated", config_gen, num_records=1000) +workflow_v2.add_stage("judged", config_judge_v2) +results_v2 = workflow_v2.run(resume=ResumeMode.IF_POSSIBLE) # skips stage 1 ``` -### 4. Interactive notebook chaining (lightweight, no pipeline) +### 4. Interactive notebook chaining (lightweight, no composite workflow) Quick iteration using `to_config_builder()`: ```python -result = dd.create(config_personas, num_records=50) +result = data_designer.create(config_personas, num_records=50) result.load_dataset() # inspect, looks good # Chain into next step @@ -333,19 +344,19 @@ config_2 = ( result.to_config_builder(columns=["name", "background"]) .add_column(name="question", column_type="llm_text", prompt="...") ) -result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 +result_2 = data_designer.create(config_2, num_records=200) # explode: 50 -> 200 ``` ## Implementation phases -### Phase 1: Pipeline class and `to_config_builder()` (can ship independently) +### Phase 1: `CompositeWorkflow` class and `to_config_builder()` (can ship independently) - Add `to_config_builder()` on `DatasetCreationResults` and `PreviewResults`. -- Add `Pipeline` class with `add_stage()`, `run()`, between-stage callbacks. 
Pipeline holds a reference to the parent `DataDesigner` and reuses it across stages. -- Stage handoff is always on disk via `LocalFileSeedSource`; no in-memory handoff path inside `Pipeline`. +- Add `CompositeWorkflow` class with `add_stage()`, `run()`, and `on_success` callbacks. `CompositeWorkflow` holds a reference to the parent `DataDesigner` and reuses it across stages. +- Stage handoff is always on disk via `LocalFileSeedSource`; no in-memory handoff path inside `CompositeWorkflow`. - Internal stage representation is a DAG (linear-only inputs in v1). -- Add `pipeline-metadata.json` writing. -- Add `dd.pipeline(name: str)` factory method on `DataDesigner`. +- Add `workflow-metadata.json` writing. +- Add `compose_workflow(name: str)` factory method on `DataDesigner`. - Tests: multi-stage runs, explode/filter via callbacks, num_records defaulting, artifact layout, throttle reuse across stages. ### Sidecar: `acreate()` on `DataDesigner` (independent of chaining v1) @@ -358,7 +369,7 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200 ### Phase 2: Remove `allow_resize` - (Done in #553) `allow_resize=True` in async mode emits a `DeprecationWarning` and falls back to sync. -- Update docs that still reference `allow_resize` (`docs/concepts/custom_columns.md`, `docs/plugins/example.md`, `docs/concepts/agent-rollout-ingestion.md`) to point at pipelines. +- Update docs that still reference `allow_resize` (`docs/concepts/custom_columns.md`, `docs/plugins/example.md`, `docs/concepts/agent-rollout-ingestion.md`) to point at composite workflows. - Remove resize code from sync engine (`_cell_resize_mode`, `_finalize_fan_out` resize branch, `replace_buffer` `allow_resize` param). - Remove `_resolve_async_compatibility()` and its sync-fallback branch from `_build_async()`. - Remove the `allow_resize` field from the config schema. 
@@ -367,65 +378,70 @@ result_2 = dd.create(config_2, num_records=200) # explode: 50 -> 200
 
 ### Phase 3: Stage-level resume
 
-- Add `resume: ResumeMode` to `pipeline.run()`, reusing the enum introduced by #526.
-- Read `pipeline-metadata.json` to detect completed stages.
-- Resolve the metadata path from the explicit pipeline name.
-- Compute each stage's fingerprint via `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, seed sampling/selection controls, `after_version`, DD version, and upstream stage fingerprint; invalidate the stage and everything downstream on any mismatch.
+- Add `resume: ResumeMode` to `workflow.run()`, reusing the enum introduced by #526.
+- Read `workflow-metadata.json` to detect completed stages.
+- Resolve the metadata path from the explicit workflow name.
+- Compute each stage's fingerprint via `DataDesignerConfig.fingerprint()` (#587) combined with `num_records`, seed sampling/selection controls, `on_success_version`, DD version, and upstream stage fingerprint; invalidate the stage and everything downstream on any mismatch.
 - Skip stages whose fingerprints match and are complete; for matching partial stages, call `DataDesigner.create(..., resume=ResumeMode.ALWAYS)` to use #526's batch/row-group resume.
 - Preserve `completed_empty` and `skipped_empty_upstream` statuses on resume when fingerprints still match; rerun from the first changed or missing upstream stage otherwise.
-- Before skipping or resuming a stage seeded by `after`, validate the recorded callback output path exists and can seed `LocalFileSeedSource`; if missing, invalidate that stage and descendants.
-- For invalidated stages, clear or replace the deterministic stage directory before starting fresh so `ArtifactStorage` does not timestamp away from the pipeline layout.
+- Before skipping or resuming a stage seeded by `on_success`, validate the recorded callback output path exists and can seed `LocalFileSeedSource`; if missing, invalidate that stage and descendants.
+- For invalidated stages, clear or replace the deterministic stage directory before starting fresh so `ArtifactStorage` does not timestamp away from the workflow layout.
 - Depends on artifact layout from phase 1.
 
 ### Phase 4: DAG-shaped stages with parallel branches
 
 - Extend `add_stage()` with an optional `depends_on=[stage_name, ...]` argument; default keeps the linear behavior.
-- `pipeline.run()` walks the resulting DAG, gathering independent branches via `asyncio.gather` over `dd.acreate()` calls.
+- `workflow.run()` walks the resulting DAG, gathering independent branches via `asyncio.gather` over `data_designer.acreate()` calls.
 - Per-stage fingerprint composition (Phase 3) generalizes naturally: a stage's upstream fingerprint becomes the hash of all parent fingerprints sorted by stage name, making joins stable regardless of `depends_on` declaration order.
+- Join data contract is not automatic dataset fan-in. A multi-parent child still needs one concrete seed dataset; Phase 4 can require a single declared seed parent or an explicit merge stage/callback that writes one `LocalFileSeedSource`-compatible dataset. First-class multi-parent callbacks keyed by parent name are future work.
 - Throttle coordination relies on the existing invariant: all branches run on the same parent `DataDesigner`, so `ThrottleManager` is shared.
 - Hard dependency on the `acreate()` sidecar.
 - **Scope: branch parallelism, not stage pipelining.** Stages still wait for their dependencies to fully complete before starting; pipelined execution of dependent stages is a separate direction sketched in Future considerations.
 - Tests: fan-out (one upstream, multiple parallel children); join (multiple upstreams, one child); resume invalidation when one branch's fingerprint changes; throttle behavior under N parallel branches.
 
-### Phase 5 (future): Auto-chaining from single config
-
-- Detect stage boundaries in the DAG (via a new config marker or heuristic).
-- Auto-split into pipeline stages internally.
-- User sees a single `dd.create(config)` call but gets multi-stage execution.
-
 ## Future considerations
 
-Items not on the current roadmap but worth flagging so they don't get accidentally precluded by v1-v5 design choices.
+Items not on the current roadmap but worth flagging so they don't get accidentally precluded by the current design choices.
 
-**External orchestration for cross-process / distributed execution.** There is interest in eventually running DataDesigner workloads across processes or nodes - self-hosted serving, multi-host fan-out, scheduling against external clusters. The specific shape of that orchestration is still under discussion and is not committed to here. The chaining plan's design choices (parent `DataDesigner` reuse, on-disk handoffs, no new engine surface) compose naturally with such a system: an external orchestrator could dispatch independent `DataDesigner.create()` calls against partitioned slices and per-replica endpoints without the pipeline class needing to change. v1-v5 do not depend on this materializing.
+**External orchestration for cross-process / distributed execution.** There is interest in eventually running DataDesigner workloads across processes or nodes - self-hosted serving, multi-host fan-out, scheduling against external clusters. The specific shape of that orchestration is still under discussion and is not committed to here. The chaining plan's design choices (parent `DataDesigner` reuse, on-disk handoffs, no new engine surface) compose naturally with such a system: an external orchestrator could dispatch independent `DataDesigner.create()` calls against partitioned slices and per-replica endpoints without the `CompositeWorkflow` class needing to change. Phases 1-4 do not depend on this materializing.
 
 **Pipelined execution of dependent stages.** Today the stage data contract is "final dataset" - a downstream stage waits for its upstream to fully complete. A future direction is to let downstream stages consume upstream batches as they're produced, overlapping execution across the dependency edge. Required changes: streaming seed sources, an explicit "stage done" sentinel rather than file-completion checks, and resume semantics for partially-consumed upstreams. Most useful when stage bottlenecks are heterogeneous (LLM-bound stage feeding a CPU-bound validator); little gain when both stages are LLM-bound since they share provider capacity. Not designed here; flagged so the stage contract isn't quietly closed off.
 
+**Stage-level run config and compute placement.** v1 stages can have different `DataDesignerConfig` values, including different model configs, but reuse the parent `DataDesigner` and run settings for shared throttle coordination. A future extension can allow per-stage `RunConfig`, provider, or compute placement, but it needs explicit rules for throttling, artifact ownership, and cross-process resume.
+
+**Failure hooks.** v1 should raise on failure by default. A future `on_failure` callback could support cleanup, custom recovery, or explicit handling of all-rows-filtered cases, but it should not obscure the default failure semantics.
+
+**First-class artifact seeding.** v1 can bridge named processor outputs or schema-transform artifacts through `on_success` callbacks. A later API could support `seed_from_artifact(...)` or named processor output references directly.
+
+**Auto-composition from a single config.** Auto-detecting stage boundaries in one config is possible, but likely overkill for the initial roadmap. If it returns later, it should be a convenience layer on top of `CompositeWorkflow`, not a separate execution model.
+
 ## Resolved decisions
 
 These were open in earlier drafts; recording the resolutions here so the design is unambiguous.
 
-1. **In-memory vs on-disk handoff between stages** -> Always on-disk inside `Pipeline`. The in-memory `DataFrameSeedSource` mode is reserved for the lightweight `to_config_builder()` notebook ergonomic, which is explicitly *not* a `Pipeline`. Reasons: single execution model, simpler resume story, and composability with any future external orchestration that can't share an in-memory DataFrame across process boundaries. Cost is one parquet round-trip per stage, negligible relative to LLM call time.
+1. **Naming** -> The public abstraction is `CompositeWorkflow`, created through `data_designer.compose_workflow(name=...)`. This makes composition of multiple independent workflows explicit without implying a product-level pipeline builder or a strictly linear linked list.
 
-2. **Branch/fan-out semantics (DAG)** -> Designed-in but not v1. The internal stage representation is a DAG; v1 only accepts linear inputs through `add_stage()`. Phase 4 ships parallel branches via `asyncio.gather` over `acreate()`. v1 stays sequential.
+2. **In-memory vs on-disk handoff between stages** -> Always on-disk inside `CompositeWorkflow`. The in-memory `DataFrameSeedSource` mode is reserved for the lightweight `to_config_builder()` notebook ergonomic, which is explicitly *not* a `CompositeWorkflow`. Reasons: single execution model, simpler resume story, and composability with any future external orchestration that can't share an in-memory DataFrame across process boundaries. Cost is one parquet round-trip per stage, negligible relative to LLM call time.
 
-3. **Pipeline construction** -> `Pipeline` is created via `dd.pipeline(name=...)` and reuses the parent `DataDesigner`'s `ModelRegistry` and `ThrottleManager` across all stages. The explicit name is the durable artifact identity used for resume, and the pipeline does not construct its own `DataDesigner` instances. This is the throttle-coordination invariant (see Composability section).
+3. **Branch/fan-out semantics (DAG)** -> Designed-in but not v1. The internal stage representation is a DAG; v1 only accepts linear inputs through `add_stage()`. Phase 4 ships parallel branches via `asyncio.gather` over `acreate()`. v1 stays sequential.
 
-## Open questions
+4. **CompositeWorkflow construction** -> `CompositeWorkflow` is created via `data_designer.compose_workflow(name=...)` and reuses the parent `DataDesigner`'s `ModelRegistry` and `ThrottleManager` across all stages. The explicit name is the durable artifact identity used for resume, and the workflow does not construct its own `DataDesigner` instances. This is the throttle-coordination invariant (see Composability section).
+
+5. **Downstream seeding scope** -> v1 seeds downstream stages from the upstream stage's final dataset only. Named processor outputs, dropped columns, schema-transform artifacts, and media forwarding require an `on_success` callback in v1 and can become first-class APIs later.
 
-1. **Preview support**: Should `pipeline.preview()` run all stages with small `num_records`? Or just preview the last stage seeded from a prior full run?
+6. **Export and push semantics** -> v1 export/push defaults to the final stage's dataset only. Exporting selected stages or a full composite workflow artifact bundle is future work.
 
-2. **Config serialization**: For persistence, pipeline configs would need symbolic stage references ("seed from stage X's output"). With the on-disk handoff decision above, the `DataFrameSeedSource` blocker is no longer relevant; the remaining question is how to encode stage dependencies in YAML. Needed for auto-chaining (Phase 5) but not for the explicit API (phases 1-4).
+## Open questions
 
-3. **Naming**: `Pipeline` vs `Chain` vs `WorkflowChain`. `Pipeline` is the most intuitive and aligns with ML pipeline terminology.
+1. **Preview support**: Should `workflow.preview()` run all stages with small `num_records`? Or just preview the last stage seeded from a prior full run?
 
-4. **Image/media column forwarding**: Images in create mode are stored as relative file paths. If a downstream stage seeds from an upstream stage that produced images, the relative paths break. Options: (a) resolve to absolute paths at stage boundary, (b) copy media assets into downstream stage's directory, (c) document as unsupported in v1.
+2. **Config serialization**: For persistence, composite workflow configs would need symbolic stage references ("seed from stage X's output"). With the on-disk handoff decision above, the `DataFrameSeedSource` blocker is no longer relevant; the remaining question is how to encode stage dependencies in YAML. Needed for future declarative workflows or auto-composition, but not for the explicit API in phases 1-4.
 
-5. **Downstream seeding scope**: Should downstream stages only seed from the final dataset, or should they also be able to access dropped columns or named processor outputs from upstream stages?
+3. **Image/media column forwarding**: Images in create mode are stored as relative file paths. If a downstream stage seeds from an upstream stage that produced images, the relative paths break. v1 can bridge this with an `on_success` callback or expression columns that rewrite paths relative to the upstream stage's artifact directory. First-class options remain: (a) resolve to absolute paths at stage boundary, (b) copy media assets into downstream stage's directory, (c) document as unsupported in v1.
 
 ## Related issues
 
-- #447 - AsyncRunController refactor (partially superseded: pre-batch resize handling moves to pipeline level instead of controller level)
-- #526 / #525 - Resume interrupted runs (single-stage batch/row-group resume primitive used by pipeline stage resume)
+- #447 - AsyncRunController refactor (partially superseded: pre-batch resize handling moves to composite workflow level instead of controller level)
+- #526 / #525 - Resume interrupted runs (single-stage batch/row-group resume primitive used by composite workflow stage resume)
 - #462 - Progress bar and scheduler polish (independent)
 - #464 - Custom column retryable errors (independent)

From 75dce5e7ac6ec2739f7efef5a30254d2129e492b Mon Sep 17 00:00:00 2001
From: Andre Manoel
Date: Tue, 12 May 2026 01:05:26 +0000
Subject: [PATCH 13/13] docs: require unique composite workflow stages

---
 plans/workflow-chaining/workflow-chaining.md | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md
index 9427ec06c..95a4468ec 100644
--- a/plans/workflow-chaining/workflow-chaining.md
+++ b/plans/workflow-chaining/workflow-chaining.md
@@ -65,6 +65,8 @@ results["judged"].load_dataset() # final output
 
 Each stage can use a different `DataDesignerConfig`, including different model configs. v1 still uses the parent `DataDesigner` instance and run settings for all stages; per-stage `RunConfig` and compute placement are future work.
 
+Stage names must be unique within a `CompositeWorkflow`. `add_stage()` validates this immediately and raises `DataDesignerWorkflowError` on duplicates before any artifact directory is created. This keeps `CompositeWorkflowResults`, `workflow-metadata.json`, and Phase 3 fingerprint lookup unambiguous.
+
 **Convenience method on results (lightweight, for notebooks):**
 
 For interactive use where a full composite workflow is overkill, a `to_config_builder()` method on `DatasetCreationResults` returns a pre-seeded `DataDesignerConfigBuilder`:
@@ -357,7 +359,7 @@ result_2 = data_designer.create(config_2, num_records=200) # explode: 50 -> 200
 - Internal stage representation is a DAG (linear-only inputs in v1).
 - Add `workflow-metadata.json` writing.
 - Add `compose_workflow(name: str)` factory method on `DataDesigner`.
-- Tests: multi-stage runs, explode/filter via callbacks, num_records defaulting, artifact layout, throttle reuse across stages.
+- Tests: multi-stage runs, explode/filter via callbacks, num_records defaulting, duplicate stage-name rejection, artifact layout, throttle reuse across stages.
 
 ### Sidecar: `acreate()` on `DataDesigner` (independent of chaining v1)
 
@@ -396,7 +398,7 @@ result_2 = data_designer.create(config_2, num_records=200) # explode: 50 -> 200
 - Join data contract is not automatic dataset fan-in. A multi-parent child still needs one concrete seed dataset; Phase 4 can require a single declared seed parent or an explicit merge stage/callback that writes one `LocalFileSeedSource`-compatible dataset. First-class multi-parent callbacks keyed by parent name are future work.
 - Throttle coordination relies on the existing invariant: all branches run on the same parent `DataDesigner`, so `ThrottleManager` is shared.
 - Hard dependency on the `acreate()` sidecar.
-- **Scope: branch parallelism, not stage pipelining.** Stages still wait for their dependencies to fully complete before starting; pipelined execution of dependent stages is a separate direction sketched in Future considerations.
+- **Scope: branch parallelism, not streaming dependent-stage execution.** Stages still wait for their dependencies to fully complete before starting; overlapped execution across a dependency edge is a separate direction sketched in Future considerations.
 - Tests: fan-out (one upstream, multiple parallel children); join (multiple upstreams, one child); resume invalidation when one branch's fingerprint changes; throttle behavior under N parallel branches.
 
 ## Future considerations
@@ -405,7 +407,7 @@ Items not on the current roadmap but worth flagging so they don't get accidental
 
 **External orchestration for cross-process / distributed execution.** There is interest in eventually running DataDesigner workloads across processes or nodes - self-hosted serving, multi-host fan-out, scheduling against external clusters. The specific shape of that orchestration is still under discussion and is not committed to here. The chaining plan's design choices (parent `DataDesigner` reuse, on-disk handoffs, no new engine surface) compose naturally with such a system: an external orchestrator could dispatch independent `DataDesigner.create()` calls against partitioned slices and per-replica endpoints without the `CompositeWorkflow` class needing to change. Phases 1-4 do not depend on this materializing.
 
-**Pipelined execution of dependent stages.** Today the stage data contract is "final dataset" - a downstream stage waits for its upstream to fully complete. A future direction is to let downstream stages consume upstream batches as they're produced, overlapping execution across the dependency edge. Required changes: streaming seed sources, an explicit "stage done" sentinel rather than file-completion checks, and resume semantics for partially-consumed upstreams. Most useful when stage bottlenecks are heterogeneous (LLM-bound stage feeding a CPU-bound validator); little gain when both stages are LLM-bound since they share provider capacity. Not designed here; flagged so the stage contract isn't quietly closed off.
+**Streaming dependent-stage execution.** Today the stage data contract is "final dataset" - a downstream stage waits for its upstream to fully complete. A future direction is to let downstream stages consume upstream batches as they're produced, overlapping execution across the dependency edge. Required changes: streaming seed sources, an explicit "stage done" sentinel rather than file-completion checks, and resume semantics for partially-consumed upstreams. Most useful when stage bottlenecks are heterogeneous (LLM-bound stage feeding a CPU-bound validator); little gain when both stages are LLM-bound since they share provider capacity. Not designed here; flagged so the stage contract isn't quietly closed off.
 
 **Stage-level run config and compute placement.** v1 stages can have different `DataDesignerConfig` values, including different model configs, but reuse the parent `DataDesigner` and run settings for shared throttle coordination. A future extension can allow per-stage `RunConfig`, provider, or compute placement, but it needs explicit rules for throttling, artifact ownership, and cross-process resume.
 
@@ -419,7 +421,7 @@ Items not on the current roadmap but worth flagging so they don't get accidental
 
 These were open in earlier drafts; recording the resolutions here so the design is unambiguous.
 
-1. **Naming** -> The public abstraction is `CompositeWorkflow`, created through `data_designer.compose_workflow(name=...)`. This makes composition of multiple independent workflows explicit without implying a product-level pipeline builder or a strictly linear linked list.
+1. **Naming** -> The public abstraction is `CompositeWorkflow`, created through `data_designer.compose_workflow(name=...)`. This makes composition of multiple independent workflows explicit without implying a broader builder abstraction or a strictly linear linked list.
 
 2. **In-memory vs on-disk handoff between stages** -> Always on-disk inside `CompositeWorkflow`. The in-memory `DataFrameSeedSource` mode is reserved for the lightweight `to_config_builder()` notebook ergonomic, which is explicitly *not* a `CompositeWorkflow`. Reasons: single execution model, simpler resume story, and composability with any future external orchestration that can't share an in-memory DataFrame across process boundaries. Cost is one parquet round-trip per stage, negligible relative to LLM call time.
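
Two rules stated in this series are small enough to sketch in Python: `add_stage()` rejecting duplicate stage names before any side effect, and the Phase 4 upstream fingerprint being a hash of parent fingerprints sorted by stage name. The following is only an illustration under stated assumptions, not the project's implementation: `StageRegistry` and `upstream_fingerprint` are hypothetical names, `DataDesignerWorkflowError` is redefined locally as a stand-in for the error the plan names, and SHA-256 with a newline separator is an assumed encoding that the plan does not specify.

```python
import hashlib
from typing import Dict, List, Optional


class DataDesignerWorkflowError(Exception):
    """Local stand-in for the error type named in the plan."""


class StageRegistry:
    """Hypothetical helper that records stage names and their parents."""

    def __init__(self) -> None:
        self._parents: Dict[str, List[str]] = {}

    def add_stage(self, name: str, depends_on: Optional[List[str]] = None) -> None:
        # Fail fast on duplicates, before any side effect (e.g. creating a directory).
        if name in self._parents:
            raise DataDesignerWorkflowError(f"duplicate stage name: {name!r}")
        if depends_on is None:
            # Linear default: depend on the most recently added stage, if there is one.
            depends_on = list(self._parents)[-1:]
        missing = [parent for parent in depends_on if parent not in self._parents]
        if missing:
            raise DataDesignerWorkflowError(f"unknown parent stage(s): {missing}")
        self._parents[name] = list(depends_on)

    def parents(self, name: str) -> List[str]:
        return list(self._parents[name])


def upstream_fingerprint(parent_fingerprints: Dict[str, str]) -> str:
    """Hash parent fingerprints in stage-name order, so declaration order is irrelevant."""
    digest = hashlib.sha256()  # assumed hash; the plan does not name one
    for stage_name in sorted(parent_fingerprints):
        digest.update(parent_fingerprints[stage_name].encode("utf-8"))
        digest.update(b"\n")  # separator so adjacent fingerprints cannot run together
    return digest.hexdigest()
```

Keeping the duplicate check as the first statement in `add_stage()` mirrors the plan's requirement that nothing is created on disk before validation. Sorting by stage name, rather than by the order of `depends_on`, is what makes a join's fingerprint stable across equivalent declarations.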