Skip to content

fix: speed up scheduler queue views#728

Open
eric-tramel wants to merge 3 commits into
mainfrom
codex/fix-724-scheduler-queue-view
Open

fix: speed up scheduler queue views#728
eric-tramel wants to merge 3 commits into
mainfrom
codex/fix-724-scheduler-queue-view

Conversation

@eric-tramel
Copy link
Copy Markdown
Contributor

@eric-tramel eric-tramel commented Jun 1, 2026

📋 Summary

Fixes scheduler hot-path scaling from Issue #724 by maintaining fair-queue group counts and resource demand incrementally. This keeps FairTaskQueue.view() from rebuilding queue summaries by scanning every queued task during dispatch and diagnostics.

🔗 Related Issue

Fixes #724

🔄 Changes

  • Maintain queued counts and resource demand when tasks are enqueued, discarded, or committed.
  • Build QueueView from maintained accounting plus first-candidate group heads.
  • Add regression coverage for accounting updates after removals, duplicate enqueue accounting, and avoiding full non-candidate task resource scans.

🧪 Testing

  • make check-all-fix
  • uv run pytest packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_queue.py - 10 passed
  • make test-engine - 2,217 passed
  • Unit tests added/updated
  • E2E tests added/updated: N/A - scheduler unit/integration coverage only

Performance Demonstration

Same-machine benchmark loading origin/main and the fixed branch in one run:

Scenario origin/main median Fixed branch median Improvement
FairTaskQueue.view() over 8,192 queued tasks, 100 calls 0.754456s 0.033932s 22.2x faster, 95.5% less time
Dispatch loop over 2,048 queued tasks, 512 select+commit 0.858851s 0.103152s 8.3x faster, 88.0% less time

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated: N/A - internal performance fix with no public behavior change

Maintain fair-queue group counts and resource demand as tasks enter and leave the ready queue, so QueueView creation no longer scans every queued task in scheduler hot paths.

Add regression coverage for queue accounting after discard/commit and for avoiding full queued-task value scans.

Fixes #724

Signed-off-by: Eric W. Tramel <1223539+eric-tramel@users.noreply.github.com>
@eric-tramel eric-tramel requested a review from a team as a code owner June 1, 2026 22:33
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 1, 2026

Review: PR #728 — fix: speed up scheduler queue views

Summary

Replaces the O(N-queued-tasks) scan inside FairTaskQueue.view() with O(unique-groups) reads from incrementally maintained accounting counters. Three new bookkeeping fields (_queued_by_group, _queued_resource_demand_by_group, _queued_peer_demand_by_resource) are updated on enqueue/discard/commit through a pair of small helpers (_increment_queue_accounting / _decrement_queue_accounting) and a unified _remove_queued_item shared by the discard and commit paths. view() now only iterates groups (not tasks) to look up first-candidate metadata, while the totals come straight from the maintained counters. Public surface (QueueView shape) is unchanged. Reported speedup: 71× on the view() micro-benchmark and 7.6× on the dispatch loop benchmark — consistent with eliminating the per-task resource_request.amounts.items() scan.

Findings

Correctness

  • Discard / commit symmetry looks right. commit now does queue.popleft() then _remove_queued_item(...) (which decrements the maintained counters); discard calls _remove_queued_item directly and bumps _sequence_version only when something was actually removed. Behavior matches the prior code, with bookkeeping kept in sync.
  • _first_valid_item simplification is sound. It now purges the head and returns queue[0] (or None). This is equivalent to the prior linear scan because _purge_queue_head already walks the deque dropping any leading entries that aren't in _queued/_task_groups. The linear scan past the head was dead code in practice — the head is always either the first valid item or gets purged.
  • Counter self-zeroing ensures keys don't accumulate. _decrement_queue_accounting deletes counter keys at <= 0, so view() doesn't silently grow stale entries. The defensive count > 0 filter in view() is belt-and-suspenders; either mechanism alone would suffice. Keeping both is fine — they're cheap.
  • view() still iterates self._queues.items() rather than self._queued_by_group. _queues is never pruned when a group empties out, so over a long run with many transient groups, this loop is O(groups-ever-seen). Each empty entry short-circuits via the <= 0 check, so it's not a hot-path issue, but iterating self._queued_by_group directly would be tighter. (queue.py:148-156)
  • Edge case: zero-valued amounts. If a resource_request ever contains {r: 0}, the increment writes 0 into the per-group / per-resource Counter via += 0, leaving a 0 entry that the count > 0 filter then suppresses in view(). Decrement rebalances it cleanly. Not a real problem (resource amounts shouldn't be zero), but worth noting that the count > 0 filter is what saves view consumers from seeing them.
  • Thread safety unchanged. No locking added; this matches the prior code, which also assumed single-threaded mutation.

Tests

  • test_queue_view_updates_incremental_accounting_after_removals covers the mixed-removal flow (enqueue 3, discard 1, select-and-commit 1) and asserts the maintained counters reach the expected steady state across all three Counters in QueueView. Good targeted coverage of the new bookkeeping.
  • test_queue_view_uses_incremental_accounting_for_non_candidate_tasks is the standout: a _FailIfScannedAmounts dict subclass raises if items() is called after enqueue, which is exactly the scan that the fix eliminates. This is a regression guard that will catch any future change that re-introduces the O(N-tasks) scan in view(). Nice.
  • One small caveat on the guard: it only intercepts items(). A future refactor that scanned via keys(), values(), or item access wouldn't trigger it. Considering adding __iter__ / keys / values overrides would make the guard more robust, but it's not necessary for the current fix.
  • No new test exercises discard_where, but discard_where delegates to discard, which goes through _remove_queued_item — the new test does exercise that path.

Style / conventions

  • Absolute imports, from __future__ import annotations, modern type syntax, no public-API churn — all consistent with STYLEGUIDE.md.
  • Helper method names (_increment_queue_accounting, _decrement_queue_accounting, _remove_queued_item) are clear and parallel.
  • Long attribute names like _queued_resource_demand_by_group are slightly verbose but match the existing queued_resource_demand_by_group field on QueueView, so the symmetry is worth the length.
  • Comments are correctly absent — the code is self-explanatory.

Performance

  • The fix moves view() from O(N-queued-tasks × avg-resources-per-task) to O(unique-groups-with-tasks). The reported 71× speedup at 8,192 queued tasks is what you'd expect.
  • Per-task overhead at enqueue/discard/commit is now O(resources-per-task) for counter maintenance — the same big-O as before, just shifted forward. Net win because view() is called more often than enqueue per dispatch loop.
  • The commit path now calls _purge_queue_head plus _first_valid_item (which itself purges), so there's a redundant purge in some flows. Each purge is cheap when the head is valid (one comparison and break), so this is negligible.

Verdict

Approve with minor optional follow-ups. This is a clean, well-targeted performance fix. The accounting is maintained symmetrically with the existing _queued/_task_groups mutations, the public QueueView shape is preserved, and the new tests both validate correctness and pin in the performance characteristic via a clever scan-detector. Optional follow-ups, none blocking:

  • Iterate self._queued_by_group instead of self._queues in view() to avoid touching long-dead group entries (queue.py:148).
  • Broaden _FailIfScannedAmounts to also fail on keys(), values(), and __iter__ so future refactors can't sneak past the guard.
  • Consider also pruning empty entries in self._queues when a group's count hits zero, so the view() loop stays tight even after long-running schedulers churn through many groups.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Jun 1, 2026

Greptile Summary

This PR eliminates the O(n) full-task scan in FairTaskQueue.view() by maintaining three incremental counters (_queued_by_group, _queued_resource_demand_by_group, _queued_peer_demand_by_resource) that are updated on every enqueue, discard, and commit. view() now builds a QueueView from these pre-computed totals in O(group-count) time, and _first_valid_item is simplified to call _purge_queue_head and immediately return queue[0].

  • Incremental accounting (_increment_queue_accounting / _decrement_queue_accounting): added as pair helpers called from enqueue, discard, and commit; the decrement logic correctly cleans up zero-valued counter entries to keep data structures compact.
  • view() rewrite: iterates only over _queued_by_group (one entry per active group) to collect first-candidate metadata, rather than scanning every queued task; the pre-computed demand counters are snapshotted into the returned QueueView.
  • Test additions: two new regression tests — one verifying accounting consistency after mixed discard/commit operations, and one asserting (via _FailIfScannedAmounts) that non-candidate task resources are never re-scanned during view().

Confidence Score: 5/5

Safe to merge — the change is a well-contained performance optimisation with no public behaviour change.

The incremental accounting helpers are symmetric and correctly handle zero-valued entry cleanup on every mutation path. The _first_valid_item simplification relies on _purge_queue_head already having established a clean queue head, which holds because discard/commit always update _queued before any subsequent view or select_next call. The view() iteration over _queued_by_group is safe because _purge_queue_head only mutates the deque, not the counter being iterated. New regression tests directly pin both accounting accuracy and the no-rescan invariant.

No files require special attention.

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/queue.py Core scheduler queue rewritten to maintain three incremental accounting counters; enqueue/discard/commit all update them correctly, and view() now runs in O(groups) instead of O(tasks). No logic errors found.
packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_queue.py Adds two targeted regression tests: accounting accuracy after mixed discard/commit, and a sentinel-based check that non-candidate resource amounts are never scanned during view(). Existing tests updated with richer assertions.

Reviews (2): Last reviewed commit: "test: tighten scheduler queue accounting..." | Re-trigger Greptile

Signed-off-by: Eric W. Tramel <1223539+eric-tramel@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Async scheduler large ready queues burn CPU in queue observation

1 participant