Skip to content

Commit 231a656

Browse files
committed
refactor: remove sync engine
1 parent 1210979 commit 231a656

28 files changed

Lines changed: 186 additions & 2049 deletions

File tree

architecture/dataset-builders.md

Lines changed: 18 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,21 @@
11
# Dataset Builders
22

3-
The dataset builder subsystem orchestrates the end-to-end generation of a dataset from compiled column configs. It supports two execution modes: a sequential batch loop and an async DAG-based scheduler.
3+
The dataset builder subsystem orchestrates the end-to-end generation of a dataset from compiled column configs using an async DAG-based scheduler.
44

55
Source: `packages/data-designer-engine/src/data_designer/engine/dataset_builders/`
66

77
## Overview
88

9-
`DatasetBuilder` is the central orchestrator. It receives a compiled `DataDesignerConfig`, instantiates column generators from the registry, and executes them in dependency order. The execution mode is selected by the `DATA_DESIGNER_ASYNC_ENGINE` environment variable.
9+
`DatasetBuilder` is the central orchestrator. It receives a compiled `DataDesignerConfig`, instantiates column generators from the registry, and executes them through `AsyncTaskScheduler`.
1010

11-
Both modes produce the same output: batched parquet files managed by `DatasetBatchManager`, with post-generation processing and profiling.
11+
The scheduler produces row-group parquet files managed by `RowGroupBufferManager`, with post-generation processing and profiling.
1212

1313
## Key Components
1414

1515
### DatasetBuilder
1616

17-
Entry point for generation. `build()` branches:
18-
- **Sequential path** (default): `DatasetBatchManager.start` → batch loop → `_run_batch` per batch → `finish()``ProcessorRunner.run_after_generation``model_registry.log_model_usage`
19-
- **Async path** (`DATA_DESIGNER_ASYNC_ENGINE=1`): `_prepare_async_run``AsyncTaskScheduler.run()` → telemetry and metadata
20-
21-
### Sequential Execution (`_run_batch`)
22-
23-
Iterates compiled column order. For each generator:
24-
1. `log_pre_generation()` — logs model and optional MCP tool alias
25-
2. **From-scratch generators** (empty buffer): `generate_from_scratch` → optional `run_pre_batch` after first seed column
26-
3. **`CELL_BY_CELL` generators**: `_fan_out_with_threads` or `_fan_out_with_async` — parallel cell generation
27-
4. **`FULL_COLUMN` generators**: `generate` on the whole batch DataFrame; output row count must match input row count
17+
Entry point for generation. `build()` runs:
18+
- `_prepare_async_run``AsyncTaskScheduler.run()` → telemetry and metadata
2819

2920
### Async Execution (`_build_async`)
3021

@@ -44,7 +35,7 @@ Preparation (`_prepare_async_run`):
4435
- `GenerationStrategy` per column (CELL_BY_CELL or FULL_COLUMN)
4536
- Kahn topological sort for execution order
4637
- `split_upstream_by_strategy` — separates batch-level from cell-level dependencies
47-
- Skip metadata per column — `get_skip_config`, `should_propagate_skip`, `get_required_columns`, and `get_side_effect_columns` — queried at runtime by both engines to evaluate skip decisions
38+
- Skip metadata per column — `get_skip_config`, `should_propagate_skip`, `get_required_columns`, and `get_side_effect_columns` — queried at runtime to evaluate skip decisions
4839

4940
### CompletionTracker
5041

@@ -64,21 +55,18 @@ Columns can be conditionally skipped per-row via `SkipConfig` (defined in `data_
6455
Skip evaluation is handled by two utility modules:
6556

6657
- **`skip_evaluator.py`**`evaluate_skip_when` renders the expression in a `NativeSandboxedEnvironment` (native Python types, `StrictUndefined`). `should_skip_by_propagation` checks set intersection between required columns and skipped columns.
67-
- **`skip_tracker.py`** — manages the `__internal_skipped_columns` metadata key on record dicts. Each record carries a `__internal_skipped_columns` set listing which columns were skipped for that row. `apply_skip_to_record` adds the column name to that set, writes the skip value into the cell, and clears any side-effect columns. `strip_skip_metadata_from_records` removes the `__internal_skipped_columns` key before DataFrame construction so it never reaches parquet (called by `DatasetBatchManager`, `RowGroupBufferManager`, and inline in both engines).
58+
- **`skip_tracker.py`** — manages the `__internal_skipped_columns` metadata key on record dicts. Each record carries a `__internal_skipped_columns` set listing which columns were skipped for that row. `apply_skip_to_record` adds the column name to that set, writes the skip value into the cell, and clears any side-effect columns. `strip_skip_metadata_from_records` removes the `__internal_skipped_columns` key before DataFrame construction so it never reaches parquet.
6859

69-
Both execution modes integrate skip at the same points:
60+
`_run_cell` and `_run_batch` in `AsyncTaskScheduler` call `_should_skip_record` / `_apply_skip_to_record`. Skipped cells report as skipped (not success) in progress tracking.
7061

71-
- **Sequential**: `_run_full_column_generator` and the fan-out methods (`_fan_out_with_threads`, `_fan_out_with_async`) call `_should_skip_cell` per record. Skipped rows are excluded from the generator input, then merged back with skip metadata preserved. A fast `_column_can_skip` check short-circuits the per-record evaluation when no skip config or propagation applies.
72-
- **Async**: `_run_cell` and `_run_batch` in `AsyncTaskScheduler` call `_should_skip_record` / `_apply_skip_to_record` with the same logic. Skipped cells report as skipped (not success) in progress tracking.
62+
DAG edges are added for `skip.when` column references in both `topologically_sort_column_configs` (compile-time sort) and `ExecutionGraph.create` (runtime graph) so skip-gate columns are generated before the gated column.
7363

74-
DAG edges are added for `skip.when` column references in both `topologically_sort_column_configs` (compile-time sort) and `ExecutionGraph.create` (async runtime) so skip-gate columns are generated before the gated column.
64+
### RowGroupBufferManager
7565

76-
### DatasetBatchManager
77-
78-
Manages in-memory row buffers and persistence:
79-
- `finish_batch` → writes parquet via `ArtifactStorage`
80-
- Updates dataset metadata between batches
81-
- The async path uses `RowGroupBufferManager` for per-row-group DataFrames and checkpointing
66+
Manages per-row-group DataFrames and persistence:
67+
- `checkpoint_row_group` → writes parquet via `ArtifactStorage`
68+
- Updates dataset metadata between row groups
69+
- Tracks dropped rows and actual record counts for resume
8270

8371
### Resume Checkpointing
8472

@@ -90,7 +78,7 @@ Manages in-memory row buffers and persistence:
9078

9179
Checkpoint state lives in `metadata.json`. Each metadata write includes the config fingerprint (`config_hash`, `config_hash_algo`, and `config_hash_version`) so compatibility checks do not need to deserialize `builder_config.json` for the common path. `builder_config.json` remains the human-readable record of the run configuration and the fallback for older datasets.
9280

93-
Both engines resume the same way: they scan `parquet-files/batch_*.parquet` and read parquet metadata to recover the completed row-group IDs and their actual persisted row counts. `metadata.json` remains the source of truth for the run *configuration* (`buffer_size`, `target_num_records`, `original_target_num_records`, config fingerprint), but the filesystem is the source of truth for *progress* (`num_completed_batches`, `actual_num_records`). Splitting the two sources is what lets resume survive a crash between writing a batch parquet and updating metadata — the filesystem reflects the durable state even when metadata lags by a step. Reading actual row counts also matters for async early-shutdown salvage, where a completed parquet file can contain fewer rows than the requested row-group size. The async engine tolerates non-contiguous IDs because row groups can complete out of order; the sync engine writes batches sequentially and rejects holes (likely external mutation or a directory written by an incompatible engine).
81+
Resume scans `parquet-files/batch_*.parquet` and reads parquet metadata to recover the completed row-group IDs and their actual persisted row counts. `metadata.json` remains the source of truth for the run *configuration* (`buffer_size`, `target_num_records`, `original_target_num_records`, config fingerprint), but the filesystem is the source of truth for *progress* (`num_completed_batches`, `actual_num_records`). Splitting the two sources is what lets resume survive a crash between writing a row-group parquet and updating metadata - the filesystem reflects the durable state even when metadata lags by a step. Reading actual row counts also matters for early-shutdown salvage, where a completed parquet file can contain fewer rows than the requested row-group size. Resume tolerates non-contiguous IDs because row groups can complete out of order.
9482

9583
Resume relies on stable row-group boundaries within a run. It treats datasets that have completed `process_after_generation()` as terminal: after-generation processors operate on the whole dataset and can re-chunk rows or change schema, invalidating row-group identity for later resume/extension. The terminal-state check raises a clear `DatasetGenerationError` (not a `TypeError`) when the persisted metadata is missing required fields such as `target_num_records`.
9684

@@ -102,19 +90,6 @@ Metadata writes are atomic (`tmp` file + `fsync` + `os.replace`) because `metada
10290

10391
## Data Flow
10492

105-
### Sequential
106-
```
107-
DatasetBuilder.build()
108-
→ DatasetBatchManager.start()
109-
→ for each batch:
110-
→ for each generator (topological order):
111-
→ generate_from_scratch / generate (FULL_COLUMN) / fan_out (CELL_BY_CELL)
112-
→ DatasetBatchManager.finish_batch() → parquet
113-
→ ProcessorRunner.run_after_generation()
114-
→ model_registry.log_model_usage()
115-
```
116-
117-
### Async
11893
```
11994
DatasetBuilder.build()
12095
→ _build_async()
@@ -136,15 +111,15 @@ When request admission is available, async scheduling may use request-pressure s
136111

137112
## Design Decisions
138113

139-
- **Dual execution engines behind one API.** The sequential engine is simpler and easier to debug; the async engine adds row-group parallelism for throughput. Users switch via an environment variable without changing their code.
114+
- **One execution engine behind the API.** The async scheduler handles row-group parallelism, DAG-aware dispatch, resume, and checkpointing for all generation runs.
140115
- **DAG-driven ordering** ensures columns with dependencies (e.g., a judge column that depends on a text column) are generated in the correct order, regardless of the order they appear in the config.
141116
- **Fair async admission with bounded borrow by default** keeps the scheduler flowing across ready columns and model groups. `FairTaskQueue.select_next(...)` chooses eligible ready work, `TaskAdmissionController` leases scheduler resources before spawn, and `FairTaskQueue.commit(...)` removes the selected task only after admission succeeds. The default `BoundedBorrowTaskAdmissionPolicyConfig` computes a strict per-group share, lets solo groups borrow only up to a capacity-derived reserve, and makes borrowed groups yield when eligible peer pressure appears. Passing `bounded_borrow=None` selects strict-fair admission for tests and benchmark comparisons. Per-group virtual-time ordering prevents a large ready frontier from degenerating into a column-by-column wave, and scheduler-resource accounting remains separate from provider/model request admission.
142-
- **Salvage rounds in async mode** retry failed tasks after all other tasks in a round complete, improving resilience against transient LLM failures without blocking the entire generation.
117+
- **Salvage rounds** retry failed tasks after all other tasks in a round complete, improving resilience against transient LLM failures without blocking the entire generation.
143118
- **Unified DAG construction.** `topologically_sort_column_configs` (in `execution_graph.py`) determines column ordering using Kahn's algorithm; the runtime `ExecutionGraph` adds strategy-aware dependency tracking for the async scheduler.
144119

145120
## Cross-References
146121

147-
- [System Architecture](overview.md) end-to-end data flow
122+
- [System Architecture](overview.md) - end-to-end data flow
148123
- [Engine Layer](engine.md) — compilation and generator hierarchy
149124
- [Models](models.md) — how generators access LLMs
150125
- [Config Layer](config.md) — column configs and dependency declarations

architecture/overview.md

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Users declare what their data should look like through config objects (columns,
2929
|-----------|---------|-------------|
3030
| `DataDesigner` | `data-designer` | Public API — `create()`, `preview()`, `validate()` |
3131
| `DataDesignerConfigBuilder` | `data-designer-config` | Fluent builder for dataset configs |
32-
| `DatasetBuilder` | `data-designer-engine` | Orchestrates generation (sync or async) |
32+
| `DatasetBuilder` | `data-designer-engine` | Orchestrates async generation |
3333
| `ModelFacade` / `ModelRegistry` | `data-designer-engine` | LLM client abstraction with retry, request admission, usage tracking |
3434
| `MCPFacade` / `MCPRegistry` | `data-designer-engine` | Tool execution via Model Context Protocol |
3535
| `ColumnGeneratorRegistry` | `data-designer-engine` | Maps column types to generator implementations |
@@ -42,9 +42,7 @@ Users declare what their data should look like through config objects (columns,
4242

4343
2. **Compilation**`compile_data_designer_config` enriches the config (seed columns, internal UUID column), runs static validation (Jinja references, code columns, processors), and produces a compiled column order via topological sort.
4444

45-
3. **Generation**`DatasetBuilder` instantiates column generators from the registry, then executes one of two paths:
46-
- **Sequential** (default): batch loop over columns in topological order. Each generator produces its column via `CELL_BY_CELL` (threaded fan-out) or `FULL_COLUMN` strategy.
47-
- **Async** (`DATA_DESIGNER_ASYNC_ENGINE=1`): builds an `ExecutionGraph`, partitions rows into groups, and dispatches tasks via `AsyncTaskScheduler` with `FairTaskQueue` selection, `TaskAdmissionController` scheduler-resource leases, salvage rounds, and per-row-group checkpointing.
45+
3. **Generation**`DatasetBuilder` instantiates column generators from the registry, builds an `ExecutionGraph`, partitions rows into groups, and dispatches tasks via `AsyncTaskScheduler` with `FairTaskQueue` selection, `TaskAdmissionController` scheduler-resource leases, salvage rounds, and per-row-group checkpointing.
4846

4947
4. **Post-processing**`ProcessorRunner` applies transformations (pre-batch, post-batch, after-generation). Profilers analyze the generated dataset.
5048

@@ -54,15 +52,15 @@ Users declare what their data should look like through config objects (columns,
5452

5553
- **PEP 420 namespace packages** allow the three packages to be installed independently while sharing the `data_designer` namespace. This enables lighter installs (e.g., config-only for validation tooling) without import conflicts.
5654
- **Lazy imports throughout**`__getattr__`-based lazy loading in `data_designer.config` and `data_designer.interface`, plus `lazy_heavy_imports` for numpy/pandas, keep startup fast.
57-
- **Dual execution engines** share the same `DatasetBuilder` API. The async engine adds row-group parallelism and DAG-aware scheduling without changing the public interface.
55+
- **Async-only execution** gives `DatasetBuilder` one scheduling path with row-group parallelism and DAG-aware dispatch behind the public interface.
5856
- **`TaskRegistry` subclasses: one instance per class**`TaskRegistry.__new__` (`registry/base.py`) ensures a single instance of each concrete registry (column generators, profilers, processors). **`ModelRegistry`** and **`MCPRegistry`** are ordinary classes, constructed per run with injected dependencies. **`PluginRegistry`** (`plugins/registry.py`) uses `__new__` so entry points are discovered once per process.
5957

6058
## Cross-References
6159

6260
- [Config Layer](config.md) — builder API, column types, model configs, plugin system
6361
- [Engine Layer](engine.md) — compilation, generators, registries
6462
- [Models](models.md) — model facade, adapters, retry, request admission
65-
- [Dataset Builders](dataset-builders.md)sync/async orchestration, DAG, batching
63+
- [Dataset Builders](dataset-builders.md) — async orchestration, DAG, row groups
6664
- [MCP](mcp.md) — tool execution, session pooling
6765
- [Sampling](sampling.md) — statistical generators, person/entity data
6866
- [CLI](cli.md) — command structure, controller/service/repo pattern

0 commit comments

Comments
 (0)