2 changes: 1 addition & 1 deletion architecture/config.md
@@ -28,7 +28,7 @@ The config layer provides:

### Column Configs

All column configs inherit from `SingleColumnConfig(ConfigBase, ABC)`, which provides `name`, `drop`, `allow_resize`, and the `column_type` discriminator field.
All column configs inherit from `SingleColumnConfig(ConfigBase, ABC)`, which provides `name`, `drop`, `skip`, `propagate_skip`, and the `column_type` discriminator field.

Concrete types include: `SamplerColumnConfig`, `LLMTextColumnConfig`, `LLMStructuredColumnConfig`, `LLMCodeColumnConfig`, `LLMJudgeColumnConfig`, `EmbeddingColumnConfig`, `ImageColumnConfig`, `ValidationColumnConfig`, `ExpressionColumnConfig`, `SeedDatasetColumnConfig`, `CustomColumnConfig`.

4 changes: 2 additions & 2 deletions architecture/dataset-builders.md
@@ -24,7 +24,7 @@ Iterates compiled column order. For each generator:
1. `log_pre_generation()`: logs model and optional MCP tool alias
2. **From-scratch generators** (empty buffer): `generate_from_scratch` → optional `run_pre_batch` after first seed column
3. **`CELL_BY_CELL` generators**: `_fan_out_with_threads` or `_fan_out_with_async` for parallel cell generation
4. **`FULL_COLUMN` generators**: `generate` on the whole batch DataFrame; optional resize via `allow_resize`
4. **`FULL_COLUMN` generators**: `generate` on the whole batch DataFrame; output row count must match input row count
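
The row-count rule for `FULL_COLUMN` generators can be pictured as a guard around the `generate` call. The sketch below is only an illustration under stated assumptions, not the engine's code: `run_full_column`, `RowCountError`, and the list-of-dicts `Batch` (standing in for the batch DataFrame) are hypothetical names.

```python
from typing import Callable

Batch = list[dict]  # stand-in for the batch DataFrame (assumption)


class RowCountError(ValueError):
    """Hypothetical error for a generator that changes the row count."""


def run_full_column(generate: Callable[[Batch], Batch], batch: Batch) -> Batch:
    """Call a whole-batch generator and enforce row-count invariance."""
    result = generate(batch)
    if len(result) != len(batch):
        raise RowCountError(
            f"generator returned {len(result)} rows for {len(batch)} input rows"
        )
    return result


def add_length(batch: Batch) -> Batch:
    # Well-behaved generator: one output row per input row.
    return [{**row, "length": len(row["text"])} for row in batch]


def drop_short(batch: Batch) -> Batch:
    # Misbehaving generator: filters rows, so the count shrinks.
    return [row for row in batch if len(row["text"]) > 2]


batch = [{"text": "hi"}, {"text": "hello"}]
checked = run_full_column(add_length, batch)
```

Passing `drop_short` to `run_full_column` raises the hypothetical `RowCountError`, which is the behavior the new wording describes: filtering belongs outside the generator.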

### Async Execution (`_build_async`)

@@ -92,7 +92,7 @@ Checkpoint state lives in `metadata.json`. Each metadata write includes the conf

Both engines resume the same way: they scan `parquet-files/batch_*.parquet` and read parquet metadata to recover the completed row-group IDs and their actual persisted row counts. `metadata.json` remains the source of truth for the run *configuration* (`buffer_size`, `target_num_records`, `original_target_num_records`, config fingerprint), but the filesystem is the source of truth for *progress* (`num_completed_batches`, `actual_num_records`). Splitting the two sources is what lets resume survive a crash between writing a batch parquet and updating metadata: the filesystem reflects the durable state even when metadata lags by a step. Reading actual row counts also matters for async early-shutdown salvage, where a completed parquet file can contain fewer rows than the requested row-group size. The async engine tolerates non-contiguous IDs because row groups can complete out of order; the sync engine writes batches sequentially and rejects holes (likely external mutation or a directory written by an incompatible engine).
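
A rough sketch of the filesystem-as-progress idea follows. It makes several assumptions that the text above does not confirm: IDs are parsed from file names (the real engines read them from parquet metadata), IDs start at 0, and the helper names `completed_batch_ids`, `find_holes`, and `validate_for_resume` are hypothetical. Row counts are left out entirely.

```python
import re
import tempfile
from pathlib import Path

_BATCH_RE = re.compile(r"batch_(\d+)\.parquet$")


def completed_batch_ids(parquet_dir: Path) -> list[int]:
    """Collect batch IDs from file names on disk (assumed naming scheme)."""
    ids = []
    for path in parquet_dir.glob("batch_*.parquet"):
        match = _BATCH_RE.search(path.name)
        if match:
            ids.append(int(match.group(1)))
    return sorted(ids)


def find_holes(ids: list[int]) -> list[int]:
    """Return IDs missing below the highest completed ID."""
    return sorted(set(range(max(ids) + 1)) - set(ids)) if ids else []


def validate_for_resume(ids: list[int], *, allow_holes: bool) -> None:
    """Sync-style resume rejects holes; async-style resume tolerates them."""
    holes = find_holes(ids)
    if holes and not allow_holes:
        raise RuntimeError(f"non-contiguous batch IDs, missing: {holes}")


with tempfile.TemporaryDirectory() as tmp:
    parquet_dir = Path(tmp) / "parquet-files"
    parquet_dir.mkdir()
    for batch_id in (0, 1, 3):  # batch 2 never reached disk
        (parquet_dir / f"batch_{batch_id:05d}.parquet").touch()
    ids = completed_batch_ids(parquet_dir)
```

With this directory, `validate_for_resume(ids, allow_holes=True)` passes (the async case), while `allow_holes=False` raises (the sync case).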

Resume deliberately rejects `allow_resize=True` columns because resized batches mutate row boundaries and the original remaining batch plan cannot be reconstructed safely from aggregate counters. It also treats datasets that have completed `process_after_generation()` as terminal: after-generation processors operate on the whole dataset and can re-chunk rows or change schema, invalidating row-group identity for later resume/extension. The terminal-state check raises a clear `DatasetGenerationError` (not a `TypeError`) when the persisted metadata is missing required fields such as `target_num_records`.
Resume relies on stable row-group boundaries within a run. It treats datasets that have completed `process_after_generation()` as terminal: after-generation processors operate on the whole dataset and can re-chunk rows or change schema, invalidating row-group identity for later resume/extension. The terminal-state check raises a clear `DatasetGenerationError` (not a `TypeError`) when the persisted metadata is missing required fields such as `target_num_records`.

After-generation processors run unconditionally on the on-disk dataset whenever they are configured, including the case where resume sees every row group already on disk. This closes the crash window between the final row-group parquet write and the `post_generation_state="started"` marker write: in that window, the dataset is complete but post-generation never ran, and the on-disk parquet files are still clean (no processor has touched them). The `post_generation_state="started"` short-circuit still rejects the other direction (`process_after_generation()` crashed mid-rewrite, leaving the parquet files in an ambiguous state), so resume only re-runs after-generation when it is safe to do so.

45 changes: 23 additions & 22 deletions fern/versions/latest/pages/concepts/agent-rollout-ingestion.mdx
@@ -171,16 +171,13 @@ You can also explode imported rollouts into a tool-interaction dataset. This exa

```python
import data_designer.config as dd
import pandas as pd
from data_designer.interface import DataDesigner
from pydantic import BaseModel, Field
from typing import Literal


@dd.custom_column_generator(
    required_columns=["messages"],
    side_effect_columns=["tool_call", "tool_response", "tool_name"],
)
def explode_tool_interactions(row: dict) -> list[dict]:
def extract_tool_interactions(row: dict) -> list[dict]:
    rows = []
    tool_calls_by_id = {}
    context_messages = []
@@ -229,7 +226,23 @@ class ToolInteractionAnalysis(BaseModel):


data_designer = DataDesigner()
config_builder = dd.DataDesignerConfigBuilder(
rollout_config = dd.DataDesignerConfigBuilder()
rollout_config.with_seed_dataset(
    dd.AgentRolloutSeedSource(
        format=dd.AgentRolloutFormat.CLAUDE_CODE,
    )
)

rollout_result = data_designer.create(rollout_config, num_records=100)
rollout_df = rollout_result.load_dataset()
tool_rows = [
    interaction
    for _, row in rollout_df.iterrows()
    for interaction in extract_tool_interactions(row.to_dict())
]
tool_df = pd.DataFrame(tool_rows)

analysis_config = dd.DataDesignerConfigBuilder(
    model_configs=[
        dd.ModelConfig(
            alias="tool-analyst",
@@ -239,21 +252,9 @@ config_builder = dd.DataDesignerConfigBuilder(
    ]
)

config_builder.with_seed_dataset(
    dd.AgentRolloutSeedSource(
        format=dd.AgentRolloutFormat.CLAUDE_CODE,
    )
)

config_builder.add_column(
    dd.CustomColumnConfig(
        name="tool_interaction_context",
        generator_function=explode_tool_interactions,
        allow_resize=True,
    )
)
analysis_config.with_seed_dataset(dd.DataFrameSeedSource(df=tool_df))

config_builder.add_column(
analysis_config.add_column(
    dd.LLMStructuredColumnConfig(
        name="tool_interaction_analysis",
        model_alias="tool-analyst",
@@ -279,11 +280,11 @@ Base your answer on the tool call arguments, the tool response, and the immediat
    )
)

preview = data_designer.preview(config_builder, num_records=5)
preview = data_designer.preview(analysis_config, num_records=5)
preview.display_sample_record()
```

This pattern is useful when you want to curate evaluator or monitoring datasets from real traces. The resize-enabled custom column turns each tool interaction into its own record, and the structured column adds a consistent outcome label plus a grounded summary. Because the logic operates on normalized `tool_calls` and `tool` messages, the same pattern transfers across supported rollout formats. If your traces are long, consider adding a second custom or expression column that windows the context before sending it to a model.
This pattern is useful when you want to curate evaluator or monitoring datasets from real traces. The stage-boundary transform turns each tool interaction into its own record, and the structured column adds a consistent outcome label plus a grounded summary. Because the logic operates on normalized `tool_calls` and `tool` messages, the same pattern transfers across supported rollout formats. If your traces are long, consider adding a second custom or expression column that windows the context before sending it to a model.

## Related Guides

@@ -185,7 +185,7 @@ Resume has a few important invariants:

- `buffer_size` must match the original run.
- `num_records` must be at least the original target; you may extend a run by requesting more records.
- Runs with `allow_resize=True` columns are not resumable because row boundaries can change.
- Row counts must stay stable within a run. Put filtering, expansion, aggregation, or deduplication at workflow boundaries.
- Once `process_after_generation()` has run, the dataset is considered terminal for resume. Re-running with the same target returns the existing dataset; extending requires a fresh run.
- If a run crashed after every row group was written but before `process_after_generation()` could start, resume runs after-generation on the existing on-disk dataset (the parquet files are still clean) and marks it terminal afterwards. A crash _during_ `process_after_generation()` still raises: the parquet files may have been partially rewritten, and starting fresh is the only safe option.
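
The invariants above can be read as a small decision procedure. The following is a hedged sketch, not the library's resume logic: `SavedRun`, `plan_resume`, the returned action strings, and the `"completed"` state value are all assumptions made for illustration (only the `"started"` marker appears in this PR), and the row-count invariant is not modeled.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SavedRun:
    """Hypothetical view of the persisted run metadata."""

    buffer_size: int
    target_num_records: int
    # None = after-generation never started; "completed" is an assumed value.
    post_generation_state: Optional[str] = None


def plan_resume(saved: SavedRun, buffer_size: int, num_records: int) -> str:
    """Return the action a resume would take, or raise if resume is unsafe."""
    if buffer_size != saved.buffer_size:
        raise ValueError("buffer_size must match the original run")
    if num_records < saved.target_num_records:
        raise ValueError("num_records must be at least the original target")
    if saved.post_generation_state == "started":
        # Crash during after-generation: parquet files may be half rewritten.
        raise RuntimeError("dataset is in an ambiguous state; start a fresh run")
    if saved.post_generation_state == "completed":
        if num_records > saved.target_num_records:
            raise RuntimeError("terminal dataset cannot be extended; start a fresh run")
        return "return-existing-dataset"
    # After-generation never started: any remaining row groups are generated,
    # then after-generation runs on the still-clean on-disk files.
    return "continue-generation"
```

For example, `plan_resume(SavedRun(100, 1000), buffer_size=100, num_records=2000)` extends the run, while the same call against a run whose state is `"started"` raises.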

49 changes: 4 additions & 45 deletions fern/versions/latest/pages/concepts/custom_columns.mdx
@@ -100,58 +100,17 @@ This gives you direct access to all `ModelFacade` capabilities: custom parsers,
| `generator_function` | Callable | Yes | Decorated function |
| `generation_strategy` | GenerationStrategy | No | `CELL_BY_CELL` or `FULL_COLUMN` |
| `generator_params` | BaseModel | No | Typed params passed to function |
| `allow_resize` | bool | No | Allow 1:N or N:1 generation |

### Resizing (1:N and N:1)

**FULL_COLUMN:** Set `allow_resize=True` and return a DataFrame with more or fewer rows than the input:
Custom column generators must preserve row count. A `CELL_BY_CELL` generator returns one `dict` per input row, and a `FULL_COLUMN` generator returns a DataFrame with the same number of rows it received.

```python
@dd.custom_column_generator(
    required_columns=["topic"],
    side_effect_columns=["variation_id"],
)
def expand_topics(df: pd.DataFrame, params: None, models: dict) -> pd.DataFrame:
    rows = []
    for _, row in df.iterrows():
        for i in range(3):  # Generate 3 variations per input
            rows.append({
                "topic": row["topic"],
                "question": f"Question {i+1} about {row['topic']}",
                "variation_id": i,
            })
    return pd.DataFrame(rows)

dd.CustomColumnConfig(
    name="question",
    generator_function=expand_topics,
    generation_strategy=dd.GenerationStrategy.FULL_COLUMN,
    allow_resize=True,
)
```

**CELL_BY_CELL:** With `allow_resize=True`, your function may return a single row (`dict`) or multiple rows (`list[dict]`). Return `[]` to drop that input row.

```python
@dd.custom_column_generator(required_columns=["id"])
def expand_row(row: dict) -> list[dict]:
    return [
        {**row, "variant": "a"},
        {**row, "variant": "b"},
    ]

dd.CustomColumnConfig(
    name="variant",
    generator_function=expand_row,
    generation_strategy=dd.GenerationStrategy.CELL_BY_CELL,
    allow_resize=True,
)
```
For expansion, filtering, aggregation, or deduplication, put the row-count-changing work at a workflow boundary. Use [Workflow Chaining](/concepts/workflow-chaining) to run one stage, transform that stage's output, and seed the next stage from the transformed rows.

Use cases:

- **Expansion (1:N)**: Generate multiple variations per input
- **Retraction (N:1)**: Filter, aggregate, or deduplicate records (FULL_COLUMN) or return `[]` per row (CELL_BY_CELL)
- **Expansion (1:N)**: Generate multiple variations per input between workflow stages
- **Retraction (N:1)**: Filter, aggregate, or deduplicate records between workflow stages
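
As a rough illustration of moving row-count changes to a workflow boundary, the sketch below deduplicates and then expands the rows produced by one stage before they seed the next. It uses plain lists of dicts so it runs without Data Designer; `drop_duplicates`, `expand_topics`, and the sample rows are hypothetical.

```python
def expand_topics(rows: list[dict], variations: int = 3) -> list[dict]:
    """1:N expansion applied between stages, outside any column generator."""
    return [
        {**row, "variation_id": i}
        for row in rows
        for i in range(variations)
    ]


def drop_duplicates(rows: list[dict], key: str) -> list[dict]:
    """N:1 retraction: keep the first row seen for each key value."""
    seen = set()
    kept = []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            kept.append(row)
    return kept


# Output of a first stage (made-up rows for the example).
stage_one_output = [{"topic": "tides"}, {"topic": "tides"}, {"topic": "comets"}]

# The row count changes here, between stages, so each stage stays 1:1.
unique_rows = drop_duplicates(stage_one_output, key="topic")
stage_two_seed = expand_topics(unique_rows, variations=2)
```

In a real workflow the transformed rows would be loaded into a DataFrame and handed to the next stage as its seed dataset, as the rollout example in this PR does with `dd.DataFrameSeedSource(df=tool_df)`.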

## Multi-Turn Example

4 changes: 2 additions & 2 deletions fern/versions/latest/pages/concepts/processors.mdx
@@ -30,8 +30,8 @@ Processors can run at three stages, determined by which callback methods they im
Each batch carries the full dataset schema during generation. Post-batch schema changes such as column dropping only alter past batches, so all columns remain accessible to generators while building follow-up batches.
</Note>

<Warning title="Row-count changes under the async engine">
The async engine (default) enforces row-count invariance in `process_before_batch()` and `process_after_batch()`: a processor returning a different row count raises `DatasetGenerationError`. Run row-filtering or expansion logic in `process_after_generation()`, which operates on the final dataset and supports row-count changes. The legacy sync engine (opt-out via `DATA_DESIGNER_ASYNC_ENGINE=0`) is permissive about row-count changes at all stages.
<Warning title="Row-count changes">
Data Designer enforces row-count invariance in generation-time processors: `process_before_batch()` and `process_after_batch()`. Run row-filtering or expansion logic in `process_after_generation()`, which operates on the final dataset and supports row-count changes, or put the transform at a workflow boundary.
</Warning>

<Warning title="Resume after process_after_generation">
11 changes: 1 addition & 10 deletions fern/versions/latest/pages/plugins/example.mdx
@@ -84,16 +84,7 @@ class IndexMultiplierColumnConfig(SingleColumnConfig):
- `required_columns` lists any columns this generator depends on (empty if none)
- `side_effect_columns` lists any additional columns this generator produces beyond the primary column (empty if none)

**If your plugin can expand or retract the number of rows (1:N or N:1):** set `allow_resize=True` in the config class so the pipeline updates batch bookkeeping correctly. For example:

```python
class MyColumnConfig(SingleColumnConfig):
    column_type: Literal["my-plugin"] = "my-plugin"
    allow_resize: bool = True  # required when output row count can differ from input
    # ...
```

The default is `False`; only set it to `True` when your `generate` method can return more or fewer rows than it receives.
Column generator plugins must preserve row count: `generate()` returns one output row for each input row. If your plugin needs to expand or retract records, run it as a stage-boundary transform with [Workflow Chaining](/concepts/workflow-chaining) and feed the transformed rows into the next stage.

### Step 3: Create the implementation class

10 changes: 0 additions & 10 deletions packages/data-designer-config/src/data_designer/config/base.py
@@ -90,9 +90,6 @@ class SingleColumnConfig(ConfigBase, ABC):
name: Unique name of the column to be generated.
drop: If True, the column will be generated but removed from the final dataset.
Useful for intermediate columns that are dependencies for other columns.
allow_resize: If True, the generator may emit a different number of rows than
it received (1:N or N:1). Explicit ``skip`` gates are invalid on resize
columns, and upstream skip propagation is not applied to them.
column_type: Discriminator field that identifies the specific column type.
Subclasses must override this field to specify the column type with a `Literal` value.
skip: Optional expression gate for conditional generation.
@@ -102,7 +99,6 @@ class SingleColumnConfig(ConfigBase, ABC):

    name: str
    drop: bool = False
    allow_resize: bool = False
    column_type: str
    skip: SkipConfig | None = None
    propagate_skip: bool = Field(
@@ -122,12 +118,6 @@ def _validate_skip_scope(self) -> Self:
"Sampler/seed columns are collapsed into shared multi-column generators "
"and cannot be skipped individually."
)
if self.allow_resize:
raise ValueError(
"skip and allow_resize cannot be used together. "
"allow_resize changes buffer size during generation (1:N / N:1), which "
"breaks index-based skip tracking and merge-back."
)
self_refs = {self.name} | set(self.side_effect_columns)
if not self_refs.isdisjoint(self.skip.columns):
offending = self_refs & set(self.skip.columns)
@@ -34,7 +34,7 @@
if TYPE_CHECKING:
from data_designer.config.data_designer_config import DataDesignerConfig

CONFIG_HASH_VERSION = 1
CONFIG_HASH_VERSION = 2
CONFIG_HASH_ALGO = "sha256"


7 changes: 3 additions & 4 deletions packages/data-designer-config/tests/config/test_columns.py
@@ -700,10 +700,9 @@ def test_default_column_emoji_for_custom_column_type() -> None:
    assert StubColumnConfig.get_column_emoji() == "🎨"


def test_allow_resize_inherited_by_subclasses() -> None:
    """Subclasses inherit allow_resize from SingleColumnConfig."""
    assert StubColumnConfig(name="test").allow_resize is False
    assert StubColumnConfig(name="test", allow_resize=True).allow_resize is True
def test_removed_allow_resize_field_rejected() -> None:
    with pytest.raises(ValidationError, match="Extra inputs are not permitted"):
        StubColumnConfig(name="test", allow_resize=True)


def test_get_model_aliases_empty_when_no_model_alias_field() -> None:
@@ -86,15 +86,6 @@ def test_skip_rejected_on_seed_dataset_type() -> None:
)


def test_skip_rejected_with_allow_resize() -> None:
    with pytest.raises(ValidationError, match="skip and allow_resize cannot be used together"):
        LLMTextColumnConfig(
            **_BASE_LLM,
            allow_resize=True,
            skip=SkipConfig(when="{{ x == 0 }}"),
        )


def test_skip_self_reference_rejected() -> None:
    with pytest.raises(ValidationError, match="references itself"):
        LLMTextColumnConfig(
@@ -363,6 +363,5 @@ def generate(self, data: pd.DataFrame) -> pd.DataFrame:

Returns:
DataFrame containing the input columns plus the new column and any side-effect
columns. When ``config.allow_resize`` is ``False``, the row count must match
the input; when it is ``True``, the row count may change.
columns. The row count must match the input.
"""