diff --git a/airbyte_cdk/sources/declarative/async_job/job.py b/airbyte_cdk/sources/declarative/async_job/job.py index b075b61e2..ea83b7456 100644 --- a/airbyte_cdk/sources/declarative/async_job/job.py +++ b/airbyte_cdk/sources/declarative/async_job/job.py @@ -34,6 +34,12 @@ def api_job_id(self) -> str: def status(self) -> AsyncJobStatus: if self._timer.has_timed_out(): + # TODO: we should account the fact that, + # certain APIs could send the `Timeout` status, + # thus we should not return `Timeout` in that case, + # but act based on the scenario. + + # the default behavior is to return `Timeout` status and retry. return AsyncJobStatus.TIMED_OUT return self._status diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py index 343afad0b..399f42430 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py +++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -44,16 +44,21 @@ class AsyncPartition: This bucket of api_jobs is a bit useless for this iteration but should become interesting when we will be able to split jobs """ - _MAX_NUMBER_OF_ATTEMPTS = 3 + _DEFAULT_MAX_JOB_RETRY = 3 - def __init__(self, jobs: List[AsyncJob], stream_slice: StreamSlice) -> None: + def __init__( + self, jobs: List[AsyncJob], stream_slice: StreamSlice, job_max_retry: Optional[int] = None + ) -> None: self._attempts_per_job = {job: 1 for job in jobs} self._stream_slice = stream_slice + self._job_max_retry = ( + job_max_retry if job_max_retry is not None else self._DEFAULT_MAX_JOB_RETRY + ) def has_reached_max_attempt(self) -> bool: return any( map( - lambda attempt_count: attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS, + lambda attempt_count: attempt_count >= self._job_max_retry, self._attempts_per_job.values(), ) ) @@ -62,7 +67,7 @@ def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> Non current_attempt_count = self._attempts_per_job.pop(job_to_replace, None) if current_attempt_count is None: raise ValueError("Could not find job to replace") - elif current_attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS: + elif current_attempt_count >= self._job_max_retry: raise ValueError(f"Max attempt reached for job in partition {self._stream_slice}") new_attempt_count = current_attempt_count + 1 @@ -155,6 +160,7 @@ def __init__( message_repository: MessageRepository, exceptions_to_break_on: Iterable[Type[Exception]] = tuple(), has_bulk_parent: bool = False, + job_max_retry: Optional[int] = None, ) -> None: """ If the stream slices provided as a parameters relies on a async job streams that relies on the same JobTracker, `has_bulk_parent` @@ -175,11 +181,12 @@ def __init__( self._message_repository = message_repository self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on) self._has_bulk_parent = has_bulk_parent + self._job_max_retry = job_max_retry self._non_breaking_exceptions: List[Exception] = [] def _replace_failed_jobs(self, partition: AsyncPartition) -> None: - failed_status_jobs = (AsyncJobStatus.FAILED,) + failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT) jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs] for job in jobs_to_replace: new_job = self._start_job(job.job_parameters(), job.api_job_id()) @@ -214,7 +221,7 @@ def _start_jobs(self) -> None: for _slice in self._slice_iterator: at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = True job = self._start_job(_slice) - self._running_partitions.append(AsyncPartition([job], _slice)) + self._running_partitions.append(AsyncPartition([job], _slice, self._job_max_retry)) if self._has_bulk_parent and self._slice_iterator.has_next(): break except ConcurrentJobLimitReached: @@ -363,7 +370,7 @@ def _process_running_partitions_and_yield_completed_ones( self._reallocate_partition(current_running_partitions, partition) # We only remove completed / timeout jobs jobs as we want failed jobs to be re-allocated in priority - self._remove_completed_or_timed_out_jobs(partition) + self._remove_completed_jobs(partition) # update the referenced list with running partitions self._running_partitions = current_running_partitions @@ -378,11 +385,7 @@ def _stop_partition(self, partition: AsyncPartition) -> None: def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None: for job in partition.jobs: if job.status() == AsyncJobStatus.TIMED_OUT: - self._abort_job(job, free_job_allocation=True) - raise AirbyteTracedException( - internal_message=f"Job {job.api_job_id()} has timed out. Try increasing the `polling job timeout`.", - failure_type=FailureType.config_error, - ) + self._abort_job(job, free_job_allocation=False) def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None: try: @@ -392,7 +395,7 @@ def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None: except Exception as exception: LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}") - def _remove_completed_or_timed_out_jobs(self, partition: AsyncPartition) -> None: + def _remove_completed_jobs(self, partition: AsyncPartition) -> None: """ Remove completed or timed out jobs from the partition. @@ -400,7 +403,7 @@ def _remove_completed_or_timed_out_jobs(self, partition: AsyncPartition) -> None partition (AsyncPartition): The partition to process. """ for job in partition.jobs: - if job.status() in [AsyncJobStatus.COMPLETED, AsyncJobStatus.TIMED_OUT]: + if job.status() == AsyncJobStatus.COMPLETED: self._job_tracker.remove_job(job.api_job_id()) def _reallocate_partition( @@ -415,10 +418,7 @@ def _reallocate_partition( current_running_partitions (list): The list of currently running partitions. partition (AsyncPartition): The partition to reallocate. """ - for job in partition.jobs: - if job.status() != AsyncJobStatus.TIMED_OUT: - # allow the FAILED jobs to be re-allocated for partition - current_running_partitions.insert(0, partition) + current_running_partitions.insert(0, partition) def _process_partitions_with_errors(self, partition: AsyncPartition) -> None: """ diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 1501aa676..263c17439 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3073,8 +3073,11 @@ def _get_job_timeout() -> datetime.timedelta: stream_slices, self._job_tracker, self._message_repository, - has_bulk_parent=False, # FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk + has_bulk_parent=False, + # set the `job_max_retry` to 1 for the `Connector Builder`` use-case. + # `None` == default retry is set to 3 attempts, under the hood. + job_max_retry=1 if self._emit_connector_builder_messages else None, ), stream_slicer=stream_slicer, config=config, diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index 56bbf5349..d99f8502f 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -144,10 +144,14 @@ def test_given_timeout_when_create_and_get_completed_partitions_then_free_budget ) orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker) - with pytest.raises(AirbyteTracedException) as error: + with pytest.raises(AirbyteTracedException): list(orchestrator.create_and_get_completed_partitions()) - assert "Job an api job id has timed out" in str(error.value) + assert job_tracker.try_to_get_intent() + assert ( + self._job_repository.start.call_args_list + == [call(_A_STREAM_SLICE)] * _MAX_NUMBER_OF_ATTEMPTS + ) @mock.patch(sleep_mock_target) def test_given_failure_when_create_and_get_completed_partitions_then_raise_exception(