Skip to content

Commit c0a4dcb

Browse files
authored
feat: implement async scheduling admission control (#661)
1 parent a83968f commit c0a4dcb

90 files changed

Lines changed: 10825 additions & 3415 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ venv.bak/
8585
# Local scratch space
8686
.scratch/
8787

88+
# Generated benchmark/report output
89+
/artifacts/
90+
/reports/
91+
/scripts/benchmarks/benchmark_async_scheduling.py
92+
/scripts/benchmarks/export_async_scheduling_perfetto.py
93+
/scripts/benchmarks/generate_async_scheduling_idle_report.py
94+
/scripts/benchmarks/run_async_scheduling_idle_regression.py
95+
8896
docs/notebooks/
8997
docs/notebook_source/*.ipynb
9098
docs/notebook_source/*.csv

architecture/dataset-builders.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Preparation (`_prepare_async_run`):
3535
4. Constructs `CompletionTracker`, `RowGroupBufferManager`, `AsyncTaskScheduler`
3636
5. Hooks `ProcessorRunner` for pre-batch and post-batch stages
3737

38-
`AsyncTaskScheduler` runs on a dedicated async loop with frontier-driven dispatch, semaphore-based capacity limits, salvage rounds for failed tasks, and order-dependent locks for columns that must execute sequentially. Ready frontier tasks are admitted through a virtual-time fair queue so one hot column or model-backed generator cannot consume the whole submission window before peer work gets a turn.
38+
`AsyncTaskScheduler` runs on a dedicated async loop with frontier-driven dispatch, task-admission leases, salvage rounds for failed tasks, and order-dependent locks for columns that must execute sequentially. Ready frontier tasks enter `FairTaskQueue`, are selected through virtual-time ordering, and are committed only after `TaskAdmissionController` acquires the required scheduler resources.
3939

4040
### Execution Graph
4141

@@ -121,19 +121,24 @@ DatasetBuilder.build()
121121
→ _prepare_async_run()
122122
→ ExecutionGraph.create()
123123
→ CompletionTracker.with_graph()
124-
→ AsyncTaskScheduler(semaphores, salvage_rounds)
124+
→ AsyncTaskScheduler(task admission, fair queue, salvage_rounds)
125125
→ scheduler.run()
126-
→ for each row group, fairly admit ready tasks from frontier
126+
→ admit row groups under the configured row-group cap
127+
→ fairly admit ready tasks from the frontier through task admission
127128
→ tasks execute generators, update CompletionTracker
128129
→ checkpoints via RowGroupBufferManager
129130
→ collect TaskTraces, emit telemetry
130131
```
131132

133+
Row-group admission is fixed by default in the dataset-builder path: the configured row-group concurrency is the hard in-flight cap. The scheduler also has an internal adaptive row-group mode for direct use that only raises a soft target up to that cap; it is additive ramp-up, not AIMD shrink/recovery behavior.
134+
135+
When request admission is available, async scheduling may use request-pressure snapshots as a read-only advisory during fair-queue selection. A request-pressured task can be skipped for an eligible peer without mutating request-admission state; provider/model/domain request limits remain owned by request admission.
136+
132137
## Design Decisions
133138

134139
- **Dual execution engines behind one API.** The sequential engine is simpler and easier to debug; the async engine adds row-group parallelism for throughput. Users switch via an environment variable without changing their code.
135140
- **DAG-driven ordering** ensures columns with dependencies (e.g., a judge column that depends on a text column) are generated in the correct order, regardless of the order they appear in the config.
136-
- **Fair async admission** keeps the scheduler flowing across ready columns and model groups. Global semaphores still bound memory/coroutine growth, while per-group virtual-time queues prevent a large ready frontier from degenerating into a column-by-column wave. LLM admission caps are peer-sensitive: a solo model group can fill available global capacity, but once another scheduling group has queued work the saturated group yields until peers get admission slots or admitted tasks complete.
141+
- **Fair async admission** keeps the scheduler flowing across ready columns and model groups. `FairTaskQueue.select_next(...)` chooses eligible ready work, `TaskAdmissionController` leases scheduler resources before spawn, and `FairTaskQueue.commit(...)` removes the selected task only after admission succeeds. Per-group virtual-time ordering prevents a large ready frontier from degenerating into a column-by-column wave, and scheduler-resource accounting remains separate from provider/model request admission.
137142
- **Salvage rounds in async mode** retry failed tasks after all other tasks in a round complete, improving resilience against transient LLM failures without blocking the entire generation.
138143
- **Unified DAG construction.** `topologically_sort_column_configs` (in `execution_graph.py`) determines column ordering using Kahn's algorithm; the runtime `ExecutionGraph` adds strategy-aware dependency tracking for the async scheduler.
139144

architecture/models.md

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Models
22

3-
The model subsystem provides a unified interface for LLM access: chat completions, embeddings, and image generation. It handles client creation, retry, rate-limit throttling, usage tracking, and MCP tool integration.
3+
The model subsystem provides a unified interface for LLM access: chat completions, embeddings, and image generation. It handles client creation, retry, request admission, usage tracking, and MCP tool integration.
44

55
Source: `packages/data-designer-engine/src/data_designer/engine/models/`
66

@@ -11,12 +11,12 @@ The model subsystem is layered:
1111
```
1212
ModelRegistry (lazy facade-per-alias)
1313
└── ModelFacade (completion, embeddings, image gen, MCP tool loops)
14-
└── ThrottledModelClient (AIMD rate limiting)
14+
└── ModelRequestExecutor (request admission + provider execution)
1515
└── ModelClient (OpenAI-compatible or Anthropic adapter)
1616
└── RetryTransport (httpx-level retries)
1717
```
1818

19-
Generators never interact with HTTP clients directly. They request a `ModelFacade` by alias from the `ModelRegistry`, which handles lazy construction and shared throttle state.
19+
Generators never interact with HTTP clients directly. They request a `ModelFacade` by alias from the `ModelRegistry`, which handles lazy construction, request-resource canonicalization, and shared adaptive request admission state.
2020

2121
## Key Components
2222

@@ -31,13 +31,13 @@ Defines the contract: sync/async chat, embeddings, image generation, `supports_*
3131

3232
`create_model_client` routes by provider type to the appropriate adapter. Optionally wraps with:
3333
- **`RetryTransport`** — httpx-level retries via `httpx_retries.RetryTransport`. `HttpModelClient` sets `strip_rate_limit_codes=True` for the async client and `False` for the sync client (`http_model_client.py`), which controls whether 429 responses are eligible for transport-layer retries.
34-
- **`ThrottledModelClient`**AIMD (Additive Increase, Multiplicative Decrease) concurrency control per throttle domain.
34+
- **`ModelRequestExecutor`**maps model-call attempts to request-admission items, acquires request leases, invokes the provider client, and releases the exact lease on every terminal path.
3535

36-
### ThrottleManager
36+
### Request Admission
3737

38-
Manages concurrency limits per `ThrottleDomain` (CHAT, EMBEDDING, IMAGE, HEALTHCHECK), keyed by `(provider_name, model_id)`. Thread-safe with a shared lock for sync/async access.
38+
`RequestAdmissionController` manages provider/model/domain request resources. `AdaptiveRequestAdmissionController` adds AIMD (Additive Increase, Multiplicative Decrease) adaptation per `RequestDomain` (`chat`, `embedding`, `image`, `healthcheck`) under the provider/model static cap.
3939

40-
`ThrottledModelClient` wraps each API call in a context manager that acquires/releases throttle capacity and adjusts limits on success (additive increase) or rate-limit errors (multiplicative decrease).
40+
`ModelRequestExecutor` wraps each provider call with a request-admission lease and feeds success or rate-limit outcomes back to the controller. `RequestResourceResolver` owns canonical provider/model/domain identity so aliases that target the same endpoint share request capacity.
4141

4242
When `rampup_seconds` is configured, `ThrottleManager` starts new domains at one concurrent request, climbs linearly toward the peak, and aborts to normal AIMD behavior on the first 429.
4343

@@ -52,7 +52,7 @@ The primary interface for generators. Holds a `ModelConfig`, `ModelClient`, opti
5252

5353
### ModelRegistry
5454

55-
Lazy `ModelFacade` construction per alias. Registers a shared `ThrottleManager` across all facades for coordinated rate limiting. Provides `get_model_usage_stats` and `log_model_usage` for post-build reporting.
55+
Lazy `ModelFacade` construction per alias. Registers shared request-admission state across all facades for coordinated provider/model/domain capacity. Provides `get_model_usage_stats` and `log_model_usage` for post-build reporting.
5656

5757
### Usage Tracking
5858

@@ -61,18 +61,18 @@ Lazy `ModelFacade` construction per alias. Registers a shared `ThrottleManager`
6161
## Data Flow
6262

6363
1. Generator requests a model by alias from `ModelRegistry`
64-
2. Registry lazily creates `ModelFacade` with the appropriate client and throttle config
64+
2. Registry lazily creates `ModelFacade` with the appropriate client and request-admission executor
6565
3. Generator calls `completion()` with prompt/messages
66-
4. `ModelFacade` builds kwargs, calls `ThrottledModelClient`
67-
5. Throttle layer acquires capacity, delegates to `ModelClient`
66+
4. `ModelFacade` builds kwargs, calls `ModelRequestExecutor`
67+
5. Request admission acquires a provider/model/domain lease, delegates to `ModelClient`
6868
6. `ModelClient` makes the HTTP request through `RetryTransport`
6969
7. Response flows back; usage is tracked; if MCP tools are configured, tool calls are executed and results fed back for another completion round
7070

7171
## Design Decisions
7272

73-
- **Facade pattern** hides HTTP, retry, throttle, and MCP complexity from generators. Generators see `completion()` and get back parsed results.
74-
- **AIMD throttling at the application layer** rather than relying solely on HTTP retries. This provides smoother throughput under rate limitsthe transport layer still handles many transient failures, while the throttle manager adjusts concurrency to avoid sustained 429 storms.
75-
- **429 handling depends on sync vs async `HttpModelClient`** — The async client uses `strip_rate_limit_codes=True`, so 429s are not retried at the transport layer and rate-limit signals reach `ThrottledModelClient` / AIMD quickly. The sync client uses `strip_rate_limit_codes=False`, so 429s may still be retried transparently at the transport layer before surfacing to callers.
73+
- **Facade pattern** hides HTTP, retry, request admission, and MCP complexity from generators. Generators see `completion()` and get back parsed results.
74+
- **AIMD request admission at the application layer** rather than relying solely on HTTP retries. This provides smoother throughput under rate limits: the transport layer still handles many transient failures, while adaptive request admission adjusts concurrency to avoid sustained 429 storms.
75+
- **429 handling depends on sync vs async `HttpModelClient`** — The async client uses `strip_rate_limit_codes=True`, so 429s are not retried at the transport layer and rate-limit signals reach `ModelRequestExecutor` / request admission quickly. The sync client uses `strip_rate_limit_codes=False`, so 429s may still be retried transparently at the transport layer before surfacing to callers.
7676
- **Distribution-valued inference parameters** (`temperature`, `top_p` as `UniformDistribution` or `ManualDistribution`) enable controlled randomness across a dataset without per-row config changes.
7777
- **Lazy facade construction** avoids health-checking or connecting to models that are configured but never used in a particular generation run.
7878

architecture/overview.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ Users declare what their data should look like through config objects (columns,
3030
| `DataDesigner` | `data-designer` | Public API — `create()`, `preview()`, `validate()` |
3131
| `DataDesignerConfigBuilder` | `data-designer-config` | Fluent builder for dataset configs |
3232
| `DatasetBuilder` | `data-designer-engine` | Orchestrates generation (sync or async) |
33-
| `ModelFacade` / `ModelRegistry` | `data-designer-engine` | LLM client abstraction with retry, throttle, usage tracking |
33+
| `ModelFacade` / `ModelRegistry` | `data-designer-engine` | LLM client abstraction with retry, request admission, usage tracking |
3434
| `MCPFacade` / `MCPRegistry` | `data-designer-engine` | Tool execution via Model Context Protocol |
3535
| `ColumnGeneratorRegistry` | `data-designer-engine` | Maps column types to generator implementations |
3636
| `PluginRegistry` | `data-designer-config` | Discovers and registers entry-point plugins |
@@ -44,7 +44,7 @@ Users declare what their data should look like through config objects (columns,
4444

4545
3. **Generation**`DatasetBuilder` instantiates column generators from the registry, then executes one of two paths:
4646
- **Sequential** (default): batch loop over columns in topological order. Each generator produces its column via `CELL_BY_CELL` (threaded fan-out) or `FULL_COLUMN` strategy.
47-
- **Async** (`DATA_DESIGNER_ASYNC_ENGINE=1`): builds an `ExecutionGraph`, partitions rows into groups, and dispatches tasks via `AsyncTaskScheduler` with semaphore-based concurrency, salvage rounds, and per-row-group checkpointing.
47+
- **Async** (`DATA_DESIGNER_ASYNC_ENGINE=1`): builds an `ExecutionGraph`, partitions rows into groups, and dispatches tasks via `AsyncTaskScheduler` with `FairTaskQueue` selection, `TaskAdmissionController` scheduler-resource leases, salvage rounds, and per-row-group checkpointing.
4848

4949
4. **Post-processing**`ProcessorRunner` applies transformations (pre-batch, post-batch, after-generation). Profilers analyze the generated dataset.
5050

@@ -61,7 +61,7 @@ Users declare what their data should look like through config objects (columns,
6161

6262
- [Config Layer](config.md) — builder API, column types, model configs, plugin system
6363
- [Engine Layer](engine.md) — compilation, generators, registries
64-
- [Models](models.md) — model facade, adapters, retry/throttle
64+
- [Models](models.md) — model facade, adapters, retry, request admission
6565
- [Dataset Builders](dataset-builders.md) — sync/async orchestration, DAG, batching
6666
- [MCP](mcp.md) — tool execution, session pooling
6767
- [Sampling](sampling.md) — statistical generators, person/entity data

0 commit comments

Comments
 (0)