Skip to content

Commit 6055290

Browse files
feat: add workflow chaining (#636)
* feat: add workflow chaining * test: tidy workflow chaining coverage * fix: harden workflow chaining concurrency * docs: update workflow chaining plan * feat: add workflow stage postprocessors * feat: expose workflow stage outputs * fix: align workflow selected output export * fix: address workflow chaining review issues * fix: align workflow parquet export selection Signed-off-by: Andre Manoel <amanoel@nvidia.com> * fix: preserve generated columns in drop validation Signed-off-by: Andre Manoel <amanoel@nvidia.com> * fix: clarify workflow output processors Signed-off-by: Andre Manoel <amanoel@nvidia.com> * docs: add workflow chaining page * docs: align workflow chaining warning * fix: address workflow review nits --------- Signed-off-by: Andre Manoel <amanoel@nvidia.com>
1 parent 7199762 commit 6055290

22 files changed

Lines changed: 2239 additions & 25 deletions

File tree

docs/concepts/workflow-chaining.md

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Workflow Chaining
2+
3+
!!! warning "Experimental Feature"
4+
Workflow chaining is currently **experimental** and under active development. The documentation, examples, workflow API, metadata schema, and artifact layout are subject to significant changes in future releases. If you encounter any issues, have questions, or have ideas for improvement, please consider starting [a discussion on GitHub](https://github.com/NVIDIA-NeMo/DataDesigner/discussions).
5+
6+
Workflow chaining lets you split a dataset build into named stages. Each stage runs a normal `DataDesigner.create()` call, writes its own artifact directory, and hands a selected parquet output to the next stage as a `LocalFileSeedSource`.
7+
8+
Use it when one generation step naturally depends on the cleaned or reshaped output of another step, especially when a processor-only stage is clearer than mixing all transformations into one config.
9+
10+
## Basic shape
11+
12+
```python
13+
import data_designer.config as dd
14+
from data_designer.interface import DataDesigner
15+
16+
data_designer = DataDesigner()
17+
18+
drafts = (
19+
dd.DataDesignerConfigBuilder(model_configs=[fast_model])
20+
.with_seed_dataset(dd.LocalFileSeedSource(path="parsed_docs/*.parquet"))
21+
.add_column(
22+
name="chunk_summary",
23+
column_type="llm_text",
24+
model_alias="fast",
25+
prompt="Summarize this passage:\n\n{{ text }}",
26+
)
27+
.add_column(
28+
name="question",
29+
column_type="llm_text",
30+
model_alias="fast",
31+
prompt="Write a question about this passage:\n\n{{ chunk_summary }}",
32+
)
33+
.add_column(
34+
name="answer",
35+
column_type="llm_text",
36+
model_alias="fast",
37+
prompt="Answer {{ question }} using this passage:\n\n{{ text }}",
38+
)
39+
)
40+
41+
chatml = dd.DataDesignerConfigBuilder().add_processor(
42+
dd.SchemaTransformProcessorConfig(
43+
name="chatml",
44+
template={
45+
"messages": [
46+
{"role": "user", "content": "{{ question }}"},
47+
{"role": "assistant", "content": "{{ answer }}"},
48+
],
49+
},
50+
)
51+
)
52+
53+
workflow = data_designer.compose_workflow(name="doc-qa")
54+
workflow.add_stage(
55+
"drafts",
56+
drafts,
57+
num_records=100,
58+
output_processors=[
59+
dd.DropColumnsProcessorConfig(
60+
name="drop_scratch",
61+
column_names=["text", "chunk_summary"],
62+
)
63+
],
64+
)
65+
workflow.add_stage("chatml", chatml, output="processor:chatml")
66+
67+
results = workflow.run()
68+
training_rows = results.load_dataset()
69+
results.export("chatml.jsonl")
70+
```
71+
72+
## Stage outputs
73+
74+
A stage can expose different views of its data:
75+
76+
| Surface | What it returns |
77+
|---------|-----------------|
78+
| `results["stage_name"]` | The effective `DatasetCreationResults` for that stage. If the stage uses `output_processors`, this points at the output-processor run. |
79+
| `results.load_stage_output("stage_name")` | The selected output handed to downstream stages. This follows `output="processor:<name>"` and `on_success`. |
80+
| `results.load_dataset()` | The selected output from the final stage. |
81+
82+
Processors added with `config_builder.add_processor(...)` run inside the stage and usually create side artifacts. They do not automatically change what the next stage receives. Use `output_processors=[...]` when a processor should define the stage boundary output.
83+
84+
## Processor-only stages
85+
86+
Stages can be processor-only when they receive seed data from an upstream stage:
87+
88+
```python
89+
cleanup = dd.DataDesignerConfigBuilder().add_processor(
90+
dd.DropColumnsProcessorConfig(
91+
name="drop_private_fields",
92+
column_names=["email", "raw_notes"],
93+
)
94+
)
95+
96+
workflow.add_stage("cleanup", cleanup)
97+
```
98+
99+
This is useful for final cleanup, schema transforms, and format-specific export preparation.
100+
101+
## Current limits
102+
103+
- Stages are linear. DAGs, parallel branches, and joins are planned separately.
104+
- Stage-level resume is not implemented yet.
105+
- `push_to_hub()` does not support selected processor or callback outputs yet. Use `export()` for the selected workflow output.
106+
- `on_success` callbacks are trusted user code. If a callback returns a path, Data Designer reads that path as the next stage input.
107+
- The artifact layout is intended for inspection, but it is not yet a stable public contract.

fern/versions/latest.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ navigation:
3535
path: ./latest/pages/concepts/validators.mdx
3636
- page: Processors
3737
path: ./latest/pages/concepts/processors.mdx
38+
- page: Workflow Chaining
39+
path: ./latest/pages/concepts/workflow-chaining.mdx
3840
- page: Person Sampling
3941
path: ./latest/pages/concepts/person_sampling.mdx
4042
- page: Traces
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
---
2+
title: "Workflow Chaining"
3+
description: ""
4+
position: 8
5+
---
6+
<Warning title="Experimental Feature">
7+
Workflow chaining is currently **experimental** and under active development. The documentation, examples, workflow API, metadata schema, and artifact layout are subject to significant changes in future releases. If you encounter any issues, have questions, or have ideas for improvement, please consider starting [a discussion on GitHub](https://github.com/NVIDIA-NeMo/DataDesigner/discussions).
8+
</Warning>
9+
10+
Workflow chaining lets you split a dataset build into named stages. Each stage runs a normal `DataDesigner.create()` call, writes its own artifact directory, and hands a selected parquet output to the next stage as a `LocalFileSeedSource`.
11+
12+
Use it when one generation step naturally depends on the cleaned or reshaped output of another step, especially when a processor-only stage is clearer than mixing all transformations into one config.
13+
14+
## Basic shape
15+
16+
```python
17+
import data_designer.config as dd
18+
from data_designer.interface import DataDesigner
19+
20+
data_designer = DataDesigner()
21+
22+
drafts = (
23+
dd.DataDesignerConfigBuilder(model_configs=[fast_model])
24+
.with_seed_dataset(dd.LocalFileSeedSource(path="parsed_docs/*.parquet"))
25+
.add_column(
26+
name="chunk_summary",
27+
column_type="llm_text",
28+
model_alias="fast",
29+
prompt="Summarize this passage:\n\n{{ text }}",
30+
)
31+
.add_column(
32+
name="question",
33+
column_type="llm_text",
34+
model_alias="fast",
35+
prompt="Write a question about this passage:\n\n{{ chunk_summary }}",
36+
)
37+
.add_column(
38+
name="answer",
39+
column_type="llm_text",
40+
model_alias="fast",
41+
prompt="Answer {{ question }} using this passage:\n\n{{ text }}",
42+
)
43+
)
44+
45+
chatml = dd.DataDesignerConfigBuilder().add_processor(
46+
dd.SchemaTransformProcessorConfig(
47+
name="chatml",
48+
template={
49+
"messages": [
50+
{"role": "user", "content": "{{ question }}"},
51+
{"role": "assistant", "content": "{{ answer }}"},
52+
],
53+
},
54+
)
55+
)
56+
57+
workflow = data_designer.compose_workflow(name="doc-qa")
58+
workflow.add_stage(
59+
"drafts",
60+
drafts,
61+
num_records=100,
62+
output_processors=[
63+
dd.DropColumnsProcessorConfig(
64+
name="drop_scratch",
65+
column_names=["text", "chunk_summary"],
66+
)
67+
],
68+
)
69+
workflow.add_stage("chatml", chatml, output="processor:chatml")
70+
71+
results = workflow.run()
72+
training_rows = results.load_dataset()
73+
results.export("chatml.jsonl")
74+
```
75+
76+
## Stage outputs
77+
78+
A stage can expose different views of its data:
79+
80+
| Surface | What it returns |
81+
|---------|-----------------|
82+
| `results["stage_name"]` | The effective `DatasetCreationResults` for that stage. If the stage uses `output_processors`, this points at the output-processor run. |
83+
| `results.load_stage_output("stage_name")` | The selected output handed to downstream stages. This follows `output="processor:<name>"` and `on_success`. |
84+
| `results.load_dataset()` | The selected output from the final stage. |
85+
86+
Processors added with `config_builder.add_processor(...)` run inside the stage and usually create side artifacts. They do not automatically change what the next stage receives. Use `output_processors=[...]` when a processor should define the stage boundary output.
87+
88+
## Processor-only stages
89+
90+
Stages can be processor-only when they receive seed data from an upstream stage:
91+
92+
```python
93+
cleanup = dd.DataDesignerConfigBuilder().add_processor(
94+
dd.DropColumnsProcessorConfig(
95+
name="drop_private_fields",
96+
column_names=["email", "raw_notes"],
97+
)
98+
)
99+
100+
workflow.add_stage("cleanup", cleanup)
101+
```
102+
103+
This is useful for final cleanup, schema transforms, and format-specific export preparation.
104+
105+
## Current limits
106+
107+
- Stages are linear. DAGs, parallel branches, and joins are planned separately.
108+
- Stage-level resume is not implemented yet.
109+
- `push_to_hub()` does not support selected processor or callback outputs yet. Use `export()` for the selected workflow output.
110+
- `on_success` callbacks are trusted user code. If a callback returns a path, Data Designer reads that path as the next stage input.
111+
- The artifact layout is intended for inspection, but it is not yet a stable public contract.

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ nav:
2020
- Custom Columns: concepts/custom_columns.md
2121
- Validators: concepts/validators.md
2222
- Processors: concepts/processors.md
23+
- Workflow Chaining: concepts/workflow-chaining.md
2324
- Person Sampling: concepts/person_sampling.md
2425
- Traces: concepts/traces.md
2526
- Tool Use & MCP:

packages/data-designer-config/src/data_designer/config/data_designer_config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class DataDesignerConfig(ExportableConfigBase):
2828
2929
Attributes:
3030
columns: Required list of column configurations defining how each column
31-
should be generated. Must contain at least one column.
31+
should be generated. May be empty for seeded processor-only configs.
3232
model_configs: Optional list of model configurations for LLM-based generation.
3333
Each model config defines the model, provider, and inference parameters.
3434
tool_configs: Optional list of tool configurations for MCP tool calling.
@@ -39,7 +39,7 @@ class DataDesignerConfig(ExportableConfigBase):
3939
processors: Optional list of processor configurations for post-generation transformations.
4040
"""
4141

42-
columns: list[Annotated[ColumnConfigT, Field(discriminator="column_type")]] = Field(min_length=1)
42+
columns: list[Annotated[ColumnConfigT, Field(discriminator="column_type")]]
4343
model_configs: list[ModelConfig] | None = None
4444
tool_configs: list[ToolConfig] | None = None
4545
seed_config: SeedConfig | None = None

packages/data-designer-config/src/data_designer/config/preview_results.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults
99
from data_designer.config.config_builder import DataDesignerConfigBuilder
1010
from data_designer.config.dataset_metadata import DatasetMetadata
11+
from data_designer.config.seed_source_dataframe import DataFrameSeedSource
1112
from data_designer.config.utils.visualization import WithRecordSamplerMixin
1213

1314
if TYPE_CHECKING:
@@ -41,3 +42,18 @@ def __init__(
4142
self.dataset_metadata: DatasetMetadata | None = dataset_metadata
4243
self.task_traces: list[Any] | None = task_traces
4344
self._config_builder = config_builder
45+
46+
def to_config_builder(self, columns: list[str] | None = None) -> DataDesignerConfigBuilder:
47+
"""Create a new config builder seeded from this preview dataset.
48+
49+
Copies the full preview dataset in memory; intended for interactive use.
50+
"""
51+
if self.dataset is None:
52+
raise ValueError("Preview dataset is not available.")
53+
df = self.dataset
54+
if columns is not None:
55+
df = df.loc[:, columns]
56+
return DataDesignerConfigBuilder(
57+
model_configs=self._config_builder.model_configs,
58+
tool_configs=self._config_builder.tool_configs,
59+
).with_seed_dataset(DataFrameSeedSource(df=df.copy()))

packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ def __init__(
163163
def artifact_storage(self) -> ArtifactStorage:
164164
return self._resource_provider.artifact_storage
165165

166+
@property
167+
def data_designer_config(self) -> DataDesignerConfig:
168+
return self._data_designer_config
169+
166170
@property
167171
def processors(self) -> tuple[Processor, ...]:
168172
return self._processor_runner.processors

packages/data-designer-engine/src/data_designer/engine/models/factory.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
if TYPE_CHECKING:
1414
from data_designer.config.run_config import RunConfig
1515
from data_designer.engine.mcp.registry import MCPRegistry
16+
from data_designer.engine.models.clients.throttle_manager import ThrottleManager
1617
from data_designer.engine.models.registry import ModelRegistry
1718

1819

@@ -24,6 +25,7 @@ def create_model_registry(
2425
mcp_registry: MCPRegistry | None = None,
2526
client_concurrency_mode: ClientConcurrencyMode = ClientConcurrencyMode.SYNC,
2627
run_config: RunConfig | None = None,
28+
throttle_manager: ThrottleManager | None = None,
2729
) -> ModelRegistry:
2830
"""Factory function for creating a ModelRegistry instance.
2931
@@ -43,6 +45,8 @@ def create_model_registry(
4345
run_config: Optional runtime configuration. The nested
4446
``run_config.throttle`` (a ``ThrottleConfig``) is forwarded to the
4547
``ThrottleManager`` constructor.
48+
throttle_manager: Optional shared throttle manager. When omitted, a new
49+
manager is created for this registry.
4650
4751
Returns:
4852
A configured ModelRegistry instance.
@@ -54,7 +58,8 @@ def create_model_registry(
5458
from data_designer.engine.models.facade import ModelFacade
5559
from data_designer.engine.models.registry import ModelRegistry
5660

57-
throttle_manager = ThrottleManager((run_config or RunConfig()).throttle)
61+
if throttle_manager is None:
62+
throttle_manager = ThrottleManager((run_config or RunConfig()).throttle)
5863

5964
def model_facade_factory(
6065
model_config: ModelConfig,

packages/data-designer-engine/src/data_designer/engine/resources/resource_provider.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from __future__ import annotations
55

66
import os
7+
from typing import TYPE_CHECKING
78

89
from data_designer.config.base import ConfigBase
910
from data_designer.config.dataset_metadata import DatasetMetadata
@@ -26,6 +27,9 @@
2627
from data_designer.engine.secret_resolver import SecretResolver
2728
from data_designer.engine.storage.artifact_storage import ArtifactStorage
2829

30+
if TYPE_CHECKING:
31+
from data_designer.engine.models.clients.throttle_manager import ThrottleManager
32+
2933

3034
class ResourceType(StrEnum):
3135
PERSON_READER = "person_reader"
@@ -91,6 +95,7 @@ def create_resource_provider(
9195
mcp_providers: list[MCPProviderT] | None = None,
9296
tool_configs: list[ToolConfig] | None = None,
9397
client_concurrency_mode: ClientConcurrencyMode | None = None,
98+
throttle_manager: ThrottleManager | None = None,
9499
) -> ResourceProvider:
95100
"""Factory function for creating a ResourceProvider instance.
96101
@@ -111,6 +116,7 @@ def create_resource_provider(
111116
run_config: Optional runtime configuration.
112117
mcp_providers: Optional list of MCP provider configurations.
113118
tool_configs: Optional list of tool configurations.
119+
throttle_manager: Optional shared throttle manager for model clients.
114120
115121
Returns:
116122
A configured ResourceProvider instance.
@@ -158,6 +164,7 @@ def create_resource_provider(
158164
mcp_registry=mcp_registry,
159165
client_concurrency_mode=client_concurrency_mode,
160166
run_config=effective_run_config,
167+
throttle_manager=throttle_manager,
161168
),
162169
person_reader=person_reader,
163170
mcp_registry=mcp_registry,

packages/data-designer-engine/src/data_designer/engine/resources/seed_reader.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import logging
77
from abc import ABC, abstractmethod
88
from collections.abc import Callable, Iterable, Sequence
9+
from copy import copy
910
from dataclasses import dataclass
1011
from fnmatch import fnmatchcase
1112
from pathlib import Path, PurePosixPath
@@ -673,7 +674,9 @@ def add_reader(self, reader: SeedReader) -> Self:
673674
return self
674675

675676
def get_reader(self, seed_dataset_source: SeedSource, secret_resolver: SecretResolver) -> SeedReader:
676-
reader = self._get_reader_for_source(seed_dataset_source)
677+
# attach() mutates top-level source/resolver state. Reader subclasses must
678+
# not keep nested mutable state shared across attaches.
679+
reader = copy(self._get_reader_for_source(seed_dataset_source))
677680
reader.attach(seed_dataset_source, secret_resolver)
678681
return reader
679682

0 commit comments

Comments
 (0)