Skip to content

Commit 0c57a64

Browse files
committed
fix: prevent request admission timeout row drops
- Pace scheduler dispatch under request-admission pressure - Preserve local admission timeouts through salvage like 429s - Add regression tests and benchmark proof for Issue #725 Fixes #725 Signed-off-by: Eric W. Tramel <1223539+eric-tramel@users.noreply.github.com>
1 parent f7c8809 commit 0c57a64

10 files changed

Lines changed: 1032 additions & 139 deletions

File tree

architecture/dataset-builders.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Preparation (`_prepare_async_run`):
3535
4. Constructs `CompletionTracker`, `RowGroupBufferManager`, `AsyncTaskScheduler`
3636
5. Hooks `ProcessorRunner` for pre-batch and post-batch stages
3737

38-
`AsyncTaskScheduler` runs on a dedicated async loop with frontier-driven dispatch, task-admission leases, salvage rounds for failed tasks, and order-dependent locks for columns that must execute sequentially. Ready frontier tasks enter `FairTaskQueue`, are selected through virtual-time ordering, and are committed only after `TaskAdmissionController` acquires the required scheduler resources. Salvage-exhausted tasks are dropped except for rate-limit failures, which stay deferred and retry after cooldown/backoff so 429s delay records rather than discard them.
38+
`AsyncTaskScheduler` runs on a dedicated async loop with frontier-driven dispatch, task-admission leases, salvage rounds for failed tasks, and order-dependent locks for columns that must execute sequentially. Ready frontier tasks enter `FairTaskQueue`, are selected through virtual-time ordering, and are committed only after `TaskAdmissionController` acquires the required scheduler resources. Salvage-exhausted tasks are dropped except for preserved retryable failures: provider rate limits and local request-admission queue timeouts stay deferred and retry after cooldown/backoff so scheduler-local pressure delays records rather than discarding them.
3939

4040
### Execution Graph
4141

packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py

Lines changed: 383 additions & 89 deletions
Large diffs are not rendered by default.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/queue.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,14 @@ def discard_where(self, predicate: Callable[[SchedulableTask], bool]) -> None:
8888
if predicate(item):
8989
self.discard(task_id)
9090

91-
def select_next(self, is_eligible: Callable[[SchedulableTask, QueueView], bool]) -> QueueSelection | None:
91+
def select_next(
92+
self,
93+
is_eligible: Callable[[SchedulableTask, QueueView], bool],
94+
*,
95+
view: QueueView | None = None,
96+
) -> QueueSelection | None:
9297
"""Return the next eligible task without mutating queue state."""
93-
view = self.view()
98+
view = self.view() if view is None else view
9499
heap_copy = list(self._heap)
95100
heapq.heapify(heap_copy)
96101
active_seen: set[TaskGroupKey] = set()

packages/data-designer-engine/src/data_designer/engine/models/clients/errors.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class ProviderErrorKind(str, Enum):
2424
NOT_FOUND = "not_found"
2525
PERMISSION_DENIED = "permission_denied"
2626
RATE_LIMIT = "rate_limit"
27+
REQUEST_ADMISSION_TIMEOUT = "request_admission_timeout"
2728
TIMEOUT = "timeout"
2829
UNPROCESSABLE_ENTITY = "unprocessable_entity"
2930
UNSUPPORTED_CAPABILITY = "unsupported_capability"

packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,7 @@ def _execute_sync_attempt(self, domain: RequestDomain, call: Callable[[], _T]) -
121121
try:
122122
lease = self._request_admission.acquire_sync(item)
123123
except RequestAdmissionError as exc:
124-
raise ProviderError(
125-
kind=ProviderErrorKind.TIMEOUT,
126-
message=str(exc),
127-
provider_name=self._provider_name,
128-
model_name=self._model_id,
129-
) from exc
124+
raise self._provider_error_from_request_admission(exc) from exc
130125
try:
131126
self._emit_model_event("model_request_started", item=item, lease=lease)
132127
result = call()
@@ -169,12 +164,7 @@ async def _execute_async_attempt(self, domain: RequestDomain, call: Callable[[],
169164
try:
170165
lease = await self._request_admission.acquire_async(item)
171166
except RequestAdmissionError as exc:
172-
raise ProviderError(
173-
kind=ProviderErrorKind.TIMEOUT,
174-
message=str(exc),
175-
provider_name=self._provider_name,
176-
model_name=self._model_id,
177-
) from exc
167+
raise self._provider_error_from_request_admission(exc) from exc
178168
except asyncio.CancelledError:
179169
raise
180170
try:
@@ -216,7 +206,7 @@ def _max_attempts(self) -> int:
216206
def _should_retry(self, exc: ProviderError, attempt: int) -> bool:
217207
if attempt >= self._max_attempts() - 1:
218208
return False
219-
if isinstance(exc.__cause__, RequestAdmissionError):
209+
if exc.kind == ProviderErrorKind.REQUEST_ADMISSION_TIMEOUT:
220210
return False
221211
if exc.kind == ProviderErrorKind.RATE_LIMIT:
222212
return False
@@ -249,6 +239,19 @@ def _release_provider_error(self, lease: RequestAdmissionLease, exc: ProviderErr
249239
outcome = RequestReleaseOutcome(kind="provider_failure")
250240
self._request_admission.release(lease, outcome)
251241

242+
def _provider_error_from_request_admission(self, exc: RequestAdmissionError) -> ProviderError:
243+
kind = (
244+
ProviderErrorKind.REQUEST_ADMISSION_TIMEOUT
245+
if exc.decision.reason == "queue_timeout"
246+
else ProviderErrorKind.TIMEOUT
247+
)
248+
return ProviderError(
249+
kind=kind,
250+
message=str(exc),
251+
provider_name=self._provider_name,
252+
model_name=self._model_id,
253+
)
254+
252255
def _item(self, domain: RequestDomain) -> RequestAdmissionItem:
253256
resolved = self._resource_resolver.resolve(
254257
provider_name=self._provider_name,

packages/data-designer-engine/src/data_designer/engine/models/errors.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ class ModelQuotaExceededError(DataDesignerError): ...
7373
class ModelTimeoutError(DataDesignerError): ...
7474

7575

76+
class ModelRequestAdmissionTimeoutError(ModelTimeoutError): ...
77+
78+
7679
class ModelContextWindowExceededError(DataDesignerError): ...
7780

7881

@@ -303,6 +306,7 @@ def _raise_from_provider_error(
303306
_KIND_MAP: dict[ProviderErrorKind, type[DataDesignerError]] = {
304307
ProviderErrorKind.RATE_LIMIT: ModelRateLimitError,
305308
ProviderErrorKind.QUOTA_EXCEEDED: ModelQuotaExceededError,
309+
ProviderErrorKind.REQUEST_ADMISSION_TIMEOUT: ModelRequestAdmissionTimeoutError,
306310
ProviderErrorKind.TIMEOUT: ModelTimeoutError,
307311
ProviderErrorKind.NOT_FOUND: ModelNotFoundError,
308312
ProviderErrorKind.PERMISSION_DENIED: ModelPermissionDeniedError,
@@ -321,6 +325,10 @@ def _raise_from_provider_error(
321325
f"The request to model {model_name!r} timed out while {purpose}.",
322326
"Check your connection and try again. You may need to increase the timeout setting for the model.",
323327
),
328+
ProviderErrorKind.REQUEST_ADMISSION_TIMEOUT: (
329+
f"Local request admission for model {model_name!r} timed out while {purpose}; the provider request was not sent.",
330+
"Reduce request concurrency or tune the model's max_parallel_requests to match the endpoint's real capacity. For async dataset generation, also consider lowering RunConfig.max_in_flight_tasks.",
331+
),
324332
ProviderErrorKind.NOT_FOUND: (
325333
f"The specified model {model_name!r} could not be found while {purpose}.",
326334
f"Check that the model name is correct and supported by your model provider {model_provider_name!r} and try again.",

packages/data-designer-engine/src/data_designer/engine/observability.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ def reset(self, token: contextvars.Token) -> None:
9393
"admission_blocked",
9494
"group_capped",
9595
"request_pressure_advisory_skipped",
96+
"request_pressure_advisory_blocked",
97+
"dispatch_batch_yield",
9698
"task_lease_acquired",
9799
"admission_denied",
98100
"worker_spawned",

0 commit comments

Comments
 (0)