Skip to content

Commit f2b781f

Browse files
committed
test: tighten scheduler queue accounting coverage
Signed-off-by: Eric W. Tramel <1223539+eric-tramel@users.noreply.github.com>
1 parent 8baf165 commit f2b781f

2 files changed

Lines changed: 34 additions & 34 deletions

File tree

  • packages/data-designer-engine
    • src/data_designer/engine/dataset_builders/scheduling
    • tests/engine/dataset_builders/scheduling

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/queue.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,7 @@ def view(self) -> QueueView:
145145
first_tasks_by_group: dict[TaskGroupKey, SchedulableTask] = {}
146146
first_group_specs: dict[TaskGroupKey, TaskGroupSpec] = {}
147147

148-
for key, queue in self._queues.items():
149-
if self._queued_by_group.get(key, 0) <= 0:
150-
continue
148+
for key in self._queued_by_group:
151149
first = self._first_valid_item(key)
152150
if first is None:
153151
continue
@@ -157,18 +155,14 @@ def view(self) -> QueueView:
157155

158156
return QueueView(
159157
queued_total=len(self._queued),
160-
queued_by_group={key: count for key, count in self._queued_by_group.items() if count > 0},
158+
queued_by_group=dict(self._queued_by_group),
161159
queued_resource_demand_by_group={
162-
key: {resource: count for resource, count in value.items() if count > 0}
163-
for key, value in self._queued_resource_demand_by_group.items()
164-
if self._queued_by_group.get(key, 0) > 0
160+
key: dict(value) for key, value in self._queued_resource_demand_by_group.items()
165161
},
166162
first_candidate_resources_by_group=first_by_group,
167163
first_candidate_tasks_by_group=first_tasks_by_group,
168164
first_candidate_group_specs_by_group=first_group_specs,
169-
queued_peer_demand_by_resource={
170-
resource: count for resource, count in self._queued_peer_demand_by_resource.items() if count > 0
171-
},
165+
queued_peer_demand_by_resource=dict(self._queued_peer_demand_by_resource),
172166
)
173167

174168
def _activate_group(self, key: TaskGroupKey) -> None:

packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_queue.py

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,25 @@ def test_select_next_uses_scheduler_eligibility_callback() -> None:
129129

130130
def test_enqueue_is_idempotent_by_task_id() -> None:
131131
queue = FairTaskQueue()
132-
item = _item("a", 0)
132+
group = _group("a")
133+
task = _task("a", 0)
134+
item = SchedulableTask(
135+
task_id=stable_task_id(task),
136+
payload=task,
137+
group=group,
138+
resource_request=SchedulerResourceRequest({"submission": 1, "llm_wait": 2}),
139+
)
133140

134141
first = queue.enqueue([item])
135142
second = queue.enqueue([item])
143+
view = queue.view()
136144

137145
assert first == (item.task_id,)
138146
assert second == ()
139-
assert queue.view().queued_total == 1
147+
assert view.queued_total == 1
148+
assert view.queued_by_group == {group.key: 1}
149+
assert view.queued_resource_demand_by_group == {group.key: {"submission": 1, "llm_wait": 2}}
150+
assert view.queued_peer_demand_by_resource == {"submission": 1, "llm_wait": 2}
140151

141152

142153
def test_discard_where_removes_matching_tasks() -> None:
@@ -208,28 +219,23 @@ def test_queue_view_updates_incremental_accounting_after_removals() -> None:
208219

209220
def test_queue_view_uses_incremental_accounting_for_non_candidate_tasks() -> None:
210221
queue = FairTaskQueue()
211-
guarded_amounts: list[_FailIfScannedAmounts] = []
212-
items: list[SchedulableTask] = []
213-
for group_index in range(8):
214-
group = _group(f"group-{group_index}")
215-
items.append(_item(f"group-{group_index}", 0, group))
216-
for row in range(1, 32):
217-
amounts = _FailIfScannedAmounts({"submission": 1})
218-
task = _task(f"group-{group_index}", row)
219-
items.append(
220-
SchedulableTask(
221-
task_id=stable_task_id(task),
222-
payload=task,
223-
group=group,
224-
resource_request=SchedulerResourceRequest(amounts),
225-
)
226-
)
227-
guarded_amounts.append(amounts)
228-
queue.enqueue(items)
229-
for amounts in guarded_amounts:
230-
amounts.locked = True
222+
group = _group("a")
223+
first = _item("a", 0, group)
224+
amounts = _FailIfScannedAmounts({"submission": 1})
225+
task = _task("a", 1)
226+
second = SchedulableTask(
227+
task_id=stable_task_id(task),
228+
payload=task,
229+
group=group,
230+
resource_request=SchedulerResourceRequest(amounts),
231+
)
232+
queue.enqueue([first, second])
233+
amounts.locked = True
231234

232235
view = queue.view()
233236

234-
assert view.queued_total == 256
235-
assert sum(view.queued_by_group.values()) == 256
237+
assert view.queued_total == 2
238+
assert view.queued_by_group == {group.key: 2}
239+
assert view.queued_resource_demand_by_group == {group.key: {"submission": 2}}
240+
assert view.first_candidate_resources_by_group == {group.key: {"submission": 1}}
241+
assert view.queued_peer_demand_by_resource == {"submission": 2}

0 commit comments

Comments
 (0)