feat: add fair async task scheduling #639
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Code Review: PR #639 - feat: add fair async task scheduling
Summary
Adds a virtual-time fair admission layer ( Net change: +626 / -45 across the scheduler, a new
Findings
Correctness
Design / API
Style / Conventions
Test Coverage
Coverage is solid:
Gaps worth considering:
Performance / Risk
Security
Verdict
Solid implementation of a well-motivated change. The fair-queue policy is reasonable, the data-structure choices (per-group deque + min-heap by virtual finish time) match the cited references, and the encapsulation of
Recommended before merge:
None of these are blocking; the change can land as-is and these can be follow-ups.
Addressed the review action items in follow-up commit
Validation run locally:
Greptile Summary
This PR introduces

| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/fair_task_queue.py | New virtual-time fair queue with per-group FIFO admission and heap-based dispatch; core logic is correct but _has_queued_peer_group scans all queued-task entries rather than a group-key index, giving O(queued tasks) per blocked-group admission check. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Main scheduler refactored to route all frontier tasks through FairTaskQueue; seed bypass, stateful-lock ordering, and salvage paths are preserved; FrontierDelta integration and deadlock guard in _drain_frontier look correct. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py | CompletionTracker updated to return FrontierDelta from all mutation methods; delta construction and frontier management look correct. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/scheduling_hints.py | New module resolves per-generator scheduling metadata (provider, model, weight) once at scheduler init; partial-alias fallback correctly preserves accumulated weight. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | on_seeds_complete callback updated to return FrontierDelta by aggregating delta tuples from per-row drop_row calls; correct and backward-compatible (None return still accepted by scheduler). |
| packages/data-designer-engine/tests/engine/dataset_builders/utils/test_fair_task_queue.py | New unit tests covering queue fairness, per-group admission caps, stale pruning, and peer-sensitive bypass; good coverage of key edge cases. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | Expanded with fair-column-admission and LLM-bound-admission integration tests; error-rate-shutdown test updated to check early_shutdown flag and log output rather than internal _rg_states, which is more accurate for the 0-survivor case. |
Sequence Diagram
sequenceDiagram
participant DL as _main_dispatch_loop
participant FQ as FairTaskQueue
participant SS as SubmissionSemaphore
participant W as Worker(_execute_task_inner)
participant CT as CompletionTracker
DL->>FQ: has_queued_tasks?
FQ-->>DL: true
DL->>SS: try_acquire()
SS-->>DL: ok
DL->>FQ: admit_next()
FQ-->>DL: TaskSelection(task, group)
DL->>W: spawn _execute_task(task)
W->>CT: mark_cell_complete / mark_row_range_complete
CT-->>W: FrontierDelta(added, removed)
W->>FQ: enqueue(added tasks)
W->>FQ: discard(removed tasks)
W->>FQ: release(task)
W->>SS: release()
W->>DL: wake_event.set()
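The control flow in the diagram can be approximated with a small `asyncio` sketch. It is illustrative only: `run_dispatch`, `children`, and `free_slots` are hypothetical names, a plain deque stands in for `FairTaskQueue`, an integer counter stands in for the submission semaphore, and the sketch assumes workers never raise.

```python
import asyncio
from collections import deque
from typing import Deque, Dict, List, Sequence


async def run_dispatch(
    initial: Sequence[str],
    children: Dict[str, Sequence[str]],
    max_in_flight: int = 2,
) -> List[str]:
    """Hypothetical dispatch loop: admit while slots are free, sleep until a worker wakes us."""
    ready: Deque[str] = deque(initial)  # stand-in for the fair queue
    free_slots = max_in_flight          # stand-in for the submission semaphore
    wake = asyncio.Event()
    completed: List[str] = []
    spawned: List[asyncio.Task] = []    # keep references so tasks are not garbage-collected

    async def worker(task: str) -> None:
        nonlocal free_slots
        await asyncio.sleep(0)                 # placeholder for real work
        completed.append(task)
        ready.extend(children.get(task, ()))   # "added" side of a frontier delta
        free_slots += 1                        # release the slot before waking the loop
        wake.set()

    while ready or free_slots < max_in_flight:
        # Admit as many ready tasks as the free slots allow.
        while ready and free_slots > 0:
            free_slots -= 1
            spawned.append(asyncio.create_task(worker(ready.popleft())))
        # Work is still in flight: wait for a worker to release a slot.
        if free_slots < max_in_flight:
            wake.clear()
            await wake.wait()

    await asyncio.gather(*spawned)
    return completed
```

Because the worker updates shared state before calling `wake.set()`, and the loop has no `await` between checking state and clearing the event, the loop cannot miss a wake-up in this single-threaded sketch.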
Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/fair_task_queue.py:155-156
`_has_queued_peer_group` iterates over every entry in `_task_groups`, which maps one entry per queued task. In a workload with many queued LLM-bound tasks across a small number of groups, this scan is O(total queued tasks) rather than O(distinct groups). Maintaining a separate `Counter` or `set` of group keys with at least one queued task would make this check O(1).
```suggestion
    def _has_queued_peer_group(self, key: TaskGroupKey) -> bool:
        return any(queued_key != key for queued_key in self._queued_group_keys)
```
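One way to maintain such a group-key index is sketched below. This is a minimal illustration, not code from the PR: `QueuedGroupIndex`, `on_enqueue`, and `on_dequeue` are hypothetical names, and the sketch assumes the queue calls them every time a task enters or leaves a group's queue.

```python
from collections import Counter
from typing import Hashable


class QueuedGroupIndex:
    """Hypothetical index of how many tasks each group currently has queued."""

    def __init__(self) -> None:
        self._queued: Counter = Counter()

    def on_enqueue(self, key: Hashable) -> None:
        self._queued[key] += 1

    def on_dequeue(self, key: Hashable) -> None:
        remaining = self._queued[key] - 1
        if remaining > 0:
            self._queued[key] = remaining
        else:
            # Drop empty groups so len() counts only groups with queued work.
            self._queued.pop(key, None)

    def has_queued_peer_group(self, key: Hashable) -> bool:
        distinct = len(self._queued)
        # A peer exists if more than one group is queued, or the only
        # queued group is not the one being asked about.
        return distinct > 1 or (distinct == 1 and key not in self._queued)
```

With this shape the peer check depends only on the number of distinct groups tracked, not on the number of queued tasks.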
Reviews (5): Last reviewed commit: "Merge branch 'main' into codex/fair-asyn..."
Pushed the structural correction in
What changed:
Local validation:
This aligns the implementation with the scratch benchmark result: keep the fairness policy, but make readiness incremental so scheduler cost scales with work movement rather than repeatedly sorting/syncing the whole backlog.
Dead-code cleanup pushed in
Removed:
Validation:
Refactor pushed in
Implementation shape:
LOC effect:
Validation:
Not blocking for this PR, but I wonder if the fairness criteria should eventually incorporate observed runtime stats rather than only static configured parallelism. Right now the weight is based on

Could we make the scheduler track an EWMA of per-group runtime, queue wait, or completed-throughput and adjust the effective cost/weight over time? For example, virtual finish could charge longer-running groups more wall-clock cost, or effective weight could become something like configured capacity divided by observed runtime. That might make fairness closer to actual provider throughput, especially when one model/provider is much slower or degraded. Happy to discuss as a follow-up mechanism after the liveness fix.
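A rough sketch of the "configured capacity divided by observed runtime" idea, for discussion only. Nothing here exists in the PR: `RuntimeEwma`, `effective_weight`, `alpha`, `reference_seconds`, and `min_runtime` are hypothetical, and the sketch assumes a group with no observations yet is treated as if it ran at the reference runtime, so observed and unobserved groups stay on the same scale.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RuntimeEwma:
    """Exponentially weighted moving average of per-group task runtime (seconds)."""

    alpha: float = 0.2              # weight given to the newest sample
    value: Optional[float] = None   # None until the first sample arrives

    def update(self, sample_seconds: float) -> float:
        if self.value is None:
            self.value = sample_seconds
        else:
            self.value = self.alpha * sample_seconds + (1.0 - self.alpha) * self.value
        return self.value


def effective_weight(
    configured_capacity: float,
    ewma: RuntimeEwma,
    reference_seconds: float = 1.0,
    min_runtime: float = 1e-3,
) -> float:
    """Scale the configured capacity by reference runtime over observed runtime."""
    if ewma.value is None:
        # No data yet: fall back to the static configured weight.
        return float(configured_capacity)
    # The floor keeps a burst of near-zero samples from producing a huge weight.
    return configured_capacity * reference_seconds / max(ewma.value, min_runtime)
```

Under this sketch a group whose observed runtime doubles sees its effective weight halve, so each admitted task would be charged twice the virtual cost.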
Small testing-standards nit: a few new tests exercise private scheduler internals directly (
Addressed the testing-standards nit in
Validation after merging the worker fixes:
📋 Summary
Adds a virtual-time fair admission layer to the async dataset scheduler so a large ready frontier cannot let one column or model-backed generator consume the full submission window. The existing semaphore caps and LLM-wait handoff remain in place; this changes task admission order, not public APIs.
🔗 Related Issue
N/A
🔄 Changes
- `FairTaskQueue`, a per-group FIFO plus heap-backed virtual-time scheduler for ready async tasks.
- `AsyncTaskScheduler` only supplies group specs and dispatches selected tasks.

Implementation notes
`FairTaskQueue` keeps one FIFO queue per scheduling group and a global min-heap ordered by virtual finish time. Each dispatch pops one eligible group, admits one task, charges `1 / weight`, and reinserts the group if more work is queued. Global `_submission_semaphore` and `_llm_wait_semaphore` remain the hard capacity controls; per-group admitted counts prevent a single LLM-bound flow from filling the LLM-wait window.

The group identity includes the logical task flow, so sibling ready columns get turns even when they share the same provider/model. Model-backed groups also include provider, model, and generation type; custom generators with model aliases group by the custom flow plus alias set because the exact model call path is user code.
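The admission policy described above can be sketched in a few lines. This is a simplified illustration under stated assumptions, not the PR's `FairTaskQueue`: `SketchFairQueue` and `_Group` are hypothetical names, tasks are plain strings, and the per-group admitted caps, eligibility checks, and stale-entry pruning are left out.

```python
import heapq
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, Dict, Hashable, List, Optional, Tuple


@dataclass
class _Group:
    weight: float
    tasks: Deque[str] = field(default_factory=deque)
    finish: float = 0.0    # virtual finish time of the group's next task
    in_heap: bool = False


class SketchFairQueue:
    """Hypothetical per-group FIFO plus min-heap keyed by virtual finish time."""

    def __init__(self) -> None:
        self._groups: Dict[Hashable, _Group] = {}
        self._heap: List[Tuple[float, int, Hashable]] = []
        self._seq = 0        # tie-breaker: equal finish times dispatch in push order
        self._vtime = 0.0    # virtual time of the most recent admission

    def enqueue(self, key: Hashable, task: str, weight: float = 1.0) -> None:
        # The weight is fixed the first time a group is seen.
        group = self._groups.setdefault(key, _Group(weight=weight))
        group.tasks.append(task)
        if not group.in_heap:
            # An idle group restarts from current virtual time, so it cannot bank credit.
            group.finish = max(group.finish, self._vtime) + 1.0 / group.weight
            self._push(key, group)

    def _push(self, key: Hashable, group: _Group) -> None:
        self._seq += 1
        heapq.heappush(self._heap, (group.finish, self._seq, key))
        group.in_heap = True

    def admit_next(self) -> Optional[Tuple[Hashable, str]]:
        if not self._heap:
            return None
        finish, _, key = heapq.heappop(self._heap)
        group = self._groups[key]
        group.in_heap = False
        task = group.tasks.popleft()
        self._vtime = finish
        if group.tasks:
            # Charge 1 / weight for the admitted task and reinsert the group.
            group.finish = finish + 1.0 / group.weight
            self._push(key, group)
        return key, task
```

With equal weights, a group holding many queued tasks alternates with a group holding few instead of draining first; a group with weight 2 is admitted roughly twice as often as a group with weight 1.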
📊 Benchmark proof point
A scratch benchmark compared merge-base `1d203b1d` with this PR at `0973d1fe` using a real `DataDesigner.create()` run over `384` records, local mock network endpoints, `1` warmup trial, and `3` measured trials. The workflow had a slow branch plus a fast branch gate, three fast downstream branches, a deeper review task, and a terminal `record_done` task. Endpoint pools were capped at `32` concurrent requests each; slow requests slept `120ms`, fast requests `15ms`, and terminal requests `5ms`.

Under an adversarial slow-first frontier order that models the slot-hoarding failure mode:

| Metric | merge-base `1d203b1d` | PR `0973d1fe` |
|---|---|---|
| | `slow_extract=256` | `slow_extract=128`, `branch_gate=128` |
| | `0.209s` | `0.057s` |
| | `1.150s` | `0.414s` |
| | `79.7%` | `92.6%` |
| | `1.152s` | `0.413s` |
| `1.0s` | `0 / 384` | `~207 / 384` |
| | `1.581s` | `1.591s` |

The wall-clock tail is effectively unchanged because the slow endpoint remains the true bottleneck. The proof point is resource flow: the PR keeps downstream/parallel resources active much earlier instead of letting the slow frontier monopolize the scheduling window.
References
🧪 Testing
- `uv run --group dev pytest packages/data-designer-engine/tests/engine/dataset_builders/utils/test_fair_task_queue.py packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py -q` (58 passed)
- `uv run --group dev pytest packages/data-designer-engine/tests/engine/dataset_builders packages/data-designer-engine/tests/engine/models -q` (983 passed)
- `make check-engine`
- `make check-all`
- `make test` (562 config + 2010 engine + 717 interface tests passed)
- `DATA_DESIGNER_ASYNC_ENGINE=1 uv run --project tests_e2e pytest tests -q` (6 passed, 2 skipped for credential-gated live-provider tests)
- `DATA_DESIGNER_ASYNC_ENGINE=1 uv run --package data-designer python docs/assets/recipes/plugin_development/markdown_seed_reader.py`
- `/tmp/dd-fair-scheduler-benchmark-rvQn7u/scripts/fair_scheduler_benchmark.py` against merge-base and PR commits

✅ Checklist
Description updated with AI