diff --git a/airbyte_cdk/sources/declarative/async_job/job.py b/airbyte_cdk/sources/declarative/async_job/job.py index b075b61e2..ff74854f6 100644 --- a/airbyte_cdk/sources/declarative/async_job/job.py +++ b/airbyte_cdk/sources/declarative/async_job/job.py @@ -34,7 +34,7 @@ def api_job_id(self) -> str: def status(self) -> AsyncJobStatus: if self._timer.has_timed_out(): - return AsyncJobStatus.TIMED_OUT + return AsyncJobStatus.FORCED_TIME_OUT return self._status def job_parameters(self) -> StreamSlice: diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py index 343afad0b..f4a55170e 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py +++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -95,6 +95,9 @@ def status(self) -> AsyncJobStatus: return AsyncJobStatus.FAILED elif AsyncJobStatus.TIMED_OUT in statuses: return AsyncJobStatus.TIMED_OUT + # specific case when the job is forced to be stopped by the system + elif AsyncJobStatus.FORCED_TIME_OUT in statuses: + return AsyncJobStatus.FORCED_TIME_OUT else: return AsyncJobStatus.RUNNING @@ -144,6 +147,8 @@ class AsyncJobOrchestrator: AsyncJobStatus.FAILED, AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT, + # specific case when the job is forced to be stopped by the system + AsyncJobStatus.FORCED_TIME_OUT, } _RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT} @@ -179,7 +184,7 @@ def __init__( self._non_breaking_exceptions: List[Exception] = [] def _replace_failed_jobs(self, partition: AsyncPartition) -> None: - failed_status_jobs = (AsyncJobStatus.FAILED,) + failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT) jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs] for job in jobs_to_replace: new_job = self._start_job(job.job_parameters(), job.api_job_id()) @@ -363,7 +368,7 @@ def _process_running_partitions_and_yield_completed_ones( self._reallocate_partition(current_running_partitions, partition) # We only remove completed / timeout jobs jobs as we want failed jobs to be re-allocated in priority - self._remove_completed_or_timed_out_jobs(partition) + self._remove_completed_or_forced_time_out_jobs(partition) # update the referenced list with running partitions self._running_partitions = current_running_partitions @@ -377,12 +382,15 @@ def _stop_partition(self, partition: AsyncPartition) -> None: def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None: for job in partition.jobs: - if job.status() == AsyncJobStatus.TIMED_OUT: + if job.status() == AsyncJobStatus.FORCED_TIME_OUT: self._abort_job(job, free_job_allocation=True) raise AirbyteTracedException( internal_message=f"Job {job.api_job_id()} has timed out. Try increasing the `polling job timeout`.", failure_type=FailureType.config_error, ) + # we don't free allocation here because it is expected to retry the job + if job.status() == AsyncJobStatus.TIMED_OUT: + self._abort_job(job, free_job_allocation=False) def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None: try: @@ -392,7 +400,7 @@ def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None: except Exception as exception: LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}") - def _remove_completed_or_timed_out_jobs(self, partition: AsyncPartition) -> None: + def _remove_completed_or_forced_time_out_jobs(self, partition: AsyncPartition) -> None: """ Remove completed or timed out jobs from the partition. @@ -400,7 +408,7 @@ def _remove_completed_or_timed_out_jobs(self, partition: AsyncPartition) -> None partition (AsyncPartition): The partition to process. """ for job in partition.jobs: - if job.status() in [AsyncJobStatus.COMPLETED, AsyncJobStatus.TIMED_OUT]: + if job.status() in [AsyncJobStatus.COMPLETED, AsyncJobStatus.FORCED_TIME_OUT]: self._job_tracker.remove_job(job.api_job_id()) def _reallocate_partition( @@ -415,10 +423,8 @@ def _reallocate_partition( current_running_partitions (list): The list of currently running partitions. partition (AsyncPartition): The partition to reallocate. """ - for job in partition.jobs: - if job.status() != AsyncJobStatus.TIMED_OUT: - # allow the FAILED jobs to be re-allocated for partition - current_running_partitions.insert(0, partition) + # allow the FAILED / TIMED_OUT jobs to be re-allocated for partition + current_running_partitions.insert(0, partition) def _process_partitions_with_errors(self, partition: AsyncPartition) -> None: """ diff --git a/airbyte_cdk/sources/declarative/async_job/status.py b/airbyte_cdk/sources/declarative/async_job/status.py index 586e79889..1fd00b62c 100644 --- a/airbyte_cdk/sources/declarative/async_job/status.py +++ b/airbyte_cdk/sources/declarative/async_job/status.py @@ -11,6 +11,8 @@ class AsyncJobStatus(Enum): COMPLETED = ("COMPLETED", _TERMINAL) FAILED = ("FAILED", _TERMINAL) TIMED_OUT = ("TIMED_OUT", _TERMINAL) + # service status to force the job to be stopped by the system + FORCED_TIME_OUT = ("FORCED_TIME_OUT", _TERMINAL) def __init__(self, value: str, is_terminal: bool) -> None: self._value = value @@ -19,6 +21,6 @@ def __init__(self, value: str, is_terminal: bool) -> None: def is_terminal(self) -> bool: """ A status is terminal when a job status can't be updated anymore. For example if a job is completed, it will stay completed but a - running job might because completed, failed or timed out. + running job might become completed, failed or timed out. """ return self._is_terminal diff --git a/unit_tests/sources/declarative/async_job/test_job.py b/unit_tests/sources/declarative/async_job/test_job.py index 6399433e4..566209750 100644 --- a/unit_tests/sources/declarative/async_job/test_job.py +++ b/unit_tests/sources/declarative/async_job/test_job.py @@ -19,10 +19,10 @@ def test_given_timer_is_not_out_when_status_then_return_actual_status(self) -> N job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _A_VERY_BIG_TIMEOUT) assert job.status() == AsyncJobStatus.RUNNING - def test_given_timer_is_out_when_status_then_return_timed_out(self) -> None: + def test_given_timer_is_out_when_status_then_return_forced_time_out(self) -> None: job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _IMMEDIATELY_TIMED_OUT) time.sleep(0.001) - assert job.status() == AsyncJobStatus.TIMED_OUT + assert job.status() == AsyncJobStatus.FORCED_TIME_OUT def test_given_status_is_terminal_when_update_status_then_stop_timer(self) -> None: """ diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index 56bbf5349..ab64e394f 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -144,6 +144,26 @@ def test_given_timeout_when_create_and_get_completed_partitions_then_free_budget ) orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker) + with pytest.raises(AirbyteTracedException): + list(orchestrator.create_and_get_completed_partitions()) + + assert job_tracker.try_to_get_intent() + assert ( + self._job_repository.start.call_args_list + == [call(_A_STREAM_SLICE)] * _MAX_NUMBER_OF_ATTEMPTS + ) + + @mock.patch(sleep_mock_target) + def test_given_forced_timeout_when_create_and_get_completed_partitions_then_free_budget_and_raise_exception( + self, mock_sleep: MagicMock + ) -> None: + job_tracker = JobTracker(1) + self._job_repository.start.return_value = self._job_for_a_slice + self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs( + {self._job_for_a_slice: [AsyncJobStatus.FORCED_TIME_OUT]} + ) + orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker) + with pytest.raises(AirbyteTracedException) as error: list(orchestrator.create_and_get_completed_partitions())