Skip to content

Commit b30f802

Browse files
authored
feat: enable bounded-borrow task admission (#693)
1 parent b6de38d commit b30f802

7 files changed

Lines changed: 785 additions & 38 deletions

File tree

architecture/dataset-builders.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -138,7 +138,7 @@ When request admission is available, async scheduling may use request-pressure s
138138

139139
- **Dual execution engines behind one API.** The sequential engine is simpler and easier to debug; the async engine adds row-group parallelism for throughput. Users switch via an environment variable without changing their code.
140140
- **DAG-driven ordering** ensures columns with dependencies (e.g., a judge column that depends on a text column) are generated in the correct order, regardless of the order they appear in the config.
141-
- **Fair async admission** keeps the scheduler flowing across ready columns and model groups. `FairTaskQueue.select_next(...)` chooses eligible ready work, `TaskAdmissionController` leases scheduler resources before spawn, and `FairTaskQueue.commit(...)` removes the selected task only after admission succeeds. Per-group virtual-time ordering prevents a large ready frontier from degenerating into a column-by-column wave, and scheduler-resource accounting remains separate from provider/model request admission.
141+
- **Fair async admission with bounded borrow by default** keeps the scheduler flowing across ready columns and model groups. `FairTaskQueue.select_next(...)` chooses eligible ready work, `TaskAdmissionController` leases scheduler resources before spawn, and `FairTaskQueue.commit(...)` removes the selected task only after admission succeeds. The default `BoundedBorrowTaskAdmissionPolicyConfig` computes a strict per-group share, lets solo groups borrow only up to a capacity-derived reserve, and makes borrowed groups yield when eligible peer pressure appears. Passing `bounded_borrow=None` selects strict-fair admission for tests and benchmark comparisons. Per-group virtual-time ordering prevents a large ready frontier from degenerating into a column-by-column wave, and scheduler-resource accounting remains separate from provider/model request admission.
142142
- **Salvage rounds in async mode** retry failed tasks after all other tasks in a round complete, improving resilience against transient LLM failures without blocking the entire generation.
143143
- **Unified DAG construction.** `topologically_sort_column_configs` (in `execution_graph.py`) determines column ordering using Kahn's algorithm; the runtime `ExecutionGraph` adds strategy-aware dependency tracking for the async scheduler.
144144

packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@
4444
TaskAdmissionLease,
4545
)
4646
from data_designer.engine.dataset_builders.scheduling.task_model import SliceRef, Task, TaskTrace
47+
from data_designer.engine.dataset_builders.scheduling.task_policies import BoundedBorrowTaskAdmissionPolicyConfig
4748
from data_designer.engine.dataset_builders.utils.async_progress_reporter import (
4849
DEFAULT_REPORT_INTERVAL,
4950
AsyncProgressReporter,
@@ -185,6 +186,7 @@ def __init__(
185186
admission_config = task_admission_config or TaskAdmissionConfig(
186187
submission_capacity=max_submitted_tasks,
187188
resource_limits={"llm_wait": max_model_task_admission, "local": max_submitted_tasks},
189+
bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig(),
188190
)
189191
self._task_admission = TaskAdmissionController(admission_config)
190192
self._task_admission_config = admission_config
@@ -1842,6 +1844,11 @@ def task_admission_snapshot(self) -> object:
18421844
"""Return the current scheduler task-admission snapshot for diagnostics."""
18431845
return self._task_admission.view()
18441846

1847+
@property
def task_admission_config(self) -> TaskAdmissionConfig:
    """Return the effective scheduler task-admission config.

    This is the object stored on ``self._task_admission_config`` by the
    constructor, i.e. the same config that was handed to the
    ``TaskAdmissionController``. Exposed read-only for diagnostics.
    """
    return self._task_admission_config
1851+
18451852
def capacity_plan(self) -> AsyncCapacityPlan:
18461853
"""Return the scheduler-side async capacity explanation for this run."""
18471854
task_view = self._task_admission.view()

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py

Lines changed: 159 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33

44
from __future__ import annotations
55

6+
import math
67
from collections.abc import Mapping
78
from dataclasses import dataclass, field
89
from typing import TYPE_CHECKING, Literal, Protocol
@@ -12,6 +13,7 @@
1213
SchedulableTask,
1314
SchedulerResourceKey,
1415
TaskGroupKey,
16+
TaskGroupSpec,
1517
)
1618

1719
if TYPE_CHECKING:
@@ -27,6 +29,8 @@
2729
"shutdown",
2830
"policy_denial",
2931
]
32+
DEFAULT_DYNAMIC_BORROW_RESERVE_FRACTION = 0.125
33+
DEFAULT_DYNAMIC_BORROW_MAX_RESERVED_SLOTS = 8
3034

3135

3236
@dataclass(frozen=True)
@@ -35,16 +39,32 @@ class BoundedBorrowTaskAdmissionPolicyConfig:
3539
3640
Borrow debt is tracked by task group and scheduler resource. Any completed
3741
lease in the same group repays debt for the released resources; repayment is
38-
not tied to the specific lease that originally borrowed.
42+
not tied to the specific lease that originally borrowed. When no explicit
43+
borrow ceiling is configured, the policy reserves one slot per eight
44+
resource slots, capped at eight reserved slots, and lets solo groups borrow
45+
up to the remaining capacity.
3946
"""
4047

4148
borrow_ceiling_by_group_resource: Mapping[tuple[TaskGroupKey, SchedulerResourceKey], int] = field(
4249
default_factory=dict
4350
)
44-
default_borrow_ceiling: int = 0
45-
strict_share_rounding: Literal["floor", "ceil"] = "floor"
51+
default_borrow_ceiling: int | None = None
52+
dynamic_borrow_reserve_fraction: float = DEFAULT_DYNAMIC_BORROW_RESERVE_FRACTION
53+
dynamic_borrow_max_reserved_slots: int = DEFAULT_DYNAMIC_BORROW_MAX_RESERVED_SLOTS
54+
strict_share_rounding: Literal["floor", "ceil"] = "ceil"
4655
repay_on_withheld_peer_pressure: bool = True
4756

57+
def __post_init__(self) -> None:
    """Validate the borrow configuration after dataclass construction.

    Raises:
        ValueError: If ``default_borrow_ceiling`` is set and negative, if
            ``dynamic_borrow_reserve_fraction`` lies outside ``[0, 1]``, if
            ``dynamic_borrow_max_reserved_slots`` is not positive, or if any
            entry of ``borrow_ceiling_by_group_resource`` is negative.
    """
    # ``None`` is a valid value here, so only a concrete ceiling is range-checked.
    if self.default_borrow_ceiling is not None and self.default_borrow_ceiling < 0:
        raise ValueError("default_borrow_ceiling must be non-negative.")
    # The chained comparison is also False for NaN, so NaN is rejected too.
    if not 0 <= self.dynamic_borrow_reserve_fraction <= 1:
        raise ValueError("dynamic_borrow_reserve_fraction must be between 0 and 1.")
    if self.dynamic_borrow_max_reserved_slots <= 0:
        raise ValueError("dynamic_borrow_max_reserved_slots must be positive.")
    for key, ceiling in self.borrow_ceiling_by_group_resource.items():
        if ceiling < 0:
            raise ValueError(f"Borrow ceiling for {key!r} must be non-negative.")
67+
4868

4969
@dataclass(frozen=True)
5070
class TaskAdmissionPolicyDecision:
@@ -127,51 +147,74 @@ def evaluate(
127147
queue_view: QueueView,
128148
admission_view: TaskAdmissionView,
129149
) -> TaskAdmissionPolicyDecision:
130-
limit = item.group.admitted_limit
131-
if limit is None:
132-
return TaskAdmissionPolicyDecision(allowed=True)
133-
134-
leased_count = admission_view.running_counts_by_group.get(item.group.key, 0)
135-
if leased_count < limit:
150+
if item.group.admitted_limit is None:
136151
return TaskAdmissionPolicyDecision(allowed=True)
137152

138153
pressure_resources = _queued_peer_pressure_resources(item, queue_view, admission_view)
139-
if pressure_resources:
140-
for resource in pressure_resources:
141-
debt_key = (item.group.key, resource)
142-
debt = admission_view.policy_debt_by_group_resource.get(debt_key, 0)
154+
borrow_resources: list[tuple[SchedulerResourceKey, int]] = []
155+
diagnostics_by_resource: dict[SchedulerResourceKey, dict[str, int | str]] = {}
156+
for resource, amount in item.resource_request.amounts.items():
157+
admitted = admission_view.leased_resources_by_group.get(item.group.key, {}).get(resource, 0)
158+
strict_share = _strict_share(item, resource, queue_view, admission_view, self._config.strict_share_rounding)
159+
projected = admitted + amount
160+
debt_key = (item.group.key, resource)
161+
debt = admission_view.policy_debt_by_group_resource.get(debt_key, 0)
162+
diagnostics_by_resource[resource] = {
163+
"admitted": admitted,
164+
"requested": amount,
165+
"strict_share": strict_share,
166+
"debt": debt,
167+
}
168+
169+
if resource in pressure_resources:
143170
if debt > 0:
144171
return TaskAdmissionPolicyDecision(
145172
allowed=False,
146173
reason="borrow_debt",
147-
diagnostics={"resource": resource, "debt": debt},
174+
diagnostics={"resource": resource, "debt": debt, "strict_share": strict_share},
148175
)
149-
return TaskAdmissionPolicyDecision(
150-
allowed=False,
151-
reason="group_cap",
152-
diagnostics={
153-
"admitted_limit": limit,
154-
"leased_count": leased_count,
155-
"pressure_resources": pressure_resources,
156-
},
157-
)
176+
if projected > strict_share:
177+
return TaskAdmissionPolicyDecision(
178+
allowed=False,
179+
reason="group_cap",
180+
diagnostics={
181+
"resource": resource,
182+
"admitted": admitted,
183+
"requested": amount,
184+
"strict_share": strict_share,
185+
"pressure_resources": pressure_resources,
186+
},
187+
)
188+
continue
158189

159-
borrow_resources: list[tuple[SchedulerResourceKey, int]] = []
160-
for resource, amount in item.resource_request.amounts.items():
161-
debt_key = (item.group.key, resource)
162-
debt = admission_view.policy_debt_by_group_resource.get(debt_key, 0)
163-
ceiling = self._config.borrow_ceiling_by_group_resource.get(
190+
if projected <= strict_share:
191+
continue
192+
193+
new_debt = min(amount, projected - strict_share)
194+
ceiling, ceiling_diagnostics = self._borrow_ceiling(
164195
debt_key,
165-
self._config.default_borrow_ceiling,
196+
resource_limit=admission_view.resource_limits.get(resource, 0),
197+
strict_share=strict_share,
166198
)
167-
if debt + amount > ceiling:
199+
diagnostics_by_resource[resource].update(ceiling_diagnostics)
200+
if debt + new_debt > ceiling:
168201
return TaskAdmissionPolicyDecision(
169202
allowed=False,
170203
reason="borrow_debt",
171-
diagnostics={"resource": resource, "debt": debt, "requested": amount, "ceiling": ceiling},
204+
diagnostics={
205+
"resource": resource,
206+
"debt": debt,
207+
"requested": amount,
208+
"new_debt": new_debt,
209+
"ceiling": ceiling,
210+
"strict_share": strict_share,
211+
},
172212
)
173-
borrow_resources.append((resource, amount))
174-
return TaskAdmissionPolicyDecision(allowed=True, diagnostics={"borrow_resources": tuple(borrow_resources)})
213+
borrow_resources.append((resource, new_debt))
214+
return TaskAdmissionPolicyDecision(
215+
allowed=True,
216+
diagnostics={"borrow_resources": tuple(borrow_resources), "strict_share": diagnostics_by_resource},
217+
)
175218

176219
def on_acquire(
177220
self,
@@ -196,6 +239,35 @@ def on_release(self, lease: TaskAdmissionLease) -> PolicyStateDelta:
196239
debt_changes={(lease.item.group.key, resource): -amount for resource, amount in lease.resources.items()}
197240
)
198241

242+
def _borrow_ceiling(
    self,
    debt_key: tuple[TaskGroupKey, SchedulerResourceKey],
    *,
    resource_limit: int,
    strict_share: int,
) -> tuple[int, dict[str, int | str]]:
    """Resolve the borrow ceiling for one (group, resource) pair.

    Precedence, first match wins:

    1. An explicit entry in ``borrow_ceiling_by_group_resource``.
    2. ``default_borrow_ceiling`` when it is not ``None``.
    3. A dynamic ceiling derived from ``resource_limit``: reserve some
       slots, and allow borrowing up to whatever remains above
       ``strict_share``.

    Args:
        debt_key: ``(group key, resource key)`` used for the explicit lookup.
        resource_limit: Configured limit for the resource.
        strict_share: The group's strict share of that resource.

    Returns:
        ``(ceiling, diagnostics)``. The diagnostics always carry
        ``"ceiling"`` and ``"ceiling_source"``; the dynamic path also
        reports ``"reserved_slots"`` and ``"borrow_slots"``.
    """
    explicit_ceiling = self._config.borrow_ceiling_by_group_resource.get(debt_key)
    if explicit_ceiling is not None:
        return explicit_ceiling, {"ceiling": explicit_ceiling, "ceiling_source": "group_resource"}
    if self._config.default_borrow_ceiling is not None:
        return self._config.default_borrow_ceiling, {
            "ceiling": self._config.default_borrow_ceiling,
            "ceiling_source": "default",
        }
    reserved_slots = _dynamic_reserved_slots(
        resource_limit,
        reserve_fraction=self._config.dynamic_borrow_reserve_fraction,
        max_reserved_slots=self._config.dynamic_borrow_max_reserved_slots,
    )
    # Both subtractions are clamped at zero so a small limit or a large
    # strict share never produces a negative ceiling.
    target_solo_cap = max(0, resource_limit - reserved_slots)
    borrow_slots = max(0, target_solo_cap - strict_share)
    return borrow_slots, {
        "ceiling": borrow_slots,
        "ceiling_source": "dynamic",
        "reserved_slots": reserved_slots,
        "borrow_slots": borrow_slots,
    }
270+
199271

200272
def _queued_peer_pressure_resources(
201273
item: SchedulableTask,
@@ -215,6 +287,60 @@ def _queued_peer_pressure_resources(
215287
return tuple(pressure_resources)
216288

217289

290+
def _strict_share(
    item: SchedulableTask,
    resource: SchedulerResourceKey,
    queue_view: QueueView,
    admission_view: TaskAdmissionView,
    rounding: Literal["floor", "ceil"],
) -> int:
    """Compute the item's group's strict share of ``resource``.

    The share is the resource limit split by group weight across the groups
    returned by ``_competing_group_specs``. Weights below ``1.0`` are
    treated as ``1.0``.

    Args:
        item: Task whose group share is being computed.
        resource: Scheduler resource to split.
        queue_view: Queue snapshot used to find competing groups.
        admission_view: Admission snapshot supplying ``resource_limits``.
        rounding: ``"ceil"`` rounds the raw share up; any other value
            rounds it down.

    Returns:
        ``0`` when the resource has no positive limit, ``1`` when the limit
        is exactly one, otherwise the rounded share: at least ``1``, then
        capped by the group's ``admitted_limit`` (when set) and by the
        resource limit.
    """
    resource_limit = admission_view.resource_limits.get(resource, 0)
    if resource_limit <= 0:
        return 0
    if resource_limit == 1:
        return 1

    candidate_groups = _competing_group_specs(item, resource, queue_view, admission_view)
    group_weight = max(1.0, item.group.weight)
    if len(candidate_groups) <= 1:
        # 2x synthesizes one equally weighted future peer, keeping solo strict share near 50%.
        # Dynamic borrow then decides how much of the remaining capacity the group may use.
        total_weight = group_weight * 2
    else:
        total_weight = sum(max(1.0, group.weight) for group in candidate_groups.values())
    raw_share = resource_limit * group_weight / total_weight
    if rounding == "ceil":
        rounded_share = math.ceil(raw_share)
    else:
        rounded_share = math.floor(raw_share)
    # Floor of one so a heavily outweighed group can still make progress.
    strict_share = max(1, rounded_share)
    if item.group.admitted_limit is not None:
        strict_share = min(strict_share, item.group.admitted_limit)
    return min(resource_limit, strict_share)
320+
321+
322+
def _dynamic_reserved_slots(resource_limit: int, *, reserve_fraction: float, max_reserved_slots: int) -> int:
    """Return how many slots of a resource are held back from solo borrowing.

    The reserve is ``ceil(resource_limit * reserve_fraction)``, raised to at
    least one slot and then capped at ``max_reserved_slots``.

    NOTE(review): because of the floor of one, a ``reserve_fraction`` of 0
    still reserves a slot; confirm that is intended.
    """
    return min(max_reserved_slots, max(1, math.ceil(resource_limit * reserve_fraction)))
324+
325+
326+
def _competing_group_specs(
    item: SchedulableTask,
    resource: SchedulerResourceKey,
    queue_view: QueueView,
    admission_view: TaskAdmissionView,
) -> dict[TaskGroupKey, TaskGroupSpec]:
    """Collect the groups that compete with ``item`` for ``resource``.

    The item's own group is always included. A queued peer group is added
    when its first candidate requests a positive amount of ``resource``,
    passes ``_is_hard_resource_eligible``, and has a group spec recorded in
    ``queue_view.first_candidate_group_specs_by_group``.

    Returns:
        Mapping from group key to group spec, with at least one entry.
    """
    groups: dict[TaskGroupKey, TaskGroupSpec] = {item.group.key: item.group}
    for group_key, peer_resources in queue_view.first_candidate_resources_by_group.items():
        # Peers that do not ask for this resource exert no pressure on it.
        if peer_resources.get(resource, 0) <= 0:
            continue
        if not _is_hard_resource_eligible(peer_resources, admission_view):
            continue
        group = queue_view.first_candidate_group_specs_by_group.get(group_key)
        if group is not None:
            groups[group_key] = group
    return groups
342+
343+
218344
def _fair_pressure_resources(
219345
resources: Mapping[SchedulerResourceKey, int],
220346
) -> tuple[SchedulerResourceKey, ...]:

0 commit comments

Comments
 (0)