Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ Data Designer helps you create synthetic datasets that go beyond simple LLM prom

---

### πŸ“£ Heads-up: async engine is now the default
### πŸ“£ Heads-up: async engine

Data Designer now runs pipelines on a cell-level async engine that overlaps independent columns and adapts concurrency per (provider, model). On most pipelines this is faster with no config changes; on slow self-hosted endpoints, set `inference_parameters.timeout` to your real per-request latency. See [Architecture & Performance β†’ Async Engine](https://docs.nvidia.com/nemo/datadesigner/concepts/architecture-performance#async-engine) for the behaviors worth knowing about.

If you hit anything unexpected, fall back to the legacy sync engine for one transitional release with `DATA_DESIGNER_ASYNC_ENGINE=0`, and please [open an issue](https://github.com/NVIDIA-NeMo/DataDesigner/issues/new) so we can fix the async path.
If you hit anything unexpected, please [open an issue](https://github.com/NVIDIA-NeMo/DataDesigner/issues/new).

---

Expand Down
61 changes: 18 additions & 43 deletions architecture/dataset-builders.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,21 @@
# Dataset Builders

The dataset builder subsystem orchestrates the end-to-end generation of a dataset from compiled column configs. It supports two execution modes: a sequential batch loop and an async DAG-based scheduler.
The dataset builder subsystem orchestrates the end-to-end generation of a dataset from compiled column configs using an async DAG-based scheduler.

Source: `packages/data-designer-engine/src/data_designer/engine/dataset_builders/`

## Overview

`DatasetBuilder` is the central orchestrator. It receives a compiled `DataDesignerConfig`, instantiates column generators from the registry, and executes them in dependency order. The execution mode is selected by the `DATA_DESIGNER_ASYNC_ENGINE` environment variable.
`DatasetBuilder` is the central orchestrator. It receives a compiled `DataDesignerConfig`, instantiates column generators from the registry, and executes them through `AsyncTaskScheduler`.

Both modes produce the same output: batched parquet files managed by `DatasetBatchManager`, with post-generation processing and profiling.
The scheduler produces row-group parquet files managed by `RowGroupBufferManager`, with post-generation processing and profiling.

## Key Components

### DatasetBuilder

Entry point for generation. `build()` branches:
- **Sequential path** (default): `DatasetBatchManager.start` β†’ batch loop β†’ `_run_batch` per batch β†’ `finish()` β†’ `ProcessorRunner.run_after_generation` β†’ `model_registry.log_model_usage`
- **Async path** (`DATA_DESIGNER_ASYNC_ENGINE=1`): `_prepare_async_run` β†’ `AsyncTaskScheduler.run()` β†’ telemetry and metadata

### Sequential Execution (`_run_batch`)

Iterates compiled column order. For each generator:
1. `log_pre_generation()` β€” logs model and optional MCP tool alias
2. **From-scratch generators** (empty buffer): `generate_from_scratch` β†’ optional `run_pre_batch` after first seed column
3. **`CELL_BY_CELL` generators**: `_fan_out_with_threads` or `_fan_out_with_async` β€” parallel cell generation
4. **`FULL_COLUMN` generators**: `generate` on the whole batch DataFrame; output row count must match input row count
Entry point for generation. `build()` runs:
- `_prepare_async_run` β†’ `AsyncTaskScheduler.run()` β†’ telemetry and metadata

### Async Execution (`_build_async`)

Expand All @@ -44,7 +35,7 @@ Preparation (`_prepare_async_run`):
- `GenerationStrategy` per column (CELL_BY_CELL or FULL_COLUMN)
- Kahn topological sort for execution order
- `split_upstream_by_strategy` β€” separates batch-level from cell-level dependencies
- Skip metadata per column β€” `get_skip_config`, `should_propagate_skip`, `get_required_columns`, and `get_side_effect_columns` β€” queried at runtime by both engines to evaluate skip decisions
- Skip metadata per column β€” `get_skip_config`, `should_propagate_skip`, `get_required_columns`, and `get_side_effect_columns` β€” queried at runtime to evaluate skip decisions

### CompletionTracker

Expand All @@ -64,21 +55,18 @@ Columns can be conditionally skipped per-row via `SkipConfig` (defined in `data_
Skip evaluation is handled by two utility modules:

- **`skip_evaluator.py`** β€” `evaluate_skip_when` renders the expression in a `NativeSandboxedEnvironment` (native Python types, `StrictUndefined`). `should_skip_by_propagation` checks set intersection between required columns and skipped columns.
- **`skip_tracker.py`** β€” manages the `__internal_skipped_columns` metadata key on record dicts. Each record carries a `__internal_skipped_columns` set listing which columns were skipped for that row. `apply_skip_to_record` adds the column name to that set, writes the skip value into the cell, and clears any side-effect columns. `strip_skip_metadata_from_records` removes the `__internal_skipped_columns` key before DataFrame construction so it never reaches parquet (called by `DatasetBatchManager`, `RowGroupBufferManager`, and inline in both engines).
- **`skip_tracker.py`** β€” manages the `__internal_skipped_columns` metadata key on record dicts. Each record carries a `__internal_skipped_columns` set listing which columns were skipped for that row. `apply_skip_to_record` adds the column name to that set, writes the skip value into the cell, and clears any side-effect columns. `strip_skip_metadata_from_records` removes the `__internal_skipped_columns` key before DataFrame construction so it never reaches parquet.

Both execution modes integrate skip at the same points:
`_run_cell` and `_run_batch` in `AsyncTaskScheduler` call `_should_skip_record` / `_apply_skip_to_record`. Skipped cells report as skipped (not success) in progress tracking.

- **Sequential**: `_run_full_column_generator` and the fan-out methods (`_fan_out_with_threads`, `_fan_out_with_async`) call `_should_skip_cell` per record. Skipped rows are excluded from the generator input, then merged back with skip metadata preserved. A fast `_column_can_skip` check short-circuits the per-record evaluation when no skip config or propagation applies.
- **Async**: `_run_cell` and `_run_batch` in `AsyncTaskScheduler` call `_should_skip_record` / `_apply_skip_to_record` with the same logic. Skipped cells report as skipped (not success) in progress tracking.
DAG edges are added for `skip.when` column references in both `topologically_sort_column_configs` (compile-time sort) and `ExecutionGraph.create` (runtime graph) so skip-gate columns are generated before the gated column.

DAG edges are added for `skip.when` column references in both `topologically_sort_column_configs` (compile-time sort) and `ExecutionGraph.create` (async runtime) so skip-gate columns are generated before the gated column.
### RowGroupBufferManager

### DatasetBatchManager

Manages in-memory row buffers and persistence:
- `finish_batch` β†’ writes parquet via `ArtifactStorage`
- Updates dataset metadata between batches
- The async path uses `RowGroupBufferManager` for per-row-group DataFrames and checkpointing
Manages per-row-group DataFrames and persistence:
- `checkpoint_row_group` β†’ writes parquet via `ArtifactStorage`
- Updates dataset metadata between row groups
- Tracks dropped rows and actual record counts for resume

### Resume Checkpointing

Expand All @@ -90,7 +78,7 @@ Manages in-memory row buffers and persistence:

Checkpoint state lives in `metadata.json`. Each metadata write includes the config fingerprint (`config_hash`, `config_hash_algo`, and `config_hash_version`) so compatibility checks do not need to deserialize `builder_config.json` for the common path. `builder_config.json` remains the human-readable record of the run configuration and the fallback for older datasets.

Both engines resume the same way: they scan `parquet-files/batch_*.parquet` and read parquet metadata to recover the completed row-group IDs and their actual persisted row counts. `metadata.json` remains the source of truth for the run *configuration* (`buffer_size`, `target_num_records`, `original_target_num_records`, config fingerprint), but the filesystem is the source of truth for *progress* (`num_completed_batches`, `actual_num_records`). Splitting the two sources is what lets resume survive a crash between writing a batch parquet and updating metadata β€” the filesystem reflects the durable state even when metadata lags by a step. Reading actual row counts also matters for async early-shutdown salvage, where a completed parquet file can contain fewer rows than the requested row-group size. The async engine tolerates non-contiguous IDs because row groups can complete out of order; the sync engine writes batches sequentially and rejects holes (likely external mutation or a directory written by an incompatible engine).
Resume scans `parquet-files/batch_*.parquet` and reads parquet metadata to recover the completed row-group IDs and their actual persisted row counts. `metadata.json` remains the source of truth for the run *configuration* (`buffer_size`, `target_num_records`, `original_target_num_records`, config fingerprint), but the filesystem is the source of truth for *progress* (`num_completed_batches`, `actual_num_records`). Splitting the two sources is what lets resume survive a crash between writing a row-group parquet and updating metadata - the filesystem reflects the durable state even when metadata lags by a step. Reading actual row counts also matters for early-shutdown salvage, where a completed parquet file can contain fewer rows than the requested row-group size. Resume tolerates non-contiguous IDs because row groups can complete out of order.

Resume relies on stable row-group boundaries within a run. It treats datasets that have completed `process_after_generation()` as terminal: after-generation processors operate on the whole dataset and can re-chunk rows or change schema, invalidating row-group identity for later resume/extension. The terminal-state check raises a clear `DatasetGenerationError` (not a `TypeError`) when the persisted metadata is missing required fields such as `target_num_records`.

Expand All @@ -102,19 +90,6 @@ Metadata writes are atomic (`tmp` file + `fsync` + `os.replace`) because `metada

## Data Flow

### Sequential
```
DatasetBuilder.build()
β†’ DatasetBatchManager.start()
β†’ for each batch:
β†’ for each generator (topological order):
β†’ generate_from_scratch / generate (FULL_COLUMN) / fan_out (CELL_BY_CELL)
β†’ DatasetBatchManager.finish_batch() β†’ parquet
β†’ ProcessorRunner.run_after_generation()
β†’ model_registry.log_model_usage()
```

### Async
```
DatasetBuilder.build()
β†’ _build_async()
Expand All @@ -136,15 +111,15 @@ When request admission is available, async scheduling may use request-pressure s

## Design Decisions

- **Dual execution engines behind one API.** The sequential engine is simpler and easier to debug; the async engine adds row-group parallelism for throughput. Users switch via an environment variable without changing their code.
- **One execution engine behind the API.** The async scheduler handles row-group parallelism, DAG-aware dispatch, resume, and checkpointing for all generation runs.
- **DAG-driven ordering** ensures columns with dependencies (e.g., a judge column that depends on a text column) are generated in the correct order, regardless of the order they appear in the config.
- **Fair async admission with bounded borrow by default** keeps the scheduler flowing across ready columns and model groups. `FairTaskQueue.select_next(...)` chooses eligible ready work, `TaskAdmissionController` leases scheduler resources before spawn, and `FairTaskQueue.commit(...)` removes the selected task only after admission succeeds. The default `BoundedBorrowTaskAdmissionPolicyConfig` computes a strict per-group share, lets solo groups borrow only up to a capacity-derived reserve, and makes borrowed groups yield when eligible peer pressure appears. Passing `bounded_borrow=None` selects strict-fair admission for tests and benchmark comparisons. Per-group virtual-time ordering prevents a large ready frontier from degenerating into a column-by-column wave, and scheduler-resource accounting remains separate from provider/model request admission.
- **Salvage rounds in async mode** retry failed tasks after all other tasks in a round complete, improving resilience against transient LLM failures without blocking the entire generation.
- **Salvage rounds** retry failed tasks after all other tasks in a round complete, improving resilience against transient LLM failures without blocking the entire generation.
- **Unified DAG construction.** `topologically_sort_column_configs` (in `execution_graph.py`) determines column ordering using Kahn's algorithm; the runtime `ExecutionGraph` adds strategy-aware dependency tracking for the async scheduler.

## Cross-References

- [System Architecture](overview.md) β€” end-to-end data flow
- [System Architecture](overview.md) - end-to-end data flow
- [Engine Layer](engine.md) β€” compilation and generator hierarchy
- [Models](models.md) β€” how generators access LLMs
- [Config Layer](config.md) β€” column configs and dependency declarations
10 changes: 4 additions & 6 deletions architecture/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Users declare what their data should look like through config objects (columns,
|-----------|---------|-------------|
| `DataDesigner` | `data-designer` | Public API β€” `create()`, `preview()`, `validate()` |
| `DataDesignerConfigBuilder` | `data-designer-config` | Fluent builder for dataset configs |
| `DatasetBuilder` | `data-designer-engine` | Orchestrates generation (sync or async) |
| `DatasetBuilder` | `data-designer-engine` | Orchestrates async generation |
| `ModelFacade` / `ModelRegistry` | `data-designer-engine` | LLM client abstraction with retry, request admission, usage tracking |
| `MCPFacade` / `MCPRegistry` | `data-designer-engine` | Tool execution via Model Context Protocol |
| `ColumnGeneratorRegistry` | `data-designer-engine` | Maps column types to generator implementations |
Expand All @@ -42,9 +42,7 @@ Users declare what their data should look like through config objects (columns,

2. **Compilation** β€” `compile_data_designer_config` enriches the config (seed columns, internal UUID column), runs static validation (Jinja references, code columns, processors), and produces a compiled column order via topological sort.

3. **Generation** β€” `DatasetBuilder` instantiates column generators from the registry, then executes one of two paths:
- **Sequential** (default): batch loop over columns in topological order. Each generator produces its column via `CELL_BY_CELL` (threaded fan-out) or `FULL_COLUMN` strategy.
- **Async** (`DATA_DESIGNER_ASYNC_ENGINE=1`): builds an `ExecutionGraph`, partitions rows into groups, and dispatches tasks via `AsyncTaskScheduler` with `FairTaskQueue` selection, `TaskAdmissionController` scheduler-resource leases, salvage rounds, and per-row-group checkpointing.
3. **Generation** β€” `DatasetBuilder` instantiates column generators from the registry, builds an `ExecutionGraph`, partitions rows into groups, and dispatches tasks via `AsyncTaskScheduler` with `FairTaskQueue` selection, `TaskAdmissionController` scheduler-resource leases, salvage rounds, and per-row-group checkpointing.

4. **Post-processing** β€” `ProcessorRunner` applies transformations (pre-batch, post-batch, after-generation). Profilers analyze the generated dataset.

Expand All @@ -54,15 +52,15 @@ Users declare what their data should look like through config objects (columns,

- **PEP 420 namespace packages** allow the three packages to be installed independently while sharing the `data_designer` namespace. This enables lighter installs (e.g., config-only for validation tooling) without import conflicts.
- **Lazy imports throughout** β€” `__getattr__`-based lazy loading in `data_designer.config` and `data_designer.interface`, plus `lazy_heavy_imports` for numpy/pandas, keep startup fast.
- **Dual execution engines** share the same `DatasetBuilder` API. The async engine adds row-group parallelism and DAG-aware scheduling without changing the public interface.
- **Async-only execution** gives `DatasetBuilder` one scheduling path with row-group parallelism and DAG-aware dispatch behind the public interface.
- **`TaskRegistry` subclasses: one instance per class** β€” `TaskRegistry.__new__` (`registry/base.py`) ensures a single instance of each concrete registry (column generators, profilers, processors). **`ModelRegistry`** and **`MCPRegistry`** are ordinary classes, constructed per run with injected dependencies. **`PluginRegistry`** (`plugins/registry.py`) uses `__new__` so entry points are discovered once per process.

## Cross-References

- [Config Layer](config.md) β€” builder API, column types, model configs, plugin system
- [Engine Layer](engine.md) β€” compilation, generators, registries
- [Models](models.md) β€” model facade, adapters, retry, request admission
- [Dataset Builders](dataset-builders.md) β€” sync/async orchestration, DAG, batching
- [Dataset Builders](dataset-builders.md) β€” async orchestration, DAG, row groups
- [MCP](mcp.md) β€” tool execution, session pooling
- [Sampling](sampling.md) β€” statistical generators, person/entity data
- [CLI](cli.md) β€” command structure, controller/service/repo pattern
Expand Down
Loading
Loading