Skip to content

Commit d3d1346

Browse files
feat(async-jobs): add deferred retry with cooldown for failed async jobs (#1016)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 9fcc6de commit d3d1346

7 files changed

Lines changed: 298 additions & 20 deletions

File tree

airbyte_cdk/sources/declarative/async_job/job.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
22

33

4-
from datetime import timedelta
4+
from datetime import datetime, timedelta, timezone
55
from typing import Optional
66

77
from airbyte_cdk.sources.declarative.async_job.timer import Timer
@@ -19,11 +19,17 @@ class AsyncJob:
1919
"""
2020

2121
def __init__(
22-
self, api_job_id: str, job_parameters: StreamSlice, timeout: Optional[timedelta] = None
22+
self,
23+
api_job_id: str,
24+
job_parameters: StreamSlice,
25+
timeout: Optional[timedelta] = None,
26+
is_creation_failure: bool = False,
2327
) -> None:
2428
self._api_job_id = api_job_id
2529
self._job_parameters = job_parameters
2630
self._status = AsyncJobStatus.RUNNING
31+
self._retry_after: Optional[datetime] = None
32+
self._is_creation_failure = is_creation_failure
2733

2834
timeout = timeout if timeout else timedelta(minutes=60)
2935
self._timer = Timer(timeout)
@@ -54,5 +60,23 @@ def update_status(self, status: AsyncJobStatus) -> None:
5460

5561
self._status = status
5662

63+
def is_creation_failure(self) -> bool:
64+
"""Return True if this job was never actually created on the API side."""
65+
return self._is_creation_failure
66+
67+
def set_retry_after(self, retry_after: datetime) -> None:
68+
"""Set the earliest time this job can be retried."""
69+
self._retry_after = retry_after
70+
71+
def retry_deferred(self) -> bool:
72+
"""Return True if a deferred retry has been scheduled."""
73+
return self._retry_after is not None
74+
75+
def ready_to_retry(self) -> bool:
76+
"""Return True if the job has no deferred retry or the wait period has elapsed."""
77+
if self._retry_after is None:
78+
return True
79+
return datetime.now(tz=timezone.utc) >= self._retry_after
80+
5781
def __repr__(self) -> str:
5882
return f"AsyncJob(api_job_id={self.api_job_id()}, job_parameters={self.job_parameters()}, status={self.status()})"

airbyte_cdk/sources/declarative/async_job/job_orchestrator.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import time
66
import traceback
77
import uuid
8-
from datetime import timedelta
8+
from datetime import datetime, timedelta, timezone
99
from typing import (
1010
Any,
1111
Generator,
@@ -168,6 +168,7 @@ def __init__(
168168
exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
169169
has_bulk_parent: bool = False,
170170
job_max_retry: Optional[int] = None,
171+
failed_retry_wait_time_in_seconds: Optional[int] = None,
171172
) -> None:
172173
"""
173174
If the stream slices provided as a parameters relies on a async job streams that relies on the same JobTracker, `has_bulk_parent`
@@ -181,6 +182,9 @@ def __init__(
181182
"An AsyncJobStatus has been either removed or added which means the logic of this class needs to be reviewed. Once the logic has been updated, please update _KNOWN_JOB_STATUSES"
182183
)
183184

185+
if failed_retry_wait_time_in_seconds is not None and failed_retry_wait_time_in_seconds <= 0:
186+
raise ValueError("failed_retry_wait_time_in_seconds must be >= 1")
187+
184188
self._job_repository: AsyncJobRepository = job_repository
185189
self._slice_iterator = LookaheadIterator(slices)
186190
self._running_partitions: List[AsyncPartition] = []
@@ -189,13 +193,37 @@ def __init__(
189193
self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
190194
self._has_bulk_parent = has_bulk_parent
191195
self._job_max_retry = job_max_retry
196+
self._failed_retry_wait_time_in_seconds = failed_retry_wait_time_in_seconds
192197

193198
self._non_breaking_exceptions: List[Exception] = []
194199

195200
def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
196201
failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
197202
jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs]
198203
for job in jobs_to_replace:
204+
if (
205+
self._failed_retry_wait_time_in_seconds is not None
206+
and job.status() == AsyncJobStatus.FAILED
207+
and not job.is_creation_failure()
208+
):
209+
if not job.ready_to_retry():
210+
lazy_log(
211+
LOGGER,
212+
logging.DEBUG,
213+
lambda: f"Job {job.api_job_id()} is not ready to retry yet (deferred). Skipping.",
214+
)
215+
continue
216+
if not job.retry_deferred():
217+
job.set_retry_after(
218+
datetime.now(tz=timezone.utc)
219+
+ timedelta(seconds=self._failed_retry_wait_time_in_seconds)
220+
)
221+
lazy_log(
222+
LOGGER,
223+
logging.INFO,
224+
lambda: f"Job {job.api_job_id()} failed. Deferring retry for {self._failed_retry_wait_time_in_seconds} seconds.",
225+
)
226+
continue
199227
new_job = self._start_job(job.job_parameters(), job.api_job_id())
200228
partition.replace_job(job, [new_job])
201229

@@ -281,7 +309,12 @@ def _keep_api_budget_with_failed_job(
281309
return job
282310

283311
def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob:
284-
job = AsyncJob(f"{uuid.uuid4()} - Job that could not start", stream_slice, _NO_TIMEOUT)
312+
job = AsyncJob(
313+
f"{uuid.uuid4()} - Job that could not start",
314+
stream_slice,
315+
_NO_TIMEOUT,
316+
is_creation_failure=True,
317+
)
285318
job.update_status(AsyncJobStatus.FAILED)
286319
return job
287320

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4107,6 +4107,14 @@ definitions:
41074107
- type: string
41084108
interpolation_context:
41094109
- config
4110+
failed_retry_wait_time_in_seconds:
4111+
description: Time in seconds to wait before retrying a failed async job. Only applies to jobs that ran on the API side and reported a FAILED status (e.g. report generation failed due to a cooldown). Creation failures (HTTP errors when starting a job, such as 429s) and TIMED_OUT jobs are retried immediately and are not affected by this setting. When set, the orchestrator defers retry of real failed jobs until the wait time has elapsed, without blocking other jobs.
4112+
anyOf:
4113+
- type: integer
4114+
minimum: 1
4115+
- type: string
4116+
interpolation_context:
4117+
- config
41104118
download_target_requester:
41114119
description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.
41124120
anyOf:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3074,6 +3074,10 @@ class AsyncRetriever(BaseModel):
30743074
None,
30753075
description="The time in minutes after which the single Async Job should be considered as Timed Out.",
30763076
)
3077+
failed_retry_wait_time_in_seconds: Optional[Union[int, str]] = Field(
3078+
None,
3079+
description="Time in seconds to wait before retrying a failed async job. Only applies to jobs that ran on the API side and reported a FAILED status (e.g. report generation failed due to a cooldown). Creation failures (HTTP errors when starting a job, such as 429s) and TIMED_OUT jobs are retried immediately and are not affected by this setting. When set, the orchestrator defers retry of real failed jobs until the wait time has elapsed, without blocking other jobs.",
3080+
)
30773081
download_target_requester: Optional[Union[HttpRequester, CustomRequester]] = Field(
30783082
None,
30793083
description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3955,6 +3955,17 @@ def _get_job_timeout() -> datetime.timedelta:
39553955
job_timeout=_get_job_timeout(),
39563956
)
39573957

3958+
failed_retry_wait_time_in_seconds: Optional[int] = (
3959+
int(
3960+
InterpolatedString.create(
3961+
str(model.failed_retry_wait_time_in_seconds),
3962+
parameters={},
3963+
).eval(config)
3964+
)
3965+
if model.failed_retry_wait_time_in_seconds
3966+
else None
3967+
)
3968+
39583969
async_job_partition_router = AsyncJobPartitionRouter(
39593970
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
39603971
job_repository,
@@ -3966,6 +3977,7 @@ def _get_job_timeout() -> datetime.timedelta:
39663977
# set the `job_max_retry` to 1 for the `Connector Builder`` use-case.
39673978
# `None` == default retry is set to 3 attempts, under the hood.
39683979
job_max_retry=1 if self._emit_connector_builder_messages else None,
3980+
failed_retry_wait_time_in_seconds=failed_retry_wait_time_in_seconds,
39693981
),
39703982
stream_slicer=stream_slicer,
39713983
config=config,
Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
22

33
import time
4-
from datetime import timedelta
5-
from unittest import TestCase
4+
from datetime import datetime, timedelta, timezone
5+
from typing import Optional
6+
7+
import pytest
68

79
from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
810
from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
@@ -14,19 +16,30 @@
1416
_IMMEDIATELY_TIMED_OUT = timedelta(microseconds=1)
1517

1618

17-
class AsyncJobTest(TestCase):
18-
def test_given_timer_is_not_out_when_status_then_return_actual_status(self) -> None:
19-
job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _A_VERY_BIG_TIMEOUT)
20-
assert job.status() == AsyncJobStatus.RUNNING
19+
def test_given_timer_is_not_out_when_status_then_return_actual_status() -> None:
20+
job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _A_VERY_BIG_TIMEOUT)
21+
assert job.status() == AsyncJobStatus.RUNNING
22+
23+
24+
def test_given_timer_is_out_when_status_then_return_timed_out() -> None:
25+
job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _IMMEDIATELY_TIMED_OUT)
26+
time.sleep(0.001)
27+
assert job.status() == AsyncJobStatus.TIMED_OUT
2128

22-
def test_given_timer_is_out_when_status_then_return_timed_out(self) -> None:
23-
job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _IMMEDIATELY_TIMED_OUT)
24-
time.sleep(0.001)
25-
assert job.status() == AsyncJobStatus.TIMED_OUT
2629

27-
def test_given_status_is_terminal_when_update_status_then_stop_timer(self) -> None:
28-
"""
29-
This test will become important once we will print stats associated with jobs. As for now, we stop the timer but do not return any
30-
metrics regarding the timer so it is not useful.
31-
"""
32-
pass
30+
@pytest.mark.parametrize(
31+
"retry_after_offset,expected_deferred,expected_ready",
32+
[
33+
pytest.param(None, False, True, id="no_retry_after_set"),
34+
pytest.param(timedelta(hours=1), True, False, id="retry_after_in_future"),
35+
pytest.param(-timedelta(seconds=1), True, True, id="retry_after_in_past"),
36+
],
37+
)
38+
def test_retry_after(
39+
retry_after_offset: Optional[timedelta], expected_deferred: bool, expected_ready: bool
40+
) -> None:
41+
job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _A_VERY_BIG_TIMEOUT)
42+
if retry_after_offset is not None:
43+
job.set_retry_after(datetime.now(tz=timezone.utc) + retry_after_offset)
44+
assert job.retry_deferred() == expected_deferred
45+
assert job.ready_to_retry() == expected_ready

0 commit comments

Comments (0)