Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions airbyte_cdk/sources/declarative/async_job/job.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from datetime import timedelta
from datetime import datetime, timedelta, timezone
from typing import Optional

from airbyte_cdk.sources.declarative.async_job.timer import Timer
Expand All @@ -19,11 +19,17 @@ class AsyncJob:
"""

def __init__(
    self,
    api_job_id: str,
    job_parameters: StreamSlice,
    timeout: Optional[timedelta] = None,
    is_creation_failure: bool = False,
) -> None:
    """Track a single asynchronous job.

    Args:
        api_job_id: Identifier the API returned for this job.
        job_parameters: Stream slice this job was created for.
        timeout: How long the job may run before being considered timed out;
            defaults to 60 minutes when not provided.
        is_creation_failure: True for placeholder jobs that could never be
            created on the API side (the creation request itself failed).
    """
    self._api_job_id = api_job_id
    self._job_parameters = job_parameters
    self._status = AsyncJobStatus.RUNNING
    # Earliest UTC time at which a deferred retry may happen; None means no
    # deferred retry has been scheduled for this job.
    self._retry_after: Optional[datetime] = None
    self._is_creation_failure = is_creation_failure

    # NOTE(review): a zero timedelta is falsy and would also fall back to 60
    # minutes here — presumably intentional; confirm if timeout=timedelta(0)
    # is ever a meaningful input.
    timeout = timeout if timeout else timedelta(minutes=60)
    self._timer = Timer(timeout)
Expand Down Expand Up @@ -54,5 +60,23 @@ def update_status(self, status: AsyncJobStatus) -> None:

self._status = status

def is_creation_failure(self) -> bool:
    """Whether this job failed before it ever existed on the API side.

    True only for placeholder jobs built when the creation request itself
    errored out, as opposed to jobs that ran and then reported FAILED.
    """
    never_created = self._is_creation_failure
    return never_created

def set_retry_after(self, retry_after: datetime) -> None:
    """Schedule the earliest moment at which this job may be retried.

    Overwrites any previously scheduled retry time.
    """
    self._retry_after = retry_after

def retry_deferred(self) -> bool:
    """Whether a deferred retry time has been scheduled via ``set_retry_after``."""
    scheduled = self._retry_after
    return scheduled is not None

def ready_to_retry(self) -> bool:
    """Whether the job may be retried now.

    Returns True when no deferred retry is pending, or when the scheduled
    wait period has elapsed.
    """
    retry_at = self._retry_after
    if retry_at is None:
        # No deferral scheduled: the job can be retried right away.
        return True
    # Compare in UTC so aware datetimes line up with what set_retry_after stored.
    return datetime.now(tz=timezone.utc) >= retry_at
Comment thread
darynaishchenko marked this conversation as resolved.

def __repr__(self) -> str:
    """Debug representation exposing the job id, its slice, and current status."""
    # Keep the exact format: downstream logs/tests may match on it.
    return (
        f"AsyncJob(api_job_id={self.api_job_id()}, "
        f"job_parameters={self.job_parameters()}, "
        f"status={self.status()})"
    )
37 changes: 35 additions & 2 deletions airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
import traceback
import uuid
from datetime import timedelta
from datetime import datetime, timedelta, timezone
from typing import (
Any,
Generator,
Expand Down Expand Up @@ -168,6 +168,7 @@ def __init__(
exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
has_bulk_parent: bool = False,
job_max_retry: Optional[int] = None,
failed_retry_wait_time_in_seconds: Optional[int] = None,
) -> None:
"""
If the stream slices provided as a parameters relies on a async job streams that relies on the same JobTracker, `has_bulk_parent`
Expand All @@ -181,6 +182,9 @@ def __init__(
"An AsyncJobStatus has been either removed or added which means the logic of this class needs to be reviewed. Once the logic has been updated, please update _KNOWN_JOB_STATUSES"
)

if failed_retry_wait_time_in_seconds is not None and failed_retry_wait_time_in_seconds <= 0:
raise ValueError("failed_retry_wait_time_in_seconds must be >= 1")

self._job_repository: AsyncJobRepository = job_repository
self._slice_iterator = LookaheadIterator(slices)
self._running_partitions: List[AsyncPartition] = []
Expand All @@ -189,13 +193,37 @@ def __init__(
self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
self._has_bulk_parent = has_bulk_parent
self._job_max_retry = job_max_retry
self._failed_retry_wait_time_in_seconds = failed_retry_wait_time_in_seconds

self._non_breaking_exceptions: List[Exception] = []

def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
    """Restart every FAILED or TIMED_OUT job of *partition*, honoring deferred retries.

    When ``_failed_retry_wait_time_in_seconds`` is configured, a job that actually
    ran and reported FAILED on the API side (i.e. not a creation failure) is not
    restarted immediately: the first time it is seen, a retry time is scheduled;
    on subsequent passes it is skipped until that time has elapsed. Creation
    failures and TIMED_OUT jobs are always restarted immediately.
    """
    failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
    jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs]
    for job in jobs_to_replace:
        # Deferral applies only to genuine API-side failures, and only when a
        # wait time has been configured.
        if (
            self._failed_retry_wait_time_in_seconds is not None
            and job.status() == AsyncJobStatus.FAILED
            and not job.is_creation_failure()
        ):
            if not job.ready_to_retry():
                # A retry is already scheduled and its wait period has not
                # elapsed yet — leave the job alone for now.
                lazy_log(
                    LOGGER,
                    logging.DEBUG,
                    lambda: f"Job {job.api_job_id()} is not ready to retry yet (deferred). Skipping.",
                )
                continue
            if not job.retry_deferred():
                # First time we see this failure: schedule the retry instead of
                # restarting right away.
                job.set_retry_after(
                    datetime.now(tz=timezone.utc)
                    + timedelta(seconds=self._failed_retry_wait_time_in_seconds)
                )
                lazy_log(
                    LOGGER,
                    logging.INFO,
                    lambda: f"Job {job.api_job_id()} failed. Deferring retry for {self._failed_retry_wait_time_in_seconds} seconds.",
                )
                continue
            # Reaching here means a deferred retry exists AND its wait period
            # has elapsed: fall through and restart the job.
        new_job = self._start_job(job.job_parameters(), job.api_job_id())
        partition.replace_job(job, [new_job])

Expand Down Expand Up @@ -281,7 +309,12 @@ def _keep_api_budget_with_failed_job(
return job

def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob:
    """Build a placeholder FAILED job for *stream_slice* whose creation never succeeded.

    The job never existed on the API side, so it is flagged with
    ``is_creation_failure=True`` (which exempts it from the deferred-retry
    handling in ``_replace_failed_jobs``) and immediately marked FAILED.
    """
    # Fix: the diff paste retained the obsolete pre-change assignment
    # (`job = AsyncJob(f"...", stream_slice, _NO_TIMEOUT)`) right before the
    # new one; the dead statement is removed here.
    job = AsyncJob(
        f"{uuid.uuid4()} - Job that could not start",
        stream_slice,
        _NO_TIMEOUT,
        is_creation_failure=True,
    )
    job.update_status(AsyncJobStatus.FAILED)
    return job

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4107,6 +4107,14 @@ definitions:
- type: string
interpolation_context:
- config
failed_retry_wait_time_in_seconds:
description: Time in seconds to wait before retrying a failed async job. Only applies to jobs that ran on the API side and reported a FAILED status (e.g. report generation failed due to a cooldown). Creation failures (HTTP errors when starting a job, such as 429s) and TIMED_OUT jobs are retried immediately and are not affected by this setting. When set, the orchestrator defers retry of real failed jobs until the wait time has elapsed, without blocking other jobs.
anyOf:
- type: integer
minimum: 1
- type: string
interpolation_context:
- config
Comment thread
coderabbitai[bot] marked this conversation as resolved.
download_target_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.
anyOf:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3074,6 +3074,10 @@ class AsyncRetriever(BaseModel):
None,
description="The time in minutes after which the single Async Job should be considered as Timed Out.",
)
failed_retry_wait_time_in_seconds: Optional[Union[int, str]] = Field(
None,
description="Time in seconds to wait before retrying a failed async job. Only applies to jobs that ran on the API side and reported a FAILED status (e.g. report generation failed due to a cooldown). Creation failures (HTTP errors when starting a job, such as 429s) and TIMED_OUT jobs are retried immediately and are not affected by this setting. When set, the orchestrator defers retry of real failed jobs until the wait time has elapsed, without blocking other jobs.",
)
Comment thread
darynaishchenko marked this conversation as resolved.
download_target_requester: Optional[Union[HttpRequester, CustomRequester]] = Field(
None,
description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3955,6 +3955,17 @@ def _get_job_timeout() -> datetime.timedelta:
job_timeout=_get_job_timeout(),
)

failed_retry_wait_time_in_seconds: Optional[int] = (
int(
InterpolatedString.create(
str(model.failed_retry_wait_time_in_seconds),
parameters={},
).eval(config)
)
if model.failed_retry_wait_time_in_seconds
else None
)

async_job_partition_router = AsyncJobPartitionRouter(
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
job_repository,
Expand All @@ -3966,6 +3977,7 @@ def _get_job_timeout() -> datetime.timedelta:
# set the `job_max_retry` to 1 for the `Connector Builder`` use-case.
# `None` == default retry is set to 3 attempts, under the hood.
job_max_retry=1 if self._emit_connector_builder_messages else None,
failed_retry_wait_time_in_seconds=failed_retry_wait_time_in_seconds,
),
stream_slicer=stream_slicer,
config=config,
Expand Down
45 changes: 29 additions & 16 deletions unit_tests/sources/declarative/async_job/test_job.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import time
from datetime import timedelta
from unittest import TestCase
from datetime import datetime, timedelta, timezone
from typing import Optional

import pytest

from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
Expand All @@ -14,19 +16,30 @@
_IMMEDIATELY_TIMED_OUT = timedelta(microseconds=1)


def test_given_timer_is_not_out_when_status_then_return_actual_status() -> None:
    """A job whose timer has not expired reports its actual (RUNNING) status."""
    # Fix: the diff paste interleaved the deleted class-based wrapper
    # (`class AsyncJobTest(TestCase):` + method form) with the new
    # function-based test; only the post-change function is kept.
    # A generous timeout keeps the timer from expiring during the test.
    job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _A_VERY_BIG_TIMEOUT)
    assert job.status() == AsyncJobStatus.RUNNING


def test_given_timer_is_out_when_status_then_return_timed_out() -> None:
    """A job whose timer has expired reports TIMED_OUT regardless of its last status."""
    # Fix: the diff paste retained the deleted method form (with `self`) right
    # after the new function form; only the post-change function is kept.
    # The 1-microsecond timeout expires before the status check below.
    job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _IMMEDIATELY_TIMED_OUT)
    time.sleep(0.001)
    assert job.status() == AsyncJobStatus.TIMED_OUT

# Fix: the diff paste interleaved the deleted placeholder test
# (`test_given_status_is_terminal_when_update_status_then_stop_timer`, a
# docstring-only `pass` method removed by the PR) with the new parametrized
# test; only the post-change test is kept.
@pytest.mark.parametrize(
    "retry_after_offset,expected_deferred,expected_ready",
    [
        pytest.param(None, False, True, id="no_retry_after_set"),
        pytest.param(timedelta(hours=1), True, False, id="retry_after_in_future"),
        pytest.param(-timedelta(seconds=1), True, True, id="retry_after_in_past"),
    ],
)
def test_retry_after(
    retry_after_offset: Optional[timedelta], expected_deferred: bool, expected_ready: bool
) -> None:
    """retry_deferred()/ready_to_retry() reflect whether and when a retry was scheduled."""
    job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _A_VERY_BIG_TIMEOUT)
    if retry_after_offset is not None:
        job.set_retry_after(datetime.now(tz=timezone.utc) + retry_after_offset)
    assert job.retry_deferred() == expected_deferred
    assert job.ready_to_retry() == expected_ready
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Loading
Loading