6 changes: 6 additions & 0 deletions airbyte_cdk/sources/declarative/async_job/job.py
@@ -34,6 +34,12 @@ def api_job_id(self) -> str:

def status(self) -> AsyncJobStatus:
if self._timer.has_timed_out():
# TODO: we should account for the fact that certain APIs could send
# the `Timeout` status themselves; in that case we should not return
# `Timeout` here, but act based on the scenario.

# The default behavior is to return the `Timeout` status and retry.
return AsyncJobStatus.TIMED_OUT
return self._status

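A minimal, self-contained sketch of the behavior above, assuming a simplified job and timer (`_SketchTimer` and `_SketchJob` are illustrative names, not the CDK's actual internals): once the local timer expires, `status()` reports `TIMED_OUT` regardless of the last status the API returned, and the orchestrator can then retry the job like any other failed job.

import time
from datetime import datetime, timedelta
from enum import Enum


class AsyncJobStatus(Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    TIMED_OUT = "timed_out"


class _SketchTimer:
    """Illustrative timer: considered timed out once `timeout` has elapsed."""

    def __init__(self, timeout: timedelta) -> None:
        self._timeout = timeout
        self._start = datetime.now()

    def has_timed_out(self) -> bool:
        return datetime.now() - self._start > self._timeout


class _SketchJob:
    """Illustrative async job holding the last status reported by the API."""

    def __init__(self, timeout: timedelta) -> None:
        self._timer = _SketchTimer(timeout)
        self._status = AsyncJobStatus.RUNNING

    def update_status(self, status: AsyncJobStatus) -> None:
        self._status = status

    def status(self) -> AsyncJobStatus:
        # Mirrors the snippet above: a locally expired timer takes precedence
        # over whatever the API last reported, and the default behavior is to
        # surface `TIMED_OUT` so the orchestrator retries the job.
        if self._timer.has_timed_out():
            return AsyncJobStatus.TIMED_OUT
        return self._status


job = _SketchJob(timeout=timedelta(milliseconds=1))
job.update_status(AsyncJobStatus.RUNNING)
time.sleep(0.01)  # let the 1 ms budget elapse
assert job.status() == AsyncJobStatus.TIMED_OUT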
36 changes: 18 additions & 18 deletions airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
@@ -44,16 +44,21 @@ class AsyncPartition:
This bucket of api_jobs is a bit useless for this iteration but should become interesting once we are able to split jobs
"""

_MAX_NUMBER_OF_ATTEMPTS = 3
_DEFAULT_MAX_JOB_RETRY = 3

def __init__(self, jobs: List[AsyncJob], stream_slice: StreamSlice) -> None:
def __init__(
self, jobs: List[AsyncJob], stream_slice: StreamSlice, job_max_retry: Optional[int] = None
) -> None:
self._attempts_per_job = {job: 1 for job in jobs}
self._stream_slice = stream_slice
self._job_max_retry = (
job_max_retry if job_max_retry is not None else self._DEFAULT_MAX_JOB_RETRY
)

def has_reached_max_attempt(self) -> bool:
return any(
map(
lambda attempt_count: attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS,
lambda attempt_count: attempt_count >= self._job_max_retry,
self._attempts_per_job.values(),
)
)
@@ -62,7 +67,7 @@ def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> None:
current_attempt_count = self._attempts_per_job.pop(job_to_replace, None)
if current_attempt_count is None:
raise ValueError("Could not find job to replace")
elif current_attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS:
elif current_attempt_count >= self._job_max_retry:
raise ValueError(f"Max attempt reached for job in partition {self._stream_slice}")

new_attempt_count = current_attempt_count + 1
@@ -155,6 +160,7 @@ def __init__(
message_repository: MessageRepository,
exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
has_bulk_parent: bool = False,
job_max_retry: Optional[int] = None,
) -> None:
"""
If the stream slices provided as parameters rely on async job streams that rely on the same JobTracker, `has_bulk_parent`
@@ -175,11 +181,12 @@ def __init__(
self._message_repository = message_repository
self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
self._has_bulk_parent = has_bulk_parent
self._job_max_retry = job_max_retry

self._non_breaking_exceptions: List[Exception] = []

def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
failed_status_jobs = (AsyncJobStatus.FAILED,)
failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs]
for job in jobs_to_replace:
new_job = self._start_job(job.job_parameters(), job.api_job_id())
@@ -214,7 +221,7 @@ def _start_jobs(self) -> None:
for _slice in self._slice_iterator:
at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = True
job = self._start_job(_slice)
self._running_partitions.append(AsyncPartition([job], _slice))
self._running_partitions.append(AsyncPartition([job], _slice, self._job_max_retry))
if self._has_bulk_parent and self._slice_iterator.has_next():
break
except ConcurrentJobLimitReached:
@@ -363,7 +370,7 @@ def _process_running_partitions_and_yield_completed_ones(
self._reallocate_partition(current_running_partitions, partition)

# We only remove completed / timed out jobs as we want failed jobs to be re-allocated in priority
self._remove_completed_or_timed_out_jobs(partition)
self._remove_completed_jobs(partition)

# update the referenced list with running partitions
self._running_partitions = current_running_partitions
@@ -378,11 +385,7 @@ def _stop_partition(self, partition: AsyncPartition) -> None:
def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None:
for job in partition.jobs:
if job.status() == AsyncJobStatus.TIMED_OUT:
self._abort_job(job, free_job_allocation=True)
raise AirbyteTracedException(
internal_message=f"Job {job.api_job_id()} has timed out. Try increasing the `polling job timeout`.",
failure_type=FailureType.config_error,
)
self._abort_job(job, free_job_allocation=False)

def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
try:
@@ -392,15 +395,15 @@ def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
except Exception as exception:
LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}")

def _remove_completed_or_timed_out_jobs(self, partition: AsyncPartition) -> None:
def _remove_completed_jobs(self, partition: AsyncPartition) -> None:
"""
Remove completed jobs from the partition.

Args:
partition (AsyncPartition): The partition to process.
"""
for job in partition.jobs:
if job.status() in [AsyncJobStatus.COMPLETED, AsyncJobStatus.TIMED_OUT]:
if job.status() == AsyncJobStatus.COMPLETED:
self._job_tracker.remove_job(job.api_job_id())

def _reallocate_partition(
@@ -415,10 +418,7 @@
current_running_partitions (list): The list of currently running partitions.
partition (AsyncPartition): The partition to reallocate.
"""
for job in partition.jobs:
if job.status() != AsyncJobStatus.TIMED_OUT:
# allow the FAILED jobs to be re-allocated for partition
current_running_partitions.insert(0, partition)
current_running_partitions.insert(0, partition)

def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
"""
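A rough sketch of the retry accounting the orchestrator now applies uniformly to failed and timed-out jobs, assuming a stripped-down partition (hypothetical names, not the CDK classes): each replacement bumps the attempt counter, and `job_max_retry` (defaulting to 3) bounds how many times a job may be restarted before the partition is treated as in error.

from typing import Dict, List, Optional

_DEFAULT_MAX_JOB_RETRY = 3


class _SketchPartition:
    """Illustrative attempt bookkeeping, mirroring AsyncPartition above."""

    def __init__(self, jobs: List[str], job_max_retry: Optional[int] = None) -> None:
        # every job starts with one attempt already consumed
        self._attempts_per_job: Dict[str, int] = {job: 1 for job in jobs}
        self._job_max_retry = (
            job_max_retry if job_max_retry is not None else _DEFAULT_MAX_JOB_RETRY
        )

    def has_reached_max_attempt(self) -> bool:
        return any(
            count >= self._job_max_retry for count in self._attempts_per_job.values()
        )

    def replace_job(self, job_to_replace: str, new_job: str) -> None:
        current_attempt_count = self._attempts_per_job.pop(job_to_replace, None)
        if current_attempt_count is None:
            raise ValueError("Could not find job to replace")
        if current_attempt_count >= self._job_max_retry:
            raise ValueError("Max attempt reached for job in partition")
        self._attempts_per_job[new_job] = current_attempt_count + 1


# With the default of 3 attempts, a job can be replaced twice before the cap is hit.
partition = _SketchPartition(["job-1"])
partition.replace_job("job-1", "job-2")  # attempt 2
partition.replace_job("job-2", "job-3")  # attempt 3
assert partition.has_reached_max_attempt()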
@@ -3073,8 +3073,11 @@ def _get_job_timeout() -> datetime.timedelta:
stream_slices,
self._job_tracker,
self._message_repository,
has_bulk_parent=False,
# FIXME: work would need to be done here in order to detect if a stream has a parent stream that is bulk
has_bulk_parent=False,
# Set `job_max_retry` to 1 for the `Connector Builder` use-case.
# `None` means the default of 3 attempts is used under the hood.
job_max_retry=1 if self._emit_connector_builder_messages else None,
),
stream_slicer=stream_slicer,
config=config,
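A hedged sketch of the wiring above (the helper name is illustrative; only `job_max_retry` comes from the diff): the Connector Builder path caps jobs at a single attempt so users get fast feedback, while regular syncs pass `None` and fall back to the orchestrator's default of 3 attempts.

from typing import Optional


def _resolve_job_max_retry(emit_connector_builder_messages: bool) -> Optional[int]:
    # Connector Builder: fail fast after one attempt; otherwise let the
    # orchestrator fall back to its default of 3 attempts (`None`).
    return 1 if emit_connector_builder_messages else None


assert _resolve_job_max_retry(True) == 1
assert _resolve_job_max_retry(False) is None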
@@ -144,10 +144,14 @@ def test_given_timeout_when_create_and_get_completed_partitions_then_free_budget
)
orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker)

with pytest.raises(AirbyteTracedException) as error:
with pytest.raises(AirbyteTracedException):
list(orchestrator.create_and_get_completed_partitions())

assert "Job an api job id has timed out" in str(error.value)
assert job_tracker.try_to_get_intent()
assert (
self._job_repository.start.call_args_list
== [call(_A_STREAM_SLICE)] * _MAX_NUMBER_OF_ATTEMPTS
)

@mock.patch(sleep_mock_target)
def test_given_failure_when_create_and_get_completed_partitions_then_raise_exception(