Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/async_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def api_job_id(self) -> str:

def status(self) -> AsyncJobStatus:
if self._timer.has_timed_out():
return AsyncJobStatus.TIMED_OUT
return AsyncJobStatus.FORCED_TIME_OUT
return self._status

def job_parameters(self) -> StreamSlice:
Expand Down
24 changes: 15 additions & 9 deletions airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ def status(self) -> AsyncJobStatus:
return AsyncJobStatus.FAILED
elif AsyncJobStatus.TIMED_OUT in statuses:
return AsyncJobStatus.TIMED_OUT
# specific case when the job is forced to be stopped by the system
elif AsyncJobStatus.FORCED_TIME_OUT in statuses:
return AsyncJobStatus.FORCED_TIME_OUT
else:
return AsyncJobStatus.RUNNING

Expand Down Expand Up @@ -144,6 +147,8 @@ class AsyncJobOrchestrator:
AsyncJobStatus.FAILED,
AsyncJobStatus.RUNNING,
AsyncJobStatus.TIMED_OUT,
# specific case when the job is forced to be stopped by the system
AsyncJobStatus.FORCED_TIME_OUT,
}
_RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}

Expand Down Expand Up @@ -179,7 +184,7 @@ def __init__(
self._non_breaking_exceptions: List[Exception] = []

def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
failed_status_jobs = (AsyncJobStatus.FAILED,)
failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs]
for job in jobs_to_replace:
new_job = self._start_job(job.job_parameters(), job.api_job_id())
Expand Down Expand Up @@ -363,7 +368,7 @@ def _process_running_partitions_and_yield_completed_ones(
self._reallocate_partition(current_running_partitions, partition)

# We only remove completed / timed-out jobs here, as we want failed jobs to be re-allocated in priority
self._remove_completed_or_timed_out_jobs(partition)
self._remove_completed_or_forced_time_out_jobs(partition)

# update the referenced list with running partitions
self._running_partitions = current_running_partitions
Expand All @@ -377,12 +382,15 @@ def _stop_partition(self, partition: AsyncPartition) -> None:

def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None:
for job in partition.jobs:
if job.status() == AsyncJobStatus.TIMED_OUT:
if job.status() == AsyncJobStatus.FORCED_TIME_OUT:
self._abort_job(job, free_job_allocation=True)
raise AirbyteTracedException(
internal_message=f"Job {job.api_job_id()} has timed out. Try increasing the `polling job timeout`.",
failure_type=FailureType.config_error,
)
# we don't free the job allocation here because the job is expected to be retried
if job.status() == AsyncJobStatus.TIMED_OUT:
self._abort_job(job, free_job_allocation=False)

def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
try:
Expand All @@ -392,15 +400,15 @@ def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
except Exception as exception:
LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}")

def _remove_completed_or_timed_out_jobs(self, partition: AsyncPartition) -> None:
def _remove_completed_or_forced_time_out_jobs(self, partition: AsyncPartition) -> None:
"""
Remove the partition's completed or timed out jobs from the job tracker.

Args:
partition (AsyncPartition): The partition to process.
"""
for job in partition.jobs:
if job.status() in [AsyncJobStatus.COMPLETED, AsyncJobStatus.TIMED_OUT]:
if job.status() in [AsyncJobStatus.COMPLETED, AsyncJobStatus.FORCED_TIME_OUT]:
self._job_tracker.remove_job(job.api_job_id())

def _reallocate_partition(
Expand All @@ -415,10 +423,8 @@ def _reallocate_partition(
current_running_partitions (list): The list of currently running partitions.
partition (AsyncPartition): The partition to reallocate.
"""
for job in partition.jobs:
if job.status() != AsyncJobStatus.TIMED_OUT:
# allow the FAILED jobs to be re-allocated for partition
current_running_partitions.insert(0, partition)
# allow the FAILED / TIMED_OUT jobs to be re-allocated for partition
current_running_partitions.insert(0, partition)

def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
"""
Expand Down
4 changes: 3 additions & 1 deletion airbyte_cdk/sources/declarative/async_job/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ class AsyncJobStatus(Enum):
COMPLETED = ("COMPLETED", _TERMINAL)
FAILED = ("FAILED", _TERMINAL)
TIMED_OUT = ("TIMED_OUT", _TERMINAL)
# service status to force the job to be stopped by the system
FORCED_TIME_OUT = ("FORCED_TIME_OUT", _TERMINAL)

def __init__(self, value: str, is_terminal: bool) -> None:
self._value = value
Expand All @@ -19,6 +21,6 @@ def __init__(self, value: str, is_terminal: bool) -> None:
def is_terminal(self) -> bool:
"""
A status is terminal when a job status can't be updated anymore. For example if a job is completed, it will stay completed but a
running job might because completed, failed or timed out.
running job might become completed, failed or timed out.
"""
return self._is_terminal
4 changes: 2 additions & 2 deletions unit_tests/sources/declarative/async_job/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ def test_given_timer_is_not_out_when_status_then_return_actual_status(self) -> N
job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _A_VERY_BIG_TIMEOUT)
assert job.status() == AsyncJobStatus.RUNNING

def test_given_timer_is_out_when_status_then_return_timed_out(self) -> None:
def test_given_timer_is_out_when_status_then_return_forced_time_out(self) -> None:
job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _IMMEDIATELY_TIMED_OUT)
time.sleep(0.001)
assert job.status() == AsyncJobStatus.TIMED_OUT
assert job.status() == AsyncJobStatus.FORCED_TIME_OUT

def test_given_status_is_terminal_when_update_status_then_stop_timer(self) -> None:
"""
Expand Down
20 changes: 20 additions & 0 deletions unit_tests/sources/declarative/async_job/test_job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,26 @@ def test_given_timeout_when_create_and_get_completed_partitions_then_free_budget
)
orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker)

with pytest.raises(AirbyteTracedException):
list(orchestrator.create_and_get_completed_partitions())

assert job_tracker.try_to_get_intent()
assert (
self._job_repository.start.call_args_list
== [call(_A_STREAM_SLICE)] * _MAX_NUMBER_OF_ATTEMPTS
)

@mock.patch(sleep_mock_target)
def test_given_forced_timeout_when_create_and_get_completed_partitions_then_free_budget_and_raise_exception(
self, mock_sleep: MagicMock
) -> None:
job_tracker = JobTracker(1)
self._job_repository.start.return_value = self._job_for_a_slice
self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(
{self._job_for_a_slice: [AsyncJobStatus.FORCED_TIME_OUT]}
)
orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker)

with pytest.raises(AirbyteTracedException) as error:
list(orchestrator.create_and_get_completed_partitions())

Expand Down
Loading