Skip to content

Commit c827f66

Browse files
committed
fix scheduler queue view scaling
Maintain fair-queue group counts and resource demand as tasks enter and leave the ready queue, so QueueView creation no longer scans every queued task in scheduler hot paths. Add regression coverage for queue accounting after discard/commit and for avoiding full queued-task value scans.

Fixes #724

Signed-off-by: Eric W. Tramel <1223539+eric-tramel@users.noreply.github.com>
1 parent b418153 commit c827f66

2 files changed

Lines changed: 135 additions & 27 deletions

File tree

  • packages/data-designer-engine
    • src/data_designer/engine/dataset_builders/scheduling
    • tests/engine/dataset_builders/scheduling

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/queue.py

Lines changed: 59 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,9 @@ def __init__(self) -> None:
4646
self._queued: dict[str, SchedulableTask] = {}
4747
self._task_groups: dict[str, TaskGroupKey] = {}
4848
self._group_specs: dict[TaskGroupKey, TaskGroupSpec] = {}
49+
self._queued_by_group: Counter[TaskGroupKey] = Counter()
50+
self._queued_resource_demand_by_group: dict[TaskGroupKey, Counter[SchedulerResourceKey]] = defaultdict(Counter)
51+
self._queued_peer_demand_by_resource: Counter[SchedulerResourceKey] = Counter()
4952
self._group_finish: dict[TaskGroupKey, float] = {}
5053
self._heap: list[tuple[float, int, TaskGroupKey]] = []
5154
self._active_heap_keys: set[TaskGroupKey] = set()
@@ -69,6 +72,7 @@ def enqueue(self, items: Iterable[SchedulableTask]) -> tuple[str, ...]:
6972
queue.append(item)
7073
self._queued[item.task_id] = item
7174
self._task_groups[item.task_id] = item.group.key
75+
self._increment_queue_accounting(item)
7276
self._activate_group(item.group.key)
7377
accepted.append(item.task_id)
7478
if accepted:
@@ -77,10 +81,8 @@ def enqueue(self, items: Iterable[SchedulableTask]) -> tuple[str, ...]:
7781

7882
def discard(self, task_id: str) -> None:
7983
"""Remove a queued task lazily if it is no longer dispatchable."""
80-
if task_id in self._queued:
84+
if self._remove_queued_item(task_id) is not None:
8185
self._sequence_version += 1
82-
self._queued.pop(task_id, None)
83-
self._task_groups.pop(task_id, None)
8486

8587
def discard_where(self, predicate: Callable[[SchedulableTask], bool]) -> None:
8688
"""Remove queued tasks matching a predicate."""
@@ -125,8 +127,7 @@ def commit(self, selection: QueueSelection) -> SchedulableTask | None:
125127
return None
126128

127129
queue.popleft()
128-
self._queued.pop(item.task_id, None)
129-
self._task_groups.pop(item.task_id, None)
130+
self._remove_queued_item(item.task_id)
130131
self._active_heap_keys.discard(key)
131132
self._active_heap_entries.pop(key, None)
132133
group = self._group_specs[key]
@@ -140,35 +141,34 @@ def commit(self, selection: QueueSelection) -> SchedulableTask | None:
140141
return item
141142

142143
def view(self) -> QueueView:
143-
queued_by_group: Counter[TaskGroupKey] = Counter()
144-
demand_by_group: dict[TaskGroupKey, dict[SchedulerResourceKey, int]] = defaultdict(lambda: defaultdict(int))
145144
first_by_group: dict[TaskGroupKey, Mapping[SchedulerResourceKey, int]] = {}
146145
first_tasks_by_group: dict[TaskGroupKey, SchedulableTask] = {}
147146
first_group_specs: dict[TaskGroupKey, TaskGroupSpec] = {}
148-
demand_by_resource: Counter[SchedulerResourceKey] = Counter()
149-
150-
for item in self._queued.values():
151-
key = item.group.key
152-
queued_by_group[key] += 1
153-
for resource, amount in item.resource_request.amounts.items():
154-
demand_by_group[key][resource] += amount
155-
demand_by_resource[resource] += amount
156147

157148
for key, queue in self._queues.items():
149+
if self._queued_by_group.get(key, 0) <= 0:
150+
continue
158151
first = self._first_valid_item(key)
159-
if first is not None:
160-
first_by_group[key] = dict(first.resource_request.amounts)
161-
first_tasks_by_group[key] = first
162-
first_group_specs[key] = first.group
152+
if first is None:
153+
continue
154+
first_by_group[key] = dict(first.resource_request.amounts)
155+
first_tasks_by_group[key] = first
156+
first_group_specs[key] = first.group
163157

164158
return QueueView(
165159
queued_total=len(self._queued),
166-
queued_by_group=dict(queued_by_group),
167-
queued_resource_demand_by_group={key: dict(value) for key, value in demand_by_group.items()},
160+
queued_by_group={key: count for key, count in self._queued_by_group.items() if count > 0},
161+
queued_resource_demand_by_group={
162+
key: {resource: count for resource, count in value.items() if count > 0}
163+
for key, value in self._queued_resource_demand_by_group.items()
164+
if self._queued_by_group.get(key, 0) > 0
165+
},
168166
first_candidate_resources_by_group=first_by_group,
169167
first_candidate_tasks_by_group=first_tasks_by_group,
170168
first_candidate_group_specs_by_group=first_group_specs,
171-
queued_peer_demand_by_resource=dict(demand_by_resource),
169+
queued_peer_demand_by_resource={
170+
resource: count for resource, count in self._queued_peer_demand_by_resource.items() if count > 0
171+
},
172172
)
173173

174174
def _activate_group(self, key: TaskGroupKey) -> None:
@@ -183,13 +183,11 @@ def _activate_group(self, key: TaskGroupKey) -> None:
183183
self._active_heap_entries[key] = (finish, self._sequence)
184184

185185
def _first_valid_item(self, key: TaskGroupKey) -> SchedulableTask | None:
186+
self._purge_queue_head(key)
186187
queue = self._queues.get(key)
187-
if queue is None:
188+
if not queue:
188189
return None
189-
for item in queue:
190-
if item.task_id in self._queued and self._task_groups.get(item.task_id) == key:
191-
return item
192-
return None
190+
return queue[0]
193191

194192
def _purge_queue_head(self, key: TaskGroupKey) -> None:
195193
queue = self._queues.get(key)
@@ -200,3 +198,37 @@ def _purge_queue_head(self, key: TaskGroupKey) -> None:
200198
if item.task_id in self._queued and self._task_groups.get(item.task_id) == key:
201199
break
202200
queue.popleft()
201+
202+
def _increment_queue_accounting(self, item: SchedulableTask) -> None:
203+
key = item.group.key
204+
self._queued_by_group[key] += 1
205+
for resource, amount in item.resource_request.amounts.items():
206+
self._queued_resource_demand_by_group[key][resource] += amount
207+
self._queued_peer_demand_by_resource[resource] += amount
208+
209+
def _remove_queued_item(self, task_id: str) -> SchedulableTask | None:
210+
item = self._queued.pop(task_id, None)
211+
key = self._task_groups.pop(task_id, None)
212+
if item is None or key is None:
213+
return item
214+
self._decrement_queue_accounting(item, key)
215+
return item
216+
217+
def _decrement_queue_accounting(self, item: SchedulableTask, key: TaskGroupKey) -> None:
218+
self._queued_by_group[key] -= 1
219+
if self._queued_by_group[key] <= 0:
220+
del self._queued_by_group[key]
221+
222+
group_demand = self._queued_resource_demand_by_group.get(key)
223+
if group_demand is not None:
224+
for resource, amount in item.resource_request.amounts.items():
225+
group_demand[resource] -= amount
226+
if group_demand[resource] <= 0:
227+
del group_demand[resource]
228+
if not group_demand:
229+
del self._queued_resource_demand_by_group[key]
230+
231+
for resource, amount in item.resource_request.amounts.items():
232+
self._queued_peer_demand_by_resource[resource] -= amount
233+
if self._queued_peer_demand_by_resource[resource] <= 0:
234+
del self._queued_peer_demand_by_resource[resource]

packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_queue.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
from __future__ import annotations
55

66
from collections import Counter
7+
from collections.abc import ItemsView
78

89
from data_designer.engine.dataset_builders.scheduling.queue import FairTaskQueue, QueueView
910
from data_designer.engine.dataset_builders.scheduling.resources import (
1011
SchedulableTask,
12+
SchedulerResourceKey,
1113
SchedulerResourceRequest,
1214
TaskGroupKey,
1315
TaskGroupSpec,
@@ -16,6 +18,15 @@
1618
from data_designer.engine.dataset_builders.scheduling.task_model import Task
1719

1820

21+
class _FailIfScannedAmounts(dict[SchedulerResourceKey, int]):
22+
locked: bool = False
23+
24+
def items(self) -> ItemsView[SchedulerResourceKey, int]:
25+
if self.locked:
26+
raise AssertionError("QueueView should use incremental accounting for non-candidate tasks.")
27+
return super().items()
28+
29+
1930
def _task(column: str, row_index: int) -> Task:
2031
return Task(column=column, row_group=0, row_index=row_index, task_type="cell")
2132

@@ -157,3 +168,68 @@ def test_queue_view_exposes_group_and_resource_demand() -> None:
157168
assert view.queued_by_group[group.key] == 1
158169
assert view.queued_resource_demand_by_group[group.key]["llm_wait"] == 1
159170
assert view.first_candidate_resources_by_group[group.key]["submission"] == 1
171+
172+
173+
def test_queue_view_updates_incremental_accounting_after_removals() -> None:
174+
queue = FairTaskQueue()
175+
first_group = _group("a")
176+
second_group = _group("b")
177+
first = SchedulableTask(
178+
task_id=stable_task_id(_task("a", 0)),
179+
payload=_task("a", 0),
180+
group=first_group,
181+
resource_request=SchedulerResourceRequest({"submission": 1, "llm_wait": 2}),
182+
)
183+
second = SchedulableTask(
184+
task_id=stable_task_id(_task("b", 0)),
185+
payload=_task("b", 0),
186+
group=second_group,
187+
resource_request=SchedulerResourceRequest({"submission": 1, "llm_wait": 3}),
188+
)
189+
third = SchedulableTask(
190+
task_id=stable_task_id(_task("b", 1)),
191+
payload=_task("b", 1),
192+
group=second_group,
193+
resource_request=SchedulerResourceRequest({"submission": 1, "local": 1}),
194+
)
195+
queue.enqueue([first, second, third])
196+
197+
queue.discard(first.task_id)
198+
committed = _select_and_commit(queue)
199+
200+
assert committed == second
201+
view = queue.view()
202+
assert view.queued_total == 1
203+
assert first_group.key not in view.queued_by_group
204+
assert view.queued_by_group == {second_group.key: 1}
205+
assert view.queued_resource_demand_by_group == {second_group.key: {"submission": 1, "local": 1}}
206+
assert view.queued_peer_demand_by_resource == {"submission": 1, "local": 1}
207+
208+
209+
def test_queue_view_uses_incremental_accounting_for_non_candidate_tasks() -> None:
210+
queue = FairTaskQueue()
211+
guarded_amounts: list[_FailIfScannedAmounts] = []
212+
items: list[SchedulableTask] = []
213+
for group_index in range(8):
214+
group = _group(f"group-{group_index}")
215+
items.append(_item(f"group-{group_index}", 0, group))
216+
for row in range(1, 32):
217+
amounts = _FailIfScannedAmounts({"submission": 1})
218+
task = _task(f"group-{group_index}", row)
219+
items.append(
220+
SchedulableTask(
221+
task_id=stable_task_id(task),
222+
payload=task,
223+
group=group,
224+
resource_request=SchedulerResourceRequest(amounts),
225+
)
226+
)
227+
guarded_amounts.append(amounts)
228+
queue.enqueue(items)
229+
for amounts in guarded_amounts:
230+
amounts.locked = True
231+
232+
view = queue.view()
233+
234+
assert view.queued_total == 256
235+
assert sum(view.queued_by_group.values()) == 256

0 commit comments

Comments
 (0)