Skip to content

feat(async-jobs): add deferred retry with cooldown for failed async jobs#1016

Merged
Daryna Ishchenko (darynaishchenko) merged 8 commits into
mainfrom
devin/1778159323-failed-retry-wait-time
May 11, 2026
Merged

feat(async-jobs): add deferred retry with cooldown for failed async jobs#1016
Daryna Ishchenko (darynaishchenko) merged 8 commits into
mainfrom
devin/1778159323-failed-retry-wait-time

Conversation

@darynaishchenko
Copy link
Copy Markdown
Contributor

@darynaishchenko Daryna Ishchenko (darynaishchenko) commented May 7, 2026

Summary

Adds an optional failed_retry_wait_time_in_seconds parameter to AsyncRetriever that defers retry of real FAILED async jobs (report created on the API but returned a failure status like Amazon SP-API's FATAL) until a cooldown period elapses — without blocking other jobs or using time.sleep().

Key design decisions:

  • Only real FAILED jobs are deferred. Creation-failure jobs (HTTP 429 when starting a job — report never created) and TIMED_OUT jobs are retried immediately, preserving existing behavior.
  • AsyncJob distinguishes the two via an is_creation_failure constructor parameter (no private-attribute writes from outside the class).
  • The orchestrator uses a two-phase check: first arms retry_after on initial failure, then skips until the cooldown elapses.
  • Exposed on the declarative schema with minimum: 1 validation and config interpolation support.

Related: airbytehq/airbyte#77837 configures failed_retry_wait_time_in_seconds: 1800 for source-amazon-seller-partner.

Review & Testing Checklist for Human

  • Trace the deferral state machine in _replace_failed_jobs — the two guards (ready_to_retry then retry_deferred) must be checked in this order. On first failure both are false → second branch arms cooldown. Before cooldown expires, ready_to_retry() is false → first branch skips. After cooldown, both true → falls through to replacement. Verify no ordering produces unexpected behavior.
  • Verify creation-failure bypass — confirm that _create_failed_job() passes is_creation_failure=True and that the condition not job.is_creation_failure() correctly gates deferral so 429-driven retries are never delayed.
  • Run tests: pytest unit_tests/sources/declarative/async_job/ -v (43 tests). The regression test test_given_real_failed_then_cooldown_elapses_then_start_returns_creation_failure_then_no_rearm covers: real FAILED → arm cooldown → cooldown elapses → replacement is creation-failure → no re-arm on next tick.

Notes

  • The <= 0 runtime guard in the orchestrator constructor is the only enforcement beyond the YAML minimum: 1; Pydantic model uses Union[int, str] without constraints.
  • Tests access job_tracker._jobs (private set) directly to pre-register job IDs.

Link to Devin session: https://app.devin.ai/sessions/2cbe53a30ea14f9f8151f407f423283b
Requested by: Daryna Ishchenko (@darynaishchenko)

Summary by CodeRabbit

  • New Features

    • Added deferred retry scheduling for failed async jobs with configurable wait times (failed_retry_wait_time_in_seconds).
    • Enhanced job failure tracking to distinguish between creation-time failures and runtime failures.
    • Jobs now support retry deferral logic with automatic readiness checks based on elapsed time.
  • Tests

    • Expanded test coverage for retry and cooldown behavior in async job orchestration.

Review Change Stack

Add failed_retry_wait_time_in_seconds parameter to AsyncRetriever and
AsyncJobOrchestrator. When configured, FAILED jobs are not retried
immediately but deferred until a cooldown period elapses. This is
non-blocking: the orchestrator skips deferred jobs and continues
processing other partitions normally.

Changes:
- AsyncJob: add set_retry_after(), retry_deferred(), ready_to_retry()
- AsyncJobOrchestrator: defer FAILED job retries when wait time is set
- Schema/models: add failed_retry_wait_time_in_seconds to AsyncRetriever
- Factory: wire parameter through to orchestrator
- Tests: add parametrized tests for retry_after and orchestrator behavior

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
@devin-ai-integration
Copy link
Copy Markdown
Contributor

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 7, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1778159323-failed-retry-wait-time#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1778159323-failed-retry-wait-time

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

Comment thread unit_tests/sources/declarative/async_job/test_job_orchestrator.py Fixed
Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 7, 2026

PyTest Results (Fast)

4 059 tests  +8   4 048 ✅ +8   6m 34s ⏱️ -46s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit ec1a0f0. ± Comparison against base commit 9fcc6de.

This pull request removes 3 and adds 11 tests. Note that renamed tests count towards both.
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_status_is_terminal_when_update_status_then_stop_timer
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_timer_is_not_out_when_status_then_return_actual_status
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_timer_is_out_when_status_then_return_timed_out
unit_tests.sources.declarative.async_job.test_job ‑ test_given_timer_is_not_out_when_status_then_return_actual_status
unit_tests.sources.declarative.async_job.test_job ‑ test_given_timer_is_out_when_status_then_return_timed_out
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[no_retry_after_set]
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[retry_after_in_future]
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[retry_after_in_past]
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_create_failed_job_tags_as_creation_failure
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_creation_failure_job_when_cooldown_configured_then_replaces_immediately
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_failed_retry_wait_time_when_job_fails_then_defers_retry
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_failed_retry_wait_time_when_timed_out_job_then_replaces_immediately
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_no_failed_retry_wait_time_when_job_fails_then_replaces_immediately
…

♻️ This comment has been updated with latest results.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 7, 2026

PyTest Results (Full)

4 062 tests  +8   4 050 ✅ +8   10m 56s ⏱️ -35s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit ec1a0f0. ± Comparison against base commit 9fcc6de.

This pull request removes 3 and adds 11 tests. Note that renamed tests count towards both.
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_status_is_terminal_when_update_status_then_stop_timer
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_timer_is_not_out_when_status_then_return_actual_status
unit_tests.sources.declarative.async_job.test_job.AsyncJobTest ‑ test_given_timer_is_out_when_status_then_return_timed_out
unit_tests.sources.declarative.async_job.test_job ‑ test_given_timer_is_not_out_when_status_then_return_actual_status
unit_tests.sources.declarative.async_job.test_job ‑ test_given_timer_is_out_when_status_then_return_timed_out
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[no_retry_after_set]
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[retry_after_in_future]
unit_tests.sources.declarative.async_job.test_job ‑ test_retry_after[retry_after_in_past]
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_create_failed_job_tags_as_creation_failure
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_creation_failure_job_when_cooldown_configured_then_replaces_immediately
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_failed_retry_wait_time_when_job_fails_then_defers_retry
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_failed_retry_wait_time_when_timed_out_job_then_replaces_immediately
unit_tests.sources.declarative.async_job.test_job_orchestrator.AsyncJobOrchestratorTest ‑ test_given_no_failed_retry_wait_time_when_job_fails_then_replaces_immediately
…

♻️ This comment has been updated with latest results.

@darynaishchenko Daryna Ishchenko (darynaishchenko) marked this pull request as ready for review May 7, 2026 14:02
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 7, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: f8b6872f-4457-4ea6-9f1d-114cdb448655

📥 Commits

Reviewing files that changed from the base of the PR and between 787db4b and ec1a0f0.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/async_job/job.py
  • airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
  • unit_tests/sources/declarative/async_job/test_job_orchestrator.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py

📝 Walkthrough

Walkthrough

Adds configurable cooldown-based deferred retries for FAILED async jobs: AsyncJob tracks a retry-after timestamp and creation-failure flag, orchestrator gates replacements using failed_retry_wait_time_in_seconds, configuration is wired through the factory/schema, and tests cover behaviors.

Changes

Async Job Retry Deferral

Layer / File(s) Summary
Schema and Configuration Fields
airbyte_cdk/sources/declarative/declarative_component_schema.yaml, airbyte_cdk/sources/declarative/models/declarative_component_schema.py
AsyncRetriever gains optional failed_retry_wait_time_in_seconds (int or interpolated string) to control cooldown before retrying failed jobs.
Configuration Wiring
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
create_async_retriever interpolates the model value and passes failed_retry_wait_time_in_seconds (Optional[int]) into AsyncHttpJobRepository.
AsyncJob Retry State Tracking
airbyte_cdk/sources/declarative/async_job/job.py
AsyncJob adds _retry_after and _is_creation_failure plus methods: is_creation_failure(), set_retry_after(), retry_deferred(), and ready_to_retry() using UTC-aware comparisons.
Orchestrator Retry Deferral Logic
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
AsyncJobOrchestrator accepts failed_retry_wait_time_in_seconds (validated >0); it skips replacing FAILED jobs not ready to retry, arms retry-after when configured, and _create_failed_job marks created failures as creation failures; TIMED_OUT behavior unchanged.
Tests
unit_tests/sources/declarative/async_job/test_job.py, unit_tests/sources/declarative/async_job/test_job_orchestrator.py
Tests converted to pytest; added parametrized tests for AsyncJob retry-after states and multiple orchestrator tests verifying deferred replacement, immediate replacement when unset, TIMED_OUT behavior, and creation-failure handling.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant AsyncJobOrchestrator
  participant AsyncHttpJobRepository
  participant AsyncJob
  participant Clock as UTC

  Client->>AsyncJobOrchestrator: trigger job handling
  AsyncJobOrchestrator->>AsyncHttpJobRepository: fetch job(s)
  AsyncJobOrchestrator->>AsyncJob: check status
  AsyncJobOrchestrator->>AsyncJob: ready_to_retry()?
  alt Not ready
    AsyncJob-->>AsyncJobOrchestrator: false
    AsyncJobOrchestrator-->>AsyncJob: skip replacement
  else Ready
    AsyncJob-->>AsyncJobOrchestrator: true
    AsyncJobOrchestrator->>AsyncJob: retry_deferred()?
    alt Not deferred
      AsyncJob-->>AsyncJobOrchestrator: false
      AsyncJobOrchestrator->>Clock: now + wait_seconds
      Clock-->>AsyncJobOrchestrator: retry_after timestamp
      AsyncJobOrchestrator->>AsyncJob: set_retry_after(retry_after)
      AsyncJobOrchestrator-->>AsyncJob: defer replacement
    else Deferred
      AsyncJob-->>AsyncJobOrchestrator: true
      AsyncJobOrchestrator->>AsyncHttpJobRepository: start replacement
      AsyncHttpJobRepository-->>AsyncJobOrchestrator: new job (may be FAILED/creation-failure)
    end
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Would you like me to flag specific places to tighten timezone handling or add more unit coverage around edge cases, wdyt?

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 52.17% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: adding deferred retry with cooldown for failed async jobs, which is the core feature across all modified files.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch devin/1778159323-failed-retry-wait-time

Warning

Review ran into problems

🔥 Problems

Timed out fetching pipeline failures after 30000ms

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (1)

171-193: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Could we reject negative retry cooldown values at construction time, wdyt?

Right now, a negative value can produce a retry_after in the past, which makes the deferral semantics confusing instead of explicit.

Proposed fix
     def __init__(
         self,
         job_repository: AsyncJobRepository,
         slices: Iterable[StreamSlice],
         job_tracker: JobTracker,
         message_repository: MessageRepository,
         exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
         has_bulk_parent: bool = False,
         job_max_retry: Optional[int] = None,
         failed_retry_wait_time_in_seconds: Optional[int] = None,
     ) -> None:
+        if (
+            failed_retry_wait_time_in_seconds is not None
+            and failed_retry_wait_time_in_seconds < 0
+        ):
+            raise ValueError(
+                "failed_retry_wait_time_in_seconds must be >= 0"
+            )
         """
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/declarative/async_job/job_orchestrator.py` around lines
171 - 193, Reject negative failed_retry_wait_time_in_seconds in the constructor
by validating the parameter and raising a ValueError if it's < 0; add a check
after parameters are received (near AsyncJobStatus/_KNOWN_JOB_STATUSES
validation) to assert failed_retry_wait_time_in_seconds is either None or >= 0
and include a clear error message, then assign it to
self._failed_retry_wait_time_in_seconds as before.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@airbyte_cdk/sources/declarative/async_job/job.py`:
- Around line 58-70: The set_retry_after method should guard against naive
datetimes and store a normalized UTC-aware datetime in self._retry_after: if
retry_after.tzinfo is None (naive) attach timezone.utc (e.g. retry_after =
retry_after.replace(tzinfo=timezone.utc)), then normalize any timezone-aware
value to UTC via retry_after = retry_after.astimezone(timezone.utc) before
assigning to self._retry_after; keep ready_to_retry using
datetime.now(tz=timezone.utc) and the _retry_after field for comparisons so no
TypeError occurs.

In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml`:
- Around line 4105-4111: Add a minimum bound to the integer branch of the
failed_retry_wait_time_in_seconds schema so negative or zero values are rejected
by the schema; specifically, in the failed_retry_wait_time_in_seconds anyOf
entry for type: integer add minimum: 1 (mirroring the pattern used by
DynamicStreamCheckConfig.stream_count) to ensure the orchestrator's
ready_to_retry() logic isn't fed non-positive values while keeping the existing
anyOf string branch and interpolation_context unchanged.

In `@airbyte_cdk/sources/declarative/models/declarative_component_schema.py`:
- Around line 3077-3080: The field failed_retry_wait_time_in_seconds currently
allows negative values; add validation to prevent negatives by adding ge=0 to
its Pydantic Field call in the declarative component model (update the
Field(...) for failed_retry_wait_time_in_seconds to include ge=0) and also add
minimum: 0 to the corresponding YAML schema definition for the same field so
generated models and schema both enforce a non-negative cooldown.

In `@unit_tests/sources/declarative/async_job/test_job.py`:
- Around line 30-55: The parametrized datetimes are computed at import time
which makes the test time-sensitive; change the test to compute retry_after
values at runtime inside test_retry_after instead of using datetime.now(...) in
the param list: keep the same parameter cases (None, future, past) but pass a
marker or enum in the parametrize tuple and in test_retry_after compute
datetime.now(timezone.utc) + timedelta(hours=1) or - timedelta(seconds=1) as
needed, then call AsyncJob.set_retry_after(retry_after) and assert
job.retry_deferred() and job.ready_to_retry() as before (refer to
test_retry_after, AsyncJob.set_retry_after, job.retry_deferred,
job.ready_to_retry).

---

Outside diff comments:
In `@airbyte_cdk/sources/declarative/async_job/job_orchestrator.py`:
- Around line 171-193: Reject negative failed_retry_wait_time_in_seconds in the
constructor by validating the parameter and raising a ValueError if it's < 0;
add a check after parameters are received (near
AsyncJobStatus/_KNOWN_JOB_STATUSES validation) to assert
failed_retry_wait_time_in_seconds is either None or >= 0 and include a clear
error message, then assign it to self._failed_retry_wait_time_in_seconds as
before.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 708f4de0-2b2c-45d9-9671-f27afea41ede

📥 Commits

Reviewing files that changed from the base of the PR and between ccc185f and 483701a.

📒 Files selected for processing (7)
  • airbyte_cdk/sources/declarative/async_job/job.py
  • airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • unit_tests/sources/declarative/async_job/test_job.py
  • unit_tests/sources/declarative/async_job/test_job_orchestrator.py

Comment thread airbyte_cdk/sources/declarative/async_job/job.py
Comment thread airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Comment thread unit_tests/sources/declarative/async_job/test_job.py
…runtime offsets

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
@tolik0
Copy link
Copy Markdown
Contributor

A couple of concerns from review:

1. Double cooldown via synthetic FAILED jobs

_start_job doesn't always return a freshly-running job. When the API rejects the start, _keep_api_budget_with_failed_job synthesizes a FAILED AsyncJob with _retry_after = None and returns it (job_orchestrator.py:260-281). Tracing the SP-API cooldown path:

  1. Job fails at T=0 → cooldown armed, retry_after = T + 1800
  2. At T=1800, cooldown elapses → _replace_failed_jobs calls _start_job
  3. API still in cooldown → _start_job returns a synthetic FAILED job (fresh, no _retry_after)
  4. Next tick → state machine sees a fresh FAILED job → arms cooldown again at T ≈ 1800
  5. Effective total wait is roughly 2 × cooldown per attempt, not 1 × cooldown

For the motivating SP-API case, hitting _keep_api_budget_with_failed_job is the expected outcome of retrying during cooldown, so this isn't an edge case. Two reasonable fixes:

  • Carry _retry_after from the old job onto the synthetic replacement, or
  • Don't replace at all when _start_job would synthesize a failure during cooldown — keep the original job and re-arm.

Either way, worth a regression test that simulates _start_job returning a synthetic FAILED job and asserts the total wall-clock wait equals one cooldown, not two.

2. Three different lower bounds for failed_retry_wait_time_in_seconds

  • YAML schema: minimum: 1
  • Constructor: if ... < 0: raise ValueError(...) — allows 0
  • Runtime: if self._failed_retry_wait_time_in_seconds: — treats 0 as disabled

A value of 0 is rejected by the schema, silently accepted by the constructor, and silently disables the feature at runtime. Pick one: is not None and > 0 in the runtime check, and > 0 (or >= 1) in the constructor to match the YAML.

…dation

- When _start_job returns a synthetic FAILED job (API still in cooldown),
  carry _retry_after from the old job to prevent double-cooldown
- Change constructor validation from < 0 to <= 0 to match YAML schema
  minimum: 1 constraint
- Add regression test for synthetic FAILED replacement scenario

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
@devin-ai-integration
Copy link
Copy Markdown
Contributor

Both concerns addressed in 9ddf6ca:

1. Double cooldown via synthetic FAILED jobs

Fixed by carrying _retry_after from the old job onto the synthetic replacement. After _start_job returns, if the new job is already FAILED (synthetic) and the original job had a deferred retry, the replacement inherits _retry_after:

new_job = self._start_job(job.job_parameters(), job.api_job_id())
if (
    self._failed_retry_wait_time_in_seconds
    and new_job.status() == AsyncJobStatus.FAILED
    and job.retry_deferred()
):
    new_job.set_retry_after(job._retry_after)
partition.replace_job(job, [new_job])

This ensures the total wall-clock wait equals one cooldown, not two. Added a regression test (test_given_synthetic_failed_replacement_when_cooldown_elapses_then_carries_retry_after) that simulates _start_job returning a synthetic FAILED job and asserts the replacement inherits _retry_after from the original.

2. Consistent lower bound

Constructor validation now rejects <= 0 (matching YAML minimum: 1):

if failed_retry_wait_time_in_seconds is not None and failed_retry_wait_time_in_seconds <= 0:
    raise ValueError("failed_retry_wait_time_in_seconds must be >= 1")

The three layers are now consistent:

  • YAML schema: minimum: 1
  • Constructor: rejects <= 0
  • Runtime: if self._failed_retry_wait_time_in_seconds: — 0 can never reach here since constructor rejects it

Devin session

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (1)

204-223: ⚡ Quick win

Could we flip the cooldown guard order to reduce hidden coupling, wdyt?

At Line 205, readiness is checked before we know whether cooldown was ever armed. Would you consider arming on first FAILED (not retry_deferred) first, then checking readiness, so this path doesn’t depend on implicit ready_to_retry() semantics?

Proposed refactor
-            if self._failed_retry_wait_time_in_seconds and job.status() == AsyncJobStatus.FAILED:
-                if not job.ready_to_retry():
-                    lazy_log(
-                        LOGGER,
-                        logging.DEBUG,
-                        lambda: f"Job {job.api_job_id()} is not ready to retry yet (deferred). Skipping.",
-                    )
-                    continue
-                if not job.retry_deferred():
+            if self._failed_retry_wait_time_in_seconds and job.status() == AsyncJobStatus.FAILED:
+                if not job.retry_deferred():
                     job.set_retry_after(
                         datetime.now(tz=timezone.utc)
                         + timedelta(seconds=self._failed_retry_wait_time_in_seconds)
@@
                     )
                     continue
+                if not job.ready_to_retry():
+                    lazy_log(
+                        LOGGER,
+                        logging.DEBUG,
+                        lambda: f"Job {job.api_job_id()} is not ready to retry yet (deferred). Skipping.",
+                    )
+                    continue
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/declarative/async_job/job_orchestrator.py` around lines
204 - 223, Flip the cooldown guard order so the retry cooldown is explicitly
armed on the first FAILED occurrence before relying on ready_to_retry(); inside
the loop handling AsyncJobStatus.FAILED (use symbols
_failed_retry_wait_time_in_seconds, job.retry_deferred(),
job.set_retry_after(...), job.ready_to_retry(), job.api_job_id(), _start_job),
first check if not job.retry_deferred() and then call
job.set_retry_after(datetime.now(tz=timezone.utc) +
timedelta(seconds=self._failed_retry_wait_time_in_seconds)) and log the deferral
and continue; otherwise (if retry_deferred) then check job.ready_to_retry() and
skip/continue if not ready, only proceed to call
self._start_job(job.job_parameters(), job.api_job_id()) when readiness passes.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@airbyte_cdk/sources/declarative/async_job/job_orchestrator.py`:
- Around line 204-223: Flip the cooldown guard order so the retry cooldown is
explicitly armed on the first FAILED occurrence before relying on
ready_to_retry(); inside the loop handling AsyncJobStatus.FAILED (use symbols
_failed_retry_wait_time_in_seconds, job.retry_deferred(),
job.set_retry_after(...), job.ready_to_retry(), job.api_job_id(), _start_job),
first check if not job.retry_deferred() and then call
job.set_retry_after(datetime.now(tz=timezone.utc) +
timedelta(seconds=self._failed_retry_wait_time_in_seconds)) and log the deferral
and continue; otherwise (if retry_deferred) then check job.ready_to_retry() and
skip/continue if not ready, only proceed to call
self._start_job(job.job_parameters(), job.api_job_id()) when readiness passes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 242307c0-269e-4c29-aba3-70f104ce272f

📥 Commits

Reviewing files that changed from the base of the PR and between d0385e7 and 9ddf6ca.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
  • unit_tests/sources/declarative/async_job/test_job_orchestrator.py

Synthetic FAILED jobs (created when API rejects with 429/budget-exceeded)
are now marked with _is_synthetic=True and bypass the cooldown deferral.
Only real FAILED jobs (report created but got FATAL from API) are deferred.

This removes the _retry_after carry-over logic since synthetic jobs are
naturally identified and retried immediately.

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
@devin-ai-integration
Copy link
Copy Markdown
Contributor

Addressed in 787db4b — synthetic vs real FAILED jobs are now distinguished:

What changed:

  • AsyncJob gains an _is_synthetic flag (default False)
  • _create_failed_job() (called when API rejects with 429/budget-exceeded) sets _is_synthetic = True on the job
  • _replace_failed_jobs() now checks not job.is_synthetic() before applying cooldown deferral
  • Synthetic FAILED jobs are retried immediately (existing behavior preserved)
  • Only real FAILED jobs (report created, API returned FATAL) get the failed_retry_wait_time_in_seconds cooldown

Removed: The _retry_after carry-over logic from the previous commit is no longer needed — synthetic replacements are identified by their flag and skip the cooldown path entirely, so the double-cooldown problem is solved naturally.

Test added: test_given_synthetic_failed_job_when_cooldown_configured_then_replaces_immediately verifies that a synthetic FAILED job bypasses deferral even when failed_retry_wait_time_in_seconds is set.


Devin session

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/async_job/job.py (1)

28-28: ⚡ Quick win

Consider adding type annotation for consistency?

_is_synthetic is missing a type annotation while _retry_after on line 27 has one. Adding : bool would keep the typing consistent across the new fields, wdyt?

✨ Proposed fix
-        self._is_synthetic = False
+        self._is_synthetic: bool = False
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/declarative/async_job/job.py` at line 28, The new
attribute _is_synthetic lacks a type annotation whereas nearby fields like
_retry_after are typed; add an explicit boolean type to _is_synthetic (i.e.,
declare _is_synthetic: bool) in the same class where _retry_after is defined
(look for the class containing _retry_after and the line setting
self._is_synthetic) to keep typing consistent across the new fields.
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (2)

204-226: 💤 Low value

Consider explicit None-check for the cooldown guard?

Line 205 uses truthiness (if self._failed_retry_wait_time_in_seconds). Since the constructor already rejects 0, this is functionally equivalent to is not None, but being explicit might make the intent clearer for future readers, wdyt?

✨ Proposed refactor
         for job in jobs_to_replace:
             if (
-                self._failed_retry_wait_time_in_seconds
+                self._failed_retry_wait_time_in_seconds is not None
                 and job.status() == AsyncJobStatus.FAILED
                 and not job.is_synthetic()
             ):
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/declarative/async_job/job_orchestrator.py` around lines
204 - 226, The guard using truthiness on self._failed_retry_wait_time_in_seconds
should be made explicit to indicate it’s intended to test for presence (None)
rather than falsy values; update the conditional in job_orchestrator.py to use
"is not None" for self._failed_retry_wait_time_in_seconds while keeping the rest
of the logic around job.status(), job.is_synthetic(), job.ready_to_retry(),
job.retry_deferred(), job.set_retry_after(), and job.api_job_id() unchanged so
behavior remains the same but the intent is clearer.

311-315: 💤 Low value

Direct field access is working but could be cleaner?

Line 314 directly sets job._is_synthetic = True. Would adding a method like job.mark_as_synthetic() or a constructor parameter be cleaner for future maintainability, or is direct access fine here since it's internal framework code, wdyt?

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/declarative/async_job/job_orchestrator.py` around lines
311 - 315, The code in _create_failed_job directly sets the private field
job._is_synthetic = True; instead, add a clear public API on AsyncJob (e.g., a
method mark_as_synthetic() or a constructor parameter like is_synthetic=False)
and use that here: update the AsyncJob class to expose mark_as_synthetic(self)
which sets the internal flag and any related invariants, then replace the direct
assignment in _create_failed_job with job.mark_as_synthetic() (or pass
is_synthetic=True when constructing the AsyncJob) to improve encapsulation and
maintainability.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@airbyte_cdk/sources/declarative/async_job/job_orchestrator.py`:
- Around line 204-226: The guard using truthiness on
self._failed_retry_wait_time_in_seconds should be made explicit to indicate it’s
intended to test for presence (None) rather than falsy values; update the
conditional in job_orchestrator.py to use "is not None" for
self._failed_retry_wait_time_in_seconds while keeping the rest of the logic
around job.status(), job.is_synthetic(), job.ready_to_retry(),
job.retry_deferred(), job.set_retry_after(), and job.api_job_id() unchanged so
behavior remains the same but the intent is clearer.
- Around line 311-315: The code in _create_failed_job directly sets the private
field job._is_synthetic = True; instead, add a clear public API on AsyncJob
(e.g., a method mark_as_synthetic() or a constructor parameter like
is_synthetic=False) and use that here: update the AsyncJob class to expose
mark_as_synthetic(self) which sets the internal flag and any related invariants,
then replace the direct assignment in _create_failed_job with
job.mark_as_synthetic() (or pass is_synthetic=True when constructing the
AsyncJob) to improve encapsulation and maintainability.

In `@airbyte_cdk/sources/declarative/async_job/job.py`:
- Line 28: The new attribute _is_synthetic lacks a type annotation whereas
nearby fields like _retry_after are typed; add an explicit boolean type to
_is_synthetic (i.e., declare _is_synthetic: bool) in the same class where
_retry_after is defined (look for the class containing _retry_after and the line
setting self._is_synthetic) to keep typing consistent across the new fields.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 9eba2569-5cb8-466c-af30-b6675195fac2

📥 Commits

Reviewing files that changed from the base of the PR and between 9ddf6ca and 787db4b.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/async_job/job.py
  • airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
  • unit_tests/sources/declarative/async_job/test_job_orchestrator.py

…on_failure

Address review feedback from Daryna:
1. AsyncJob accepts is_creation_failure as constructor parameter (default False),
   eliminating private-attribute write from outside the class.
2. Rename is_synthetic() to is_creation_failure() for self-documenting call sites.
3. _create_failed_job passes is_creation_failure=True at construction time.
4. Apply CodeRabbit nitpick: use 'is not None' for cooldown guard.
5. Add full regression test: real FAILED -> defer -> cooldown elapses ->
   _start_job returns creation-failure -> verify no re-arm.
6. Add unit test verifying _create_failed_job correctly tags the job.

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
@devin-ai-integration
Copy link
Copy Markdown
Contributor

All 4 design points from Daryna Ishchenko (@darynaishchenko) addressed in e2e9e6b:

1. Constructor parameter instead of private-attribute write

AsyncJob now accepts is_creation_failure: bool = False as a constructor parameter. _create_failed_job() passes is_creation_failure=True at construction time — no more job._is_synthetic = True from outside.

2. Full regression test

Added test_given_real_failed_then_cooldown_elapses_then_start_returns_creation_failure_then_no_rearm covering the exact bug scenario: real FAILED → arm cooldown → cooldown elapses → _start_job returns creation-failure → next tick must NOT re-arm.

3. Unit test for _create_failed_job

Added test_create_failed_job_tags_as_creation_failure verifying the method produces a job with is_creation_failure() == True and status() == FAILED.

4. Renamed is_synthetic()is_creation_failure()

Self-documenting at call sites — not job.is_creation_failure() clearly communicates the meaning.

Also applied CodeRabbit nitpick: explicit is not None check on the cooldown guard.


Devin session

@darynaishchenko Daryna Ishchenko (darynaishchenko) merged commit d3d1346 into main May 11, 2026
29 checks passed
@darynaishchenko Daryna Ishchenko (darynaishchenko) deleted the devin/1778159323-failed-retry-wait-time branch May 11, 2026 15:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants