Skip to content

Commit 4d21afa

Browse files
committed
preserve 429 retries during early shutdown
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
1 parent 266c53b commit 4d21afa

2 files changed

Lines changed: 65 additions & 0 deletions

File tree

packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,9 @@ async def _main_dispatch_loop(
10741074
if self._deferred:
10751075
await self._salvage_stalled_row_groups(seed_cols, has_pre_batch, all_columns)
10761076
self._checkpoint_completed_row_groups(all_columns)
1077+
if self._has_rate_limited_deferred_tasks():
1078+
await self._wait_before_rate_limit_resalvage()
1079+
continue
10771080
break
10781081

10791082
self._wake_event.clear()
@@ -1295,6 +1298,9 @@ def _is_preserved_rate_limit_task(self, task: Task) -> bool:
12951298
self._deferred_errors.get(task)
12961299
)
12971300

1301+
def _has_rate_limited_deferred_tasks(self) -> bool:
    """Return True if any deferred task's recorded error is a rate-limit error.

    Each task in ``self._deferred`` is looked up in ``self._deferred_errors``
    (yielding ``None`` when no error is recorded) and the result is classified
    with ``self._is_rate_limit_error``.
    """
    return any(self._is_rate_limit_error(self._deferred_errors.get(task)) for task in self._deferred)
1303+
12981304
def _checkpoint_completed_row_groups(self, all_columns: list[str]) -> None:
12991305
"""Checkpoint any row groups that reached completion."""
13001306
completed = [

packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1389,6 +1389,38 @@ async def test_rate_limit_errors_do_not_trigger_early_shutdown() -> None:
13891389
assert tracker.is_row_group_complete(0, 10, ["seed", "col"])
13901390

13911391

1392+
@pytest.mark.asyncio(loop_scope="session")
async def test_preserved_429_retries_after_unrelated_early_shutdown(monkeypatch: pytest.MonkeyPatch) -> None:
    """Early shutdown must not turn rate-limited deferred work into dropped rows."""
    # Zero the module-level backoff constant; presumably this is the delay the
    # scheduler waits before re-salvaging rate-limited tasks, so the test does
    # not sleep. NOTE(review): confirm against the scheduler implementation.
    monkeypatch.setattr(async_scheduler_module, "RATE_LIMIT_RESALVAGE_BACKOFF_S", 0)
    # The generator raises a 429 on its first two calls for seed 0 and always
    # raises a ValueError for seed 1.
    cell = MockRateLimitThenNonRetryableGenerator(
        config=_expr_config("cell_out"),
        resource_provider=_mock_provider(),
        rate_limit_failures=2,
    )
    generators, graph, row_groups, tracker, buffer_mgr, _storage = _seed_plus_cell_setup(cell, num_records=3)
    scheduler = AsyncTaskScheduler(
        generators=generators,
        graph=graph,
        tracker=tracker,
        row_groups=row_groups,
        buffer_manager=buffer_mgr,
        on_finalize_row_group=lambda rg_id: buffer_mgr.checkpoint_row_group(rg_id),
        shutdown_error_rate=0.5,
        shutdown_error_window=1,
        salvage_max_rounds=1,
    )

    await scheduler.run()

    # Early shutdown was triggered, yet only the row with the non-retryable
    # failure is dropped; the rate-limited row and the healthy row survive.
    assert scheduler.early_shutdown
    assert not tracker.is_dropped(0, 0)
    assert tracker.is_dropped(0, 1)
    assert not tracker.is_dropped(0, 2)
    assert tracker.is_row_group_complete(0, 3, ["seed", "cell_out"])
    assert buffer_mgr.actual_num_records == 2
1422+
1423+
13921424
@pytest.mark.parametrize("exc_cls", RETRYABLE_MODEL_ERRORS, ids=lambda c: c.__name__)
13931425
@pytest.mark.asyncio(loop_scope="session")
13941426
async def test_retryable_errors_do_not_trigger_early_shutdown(
@@ -1831,6 +1863,33 @@ def generate(self, data: dict) -> dict:
18311863
return data
18321864

18331865

1866+
class MockRateLimitThenNonRetryableGenerator(ColumnGenerator[ExpressionColumnConfig]):
    """Generator that combines preserved 429 work with an early-shutdown failure."""

    def __init__(self, *args: Any, rate_limit_failures: int = 0, **kwargs: Any) -> None:
        """Forward construction to the base class and set up 429 bookkeeping.

        Args:
            rate_limit_failures: Number of initial ``generate`` calls for
                ``seed == 0`` that raise ``ModelRateLimitError``.
        """
        super().__init__(*args, **kwargs)
        self._rate_limit_failures = rate_limit_failures
        # Counts generate() calls seen for seed == 0 only.
        self._rate_limit_calls = 0

    def get_scheduling_metadata(self) -> SchedulingMetadata:
        """Return custom-model scheduling metadata keyed by this column's name."""
        return SchedulingMetadata.custom_model("test", self.config.name, "v1")

    @staticmethod
    def get_generation_strategy() -> GenerationStrategy:
        """Return the cell-by-cell generation strategy."""
        return GenerationStrategy.CELL_BY_CELL

    def generate(self, data: dict) -> dict:
        """Fill this column for one row, failing according to the row's ``seed``.

        * ``seed == 0``: raises ``ModelRateLimitError`` while the number of
          calls for this seed is at most ``rate_limit_failures``; later calls
          succeed.
        * ``seed == 1``: always raises ``ValueError``.
        * any other seed: succeeds immediately.

        On success, writes ``"shutdown_ok_<seed>"`` under the column name and
        returns the same ``data`` dict.
        """
        seed = data.get("seed")
        if seed == 0:
            self._rate_limit_calls += 1
            if self._rate_limit_calls <= self._rate_limit_failures:
                raise ModelRateLimitError("429 Too Many Requests")
        elif seed == 1:
            raise ValueError("non-retryable failure")
        data[self.config.name] = f"shutdown_ok_{seed}"
        return data
1891+
1892+
18341893
class MockModelRateLimitGenerator(MockLLMBoundRateLimitGenerator):
18351894
"""Rate-limit fixture with request-admission resource metadata."""
18361895

0 commit comments

Comments
 (0)