Skip to content

fix: keep rate-limited records retrying#703

Merged
eric-tramel merged 10 commits into
mainfrom
codex/fix-429-rate-limit-salvage
May 27, 2026
Merged

fix: keep rate-limited records retrying#703
eric-tramel merged 10 commits into
mainfrom
codex/fix-429-rate-limit-salvage

Conversation

@eric-tramel
Copy link
Copy Markdown
Contributor

@eric-tramel eric-tramel commented May 22, 2026

📋 Summary

This PR prevents async scheduler rate-limit backpressure from turning records into dropped rows. Records that keep hitting HTTP 429 / ModelRateLimitError after the finite salvage round cap now remain deferred and continue retrying, so they are delayed instead of lost.

🔗 Related Issue

N/A

🔄 Changes

  • Track the last retryable error for deferred scheduler tasks so rate-limit exhaustion can be distinguished from other retryable failures.
  • Keep exhausted ModelRateLimitError tasks in the deferred queue instead of dropping their row or row group.
  • Pace repeated 429-only salvage cycles using request-admission cooldown when available, with a small fallback delay, to avoid hot-loop retrying.
  • Throttle repeated rate-limit preservation logs and keep row-group numbering consistent with existing 1-indexed progress logs.
  • Validate salvage_max_rounds >= 1 so preserved 429 work always has retry attempts.
  • Add regression coverage for delayed-not-dropped 429s, sustained 429 pacing, mixed retryable exhaustion, cleanup of deferred bookkeeping, and request-cooldown pacing.

🧪 Testing

  • make test passes (not run; focused engine coverage below)
  • Unit tests added/updated
  • E2E tests added/updated (N/A)

Focused checks run:

  • uv run --group dev pytest packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py
  • uv run --group dev pytest packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py -k "retryable_model_errors_pass_through"
  • uv run --group dev ruff check packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py
  • uv run --group dev ruff format --check packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py
  • git diff --check

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated (N/A)

@eric-tramel eric-tramel requested a review from a team as a code owner May 22, 2026 16:59
@github-actions
Copy link
Copy Markdown
Contributor

Code Review: PR #703 — fix: keep rate-limited records retrying

Summary

This PR fixes a correctness bug in AsyncTaskScheduler where records that exhausted the salvage round cap while being rate-limited (HTTP 429 / ModelRateLimitError) were being dropped rather than continuing to retry. After the fix, exhausted-but-rate-limited tasks are kept in the deferred queue and the main dispatch loop keeps cycling so they get re-attempted on subsequent salvage passes.

The change is small, well-scoped, and accompanied by a focused regression test. The mechanism — tracking the last error per deferred task in a dict[Task, Exception] and special-casing ModelRateLimitError in the exhaustion path — is straightforward and consistent with the surrounding code.

Findings

Correctness

  • Lifecycle of _deferred_errors looks right. Entries are written when a task is appended to _deferred after a retryable failure (line 1581) and popped on terminal transitions: success/non-retryable in _execute_task_inner_impl (lines 1586, 1639), drop in _salvage_stalled_row_groups (line 1222), and bulk cleanup in _checkpoint_completed_row_groups (lines 1295–1297). I did not find a path that would leak an entry past its task's lifetime.
  • Hashing of Task as a dict key. Task is already used as a dict/set key elsewhere (_dispatched, etc.), so equality/hash semantics are fine. Worth noting: equality is structural, so if two Task instances with identical fields were ever live concurrently, they would share a _deferred_errors slot. The salvage pipeline preserves task identity through retries (same instance is re-dispatched), so this isn't currently a hazard, but it's an implicit invariant worth keeping in mind.
  • Loop continuation is gated correctly. The new if self._deferred and not self._in_flight: continue (line 1089) only fires when there's nothing else in flight, which is the only case where the existing all_done check would have otherwise broken out. The old behavior on mixed in-flight + deferred is preserved.

Risk: tight loop / livelock under sustained rate limiting

The outer dispatch loop will now continue whenever rate-limited tasks remain and nothing else is in flight. Backpressure comes from the rate-limit / model semaphore inside _salvage_rounds → _drain_frontier, which awaits _wake_event. If that semaphore yields naturally on rate-limit responses, the loop is paced; if a misbehaving generator returns 429 synchronously without releasing/acquiring admission, this becomes a CPU-bound spin.

Recommendations to harden:

  • Add a small backoff (e.g. await asyncio.sleep(...) keyed off _wake_event.wait() with a timeout) before re-entering _salvage_stalled_row_groups when no progress was made. Today there's no minimum delay between salvage cycles for purely-rate-limited deferred work.
  • Consider an upper bound on how many times a single task can be preserved across salvage cycles — even if the policy is "never drop on 429," surfacing a warning (or eventually a fatal error) after, say, N preservation cycles would prevent silent infinite waits when the provider is down.

Logging

  • _salvage_stalled_row_groups will log "keeping N rate-limited task(s) deferred…" on every cycle. With the new continue, this can run repeatedly per second under sustained 429s, which will spam logs. Suggest either logging only when the count changes, throttling with a timestamp, or aggregating across cycles.
  • Row-group numbering inconsistency. Line 1230 uses Row group {rg_id} (0-indexed). The salvage banner four lines earlier (line 1198) uses ({rg_id + 1:0{width}d}/{num_rgs}) (1-indexed). Match the existing style for consistency.

Style / minor

  • Lines 1209–1225 iterate exhausted twice (once to handle drops, once to collect rate-limited). It's correct, but could be a single pass that partitions into to_drop / to_preserve, which would also avoid a second _deferred_errors.get() lookup per task. Not blocking — current form is more obviously correct.
  • _is_rate_limit_error is a one-line helper that could just be inlined as isinstance(exc, ModelRateLimitError) at the two call sites. Minor preference; the helper does document intent.
  • Type signature dict[Task, Exception] is fine; you might consider dict[Task, BaseException] for symmetry with _is_retryable(exc: BaseException) / _is_rate_limit_error(exc: BaseException | None). Today _execute_task_inner_impl only stores Exception, so this is non-blocking.

Test coverage

  • The added test test_scheduler_429_beyond_salvage_cap_is_delayed_not_dropped covers the happy path: all 3 records hit 429s through initial + salvage rounds, then succeed. Good regression for the reported bug.
  • Gaps worth considering:
    • A test where 429s never resolve, asserting the run terminates (or at least makes bounded progress) instead of hanging — this would also flush out the livelock concern above.
    • A test mixing ModelRateLimitError and another RETRYABLE_MODEL_ERRORS member (e.g. ModelTimeoutError) on the same row group, asserting only the 429 tasks are preserved and the timeout tasks are dropped on exhaustion as before.
    • A test that the _deferred_errors dict does not leak entries after a successful retry (could be done by inspecting scheduler._deferred_errors post-run).

Project conventions

  • Follows from __future__ import annotations and modern type syntax (already present in file).
  • Imports are absolute and sorted appropriately.
  • No reverse imports introduced; change is contained to the engine package.

Verdict

Approve with comments. The fix is correct, narrowly scoped, and addresses a real data-loss bug. The main concern is operational: under sustained provider rate-limiting, the new continue path can drive a hot loop and produce log spam. Adding a small wait/backoff and throttling the "keeping rate-limited deferred" log would make this much safer to ship. A "rate-limit never resolves" test would also be valuable insurance.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 22, 2026

Greptile Summary

This PR fixes a data-loss bug where records hitting HTTP 429 after the salvage round cap were silently dropped. Rate-limited exhausted tasks are now kept in _deferred and retried on each subsequent salvage cycle, paced by request-admission cooldown or a small fallback delay, so 429s delay records instead of discarding them.

  • _deferred_errors tracks the last exception per task so _is_rate_limit_error can distinguish 429 exhaustion from other retryable failures; non-429 exhausted tasks retain their existing drop behavior.
  • _wait_before_rate_limit_resalvage and _rate_limit_resalvage_delay_seconds prevent hot-loop retrying; _record_rate_limit_preservations throttles the preservation warning to every RATE_LIMIT_PRESERVATION_WARNING_INTERVAL cycles.
  • salvage_max_rounds >= 1 is now validated at construction time so preserved tasks always have at least one retry attempt per cycle.

Confidence Score: 5/5

Safe to merge; the change keeps rate-limited records alive rather than discarding them, with well-tested pacing and cleanup paths.

The preservation logic, bookkeeping cleanup, and pacing mechanics are all correct. The one gap — the pre-existing early-shutdown warning now fires on every retry cycle instead of once — does not affect data correctness or scheduler termination.

async_scheduler.py early-shutdown branch (lines 1072–1080) for the repeated warning log.

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py Core change: rate-limited exhausted tasks are kept in _deferred instead of dropped, and new pacing/throttling logic retries them with backoff. Logic is sound; one pre-existing warning log is now emitted per salvage cycle instead of once.
packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py Adds five regression tests: delayed-not-dropped 429s, paced resalvage cycling, mixed retryable exhaustion, zero salvage rounds rejection, and request-cooldown pacing. Coverage is thorough and assertions are appropriate.
architecture/dataset-builders.md Single-sentence documentation update reflecting that salvage-exhausted 429 tasks are deferred rather than dropped.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Task dispatched] --> B{Fails?}
    B -- No --> C[Complete / checkpoint]
    B -- Yes --> D{Retryable?}
    D -- No --> E[Drop row]
    D -- Yes --> F[Append to _deferred]
    F --> G[_salvage_stalled_row_groups]
    G --> H[_salvage_rounds up to max]
    H --> I{Still failing after all rounds?}
    I -- No --> C
    I -- Yes: rate-limit --> J[Preserve in _deferred]
    I -- Yes: other retryable --> E
    J --> K{has rate-limited deferred?}
    K -- Yes --> L[_wait_before_rate_limit_resalvage]
    L --> G
    K -- No --> M{Early shutdown?}
    M -- Yes --> N[break]
    M -- No --> O[Normal loop continues]
Loading
Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py:1072-1080
**Unthrottled early-shutdown warning per retry cycle**

Before this PR the early-shutdown branch always `break`ed, so `logger.warning("Early shutdown triggered …")` fired exactly once. With the new `continue`, this line executes on every salvage cycle until all rate-limited tasks clear. At the minimum fallback delay of 50 ms that is ~20 log entries per second; with a real 429 back-off of several seconds the rate is lower, but a sustained rate-limit period can still produce hundreds of repetitions.

The PR explicitly adds throttling for `_record_rate_limit_preservations` (firing on the first preservation and every `RATE_LIMIT_PRESERVATION_WARNING_INTERVAL` cycles thereafter). The same pattern should be applied here — e.g. guard the warning with a flag that is set on the first entry into the early-shutdown block, so it fires once and the repeating cycles are silent.

Reviews (10): Last reviewed commit: "Merge branch 'main' into codex/fix-429-r..." | Re-trigger Greptile

@eric-tramel eric-tramel force-pushed the codex/fix-429-rate-limit-salvage branch from b16957b to f6de784 Compare May 22, 2026 17:13
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
@eric-tramel eric-tramel force-pushed the codex/fix-429-rate-limit-salvage branch from f6de784 to e1c4896 Compare May 22, 2026 17:17
@eric-tramel
Copy link
Copy Markdown
Contributor Author

Update summary after reviewing the automated feedback:

  • Addressed Greptile's remaining cleanup by removing the redundant _deferred_errors.pop(task, None) from the non-retryable error branch; cleanup now stays centralized in the finally block.
  • Kept the 429 data-loss fix intact: exhausted ModelRateLimitError tasks remain deferred for later retry instead of being dropped after salvage_max_rounds.
  • Added pacing for sustained rate-limit-only salvage cycles so a misbehaving or synchronously failing generator cannot drive a hot loop.
  • Added throttled preservation logging and consistent 1-indexed row-group labels for preserved rate-limited tasks.
  • Added safeguards and tests around mixed retryable failures, cleanup of deferred bookkeeping, invalid salvage_max_rounds=0, and request-admission cooldown based backoff.

Verification run on the latest head:

  • uv run --group dev pytest packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py -> 86 passed
  • uv run --group dev ruff check packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py -> passed
  • uv run --group dev ruff format --check packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py -> passed
  • git diff --check -> passed

@nabinchha
Copy link
Copy Markdown
Contributor

Nice work on this one, @eric-tramel — the scheduler change is nicely targeted and the regression coverage hits the important behavioral edges.

Summary

This PR changes async scheduler salvage so exhausted ModelRateLimitError work stays deferred and keeps retrying instead of being treated like other retryable failures after the salvage cap. The implementation matches the stated intent, and I locally ran changed-file ruff plus the focused new scheduler tests.

Findings

Warnings — Worth addressing

Design issues, missing error handling, test gaps, or violations of project standards that could cause problems later.

packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py:2618 — Timing assertion can flake on slow CI

  • What: test_scheduler_paces_sustained_429_resalvage sets the backoff to 0.1, sleeps for 0.03, and then asserts the call count has not advanced. If the event loop is paused or CI is overloaded, that sleep(0.03) can resume after the 0.1 timeout has already expired, letting the scheduler retry and making the test fail even though the production behavior is correct.
  • Why: This test is covering exactly the kind of scheduler behavior we’ll lean on during provider stress, so a wall-clock-sensitive assertion could become noisy in CI.
  • Suggestion: Could we make the pacing check deterministic? One option is to monkeypatch _wait_before_rate_limit_resalvage with an async gate that records it was awaited before allowing the next salvage cycle, then assert the scheduler does not advance until the test releases that gate. If we keep a real-time smoke check, using a much larger “no retry yet” delay budget would also reduce the flake window.

Suggestions — Take it or leave it

Style improvements, minor simplifications, or optional enhancements that would improve code quality.

docs/devnotes/posts/async-all-the-way-down.md:198 — Async fault-tolerance note now overstates drop behavior

  • What: The dev note still says retryable failures, including rate limits, are dropped after salvage attempts are exhausted.
  • Why: After this PR, ModelRateLimitError is the special case: it stays deferred and keeps retrying, while other exhausted retryables still drop. Readers using this page to reason about async fault tolerance will get the old behavior for 429s.
  • Suggestion: Could we update that paragraph in this PR or a small follow-up to carve out rate limits? Something like: “Rate-limit errors are preserved and paced for another salvage cycle; other retryable errors that still fail after the salvage cap are dropped.”

What Looks Good

  • The new _deferred_errors bookkeeping is a clean way to distinguish exhausted 429s from exhausted timeouts/API-connection errors without widening the scheduler’s public surface.
  • The cleanup paths for successful/checkpointed work cover the new deferred bookkeeping, which keeps long runs from accumulating stale task state.
  • The regression tests cover the big behavioral cases: 429s beyond the salvage cap, mixed retryable exhaustion, early shutdown interaction, and request-admission cooldown pacing.

Verdict

Needs changes — I’d address the wall-clock-sensitive scheduler test before merge. The docs update is a light nit, but worth keeping close to the behavior change.


This review was generated by an AI assistant.

@github-actions
Copy link
Copy Markdown
Contributor

MkDocs preview: https://b6c9e2ef.dd-docs-preview.pages.dev

Fern preview: https://nvidia-preview-pr-703.docs.buildwithfern.com/nemo/datadesigner

Fern previews include the docs-website version archive with PR changes synced into latest. Notebook tutorials are rendered without execution outputs in previews.

Copy link
Copy Markdown
Contributor

@nabinchha nabinchha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice follow-up, @eric-tramel — the latest updates address the prior scheduler-review concerns cleanly.

Summary

This PR keeps salvage-exhausted ModelRateLimitError tasks deferred so sustained 429s delay records instead of dropping them, while preserving the existing drop behavior for other exhausted retryable failures. I re-reviewed the latest head after the test and docs adjustments; the implementation matches the PR intent.

Findings

No findings. The prior timing-sensitive pacing test has been made deterministic with an async gate, and the dataset-builder architecture note now documents the new rate-limit preservation behavior.

What Looks Good

  • The deferred-error tracking stays tightly scoped to scheduler internals and is cleaned up on success, checkpoint, and non-rate-limit exhaustion paths.
  • The pacing path uses request-admission cooldown when available, with a small fallback delay, so preserved 429 work avoids the hot-loop risk raised earlier.
  • The tests now cover delayed-not-dropped 429s, early shutdown interaction, mixed retryable exhaustion, cleanup invariants, and cooldown-based delay selection.

Verdict

Ship it — ready to merge from my side.


This review was generated by an AI assistant.

@eric-tramel eric-tramel merged commit efdfd7a into main May 27, 2026
60 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants