From e747af96ebb3940dfcef6708beffabdd2daa1bfb Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 6 Apr 2026 14:28:05 +0000 Subject: [PATCH 1/2] feat: add SKIPPED async job status to skip jobs with no data to return Add a new terminal SKIPPED status to AsyncJobStatus that allows connectors to indicate a job should be silently skipped (no records fetched, no retries, no errors). This is useful for APIs like Amazon SP-API where CANCELLED means 'no data to return' and retrying is wasteful. Changes: - Add SKIPPED to AsyncJobStatus enum (terminal status) - Add _process_skipped_partition to AsyncJobOrchestrator (frees job budget, no yield) - Add SKIPPED handling in partition status aggregation - Add optional 'skipped' key to AsyncJobStatusMap schema (backward-compatible) - Add 'skipped' mapping in model_to_component_factory - Handle None values in _create_async_job_status_mapping for optional fields - Add unit tests for SKIPPED partition status and orchestrator behavior Co-Authored-By: bot_apk --- .../declarative/async_job/job_orchestrator.py | 24 +++++++- .../sources/declarative/async_job/status.py | 1 + .../declarative_component_schema.yaml | 5 ++ .../models/declarative_component_schema.py | 1 + .../parsers/model_to_component_factory.py | 6 ++ .../async_job/test_job_orchestrator.py | 60 ++++++++++++++++++- 6 files changed, 95 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py index 21bb3b071..784abb6b5 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py +++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -91,11 +91,17 @@ def stream_slice(self) -> StreamSlice: @property def status(self) -> AsyncJobStatus: """ - Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Else, it means everything is completed. + Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Else, it means everything is completed + or skipped. A partition is SKIPPED only when all jobs are SKIPPED (or a mix of COMPLETED and SKIPPED). """ statuses = set(map(lambda job: job.status(), self.jobs)) if statuses == {AsyncJobStatus.COMPLETED}: return AsyncJobStatus.COMPLETED + elif statuses == {AsyncJobStatus.SKIPPED}: + return AsyncJobStatus.SKIPPED + elif statuses <= {AsyncJobStatus.COMPLETED, AsyncJobStatus.SKIPPED}: + # Mix of completed and skipped — treat as completed so records are fetched for the completed jobs + return AsyncJobStatus.COMPLETED elif AsyncJobStatus.FAILED in statuses: return AsyncJobStatus.FAILED elif AsyncJobStatus.TIMED_OUT in statuses: @@ -149,6 +155,7 @@ class AsyncJobOrchestrator: AsyncJobStatus.FAILED, AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT, + AsyncJobStatus.SKIPPED, } _RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT} @@ -341,6 +348,19 @@ def _process_completed_partition(self, partition: AsyncPartition) -> None: for job in partition.jobs: self._job_tracker.remove_job(job.api_job_id()) + def _process_skipped_partition(self, partition: AsyncPartition) -> None: + """ + Process a skipped partition. The API indicated there is no data to return for this job + (e.g. Amazon SP-API CANCELLED status means no data to report). We clean up the job + allocation without fetching any records or raising errors. + """ + job_ids = list(map(lambda job: job.api_job_id(), {job for job in partition.jobs})) + LOGGER.info( + f"The following jobs for stream slice {partition.stream_slice} have been skipped (no data to return): {job_ids}." + ) + for job in partition.jobs: + self._job_tracker.remove_job(job.api_job_id()) + def _process_running_partitions_and_yield_completed_ones( self, ) -> Generator[AsyncPartition, Any, None]: @@ -359,6 +379,8 @@ def _process_running_partitions_and_yield_completed_ones( case AsyncJobStatus.COMPLETED: self._process_completed_partition(partition) yield partition + case AsyncJobStatus.SKIPPED: + self._process_skipped_partition(partition) case AsyncJobStatus.RUNNING: current_running_partitions.append(partition) case _ if partition.has_reached_max_attempt(): diff --git a/airbyte_cdk/sources/declarative/async_job/status.py b/airbyte_cdk/sources/declarative/async_job/status.py index 586e79889..e05ec9187 100644 --- a/airbyte_cdk/sources/declarative/async_job/status.py +++ b/airbyte_cdk/sources/declarative/async_job/status.py @@ -11,6 +11,7 @@ class AsyncJobStatus(Enum): COMPLETED = ("COMPLETED", _TERMINAL) FAILED = ("FAILED", _TERMINAL) TIMED_OUT = ("TIMED_OUT", _TERMINAL) + SKIPPED = ("SKIPPED", _TERMINAL) def __init__(self, value: str, is_terminal: bool) -> None: self._value = value diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 4b06a81bc..353fd4fc1 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3997,6 +3997,11 @@ definitions: type: array items: type: string + skipped: + description: "Statuses that indicate the job was skipped because there is no data to return. Jobs with these statuses will not be retried and no records will be fetched." + type: array + items: + type: string AsyncRetriever: title: Asynchronous Retriever description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router." diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 2da6d4b09..43a974e7a 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1248,6 +1248,7 @@ class AsyncJobStatusMap(BaseModel): completed: List[str] failed: List[str] timeout: List[str] + skipped: Optional[List[str]] = None class ValueType(Enum): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 239e5bd51..7e3491e2f 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3689,6 +3689,10 @@ def _create_async_job_status_mapping( # This is an element of the dict because of the typing of the CDK but it is not a CDK status continue + if api_statuses is None: + # Optional fields like 'skipped' may be None when not provided + continue + for status in api_statuses: if status in api_status_to_cdk_status: raise ValueError( @@ -3707,6 +3711,8 @@ def _get_async_job_status(self, status: str) -> AsyncJobStatus: return AsyncJobStatus.FAILED case "timeout": return AsyncJobStatus.TIMED_OUT + case "skipped": + return AsyncJobStatus.SKIPPED case _: raise ValueError(f"Unsupported CDK status {status}") diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index 7fe9bcdf0..1558e5987 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -55,12 +55,39 @@ def test_given_running_and_completed_jobs_when_status_then_return_running(self) ) assert partition.status == AsyncJobStatus.RUNNING - def test_given_only_completed_jobs_when_status_then_return_running(self) -> None: + def test_given_only_completed_jobs_when_status_then_return_completed(self) -> None: partition = AsyncPartition( [_create_job(AsyncJobStatus.COMPLETED) for _ in range(10)], _ANY_STREAM_SLICE ) assert partition.status == AsyncJobStatus.COMPLETED + def test_given_only_skipped_jobs_when_status_then_return_skipped(self) -> None: + partition = AsyncPartition( + [_create_job(AsyncJobStatus.SKIPPED) for _ in range(3)], _ANY_STREAM_SLICE + ) + assert partition.status == AsyncJobStatus.SKIPPED + + def test_given_completed_and_skipped_jobs_when_status_then_return_completed(self) -> None: + partition = AsyncPartition( + [_create_job(AsyncJobStatus.COMPLETED), _create_job(AsyncJobStatus.SKIPPED)], + _ANY_STREAM_SLICE, + ) + assert partition.status == AsyncJobStatus.COMPLETED + + def test_given_skipped_and_running_jobs_when_status_then_return_running(self) -> None: + partition = AsyncPartition( + [_create_job(AsyncJobStatus.SKIPPED), _create_job(AsyncJobStatus.RUNNING)], + _ANY_STREAM_SLICE, + ) + assert partition.status == AsyncJobStatus.RUNNING + + def test_given_skipped_and_failed_jobs_when_status_then_return_failed(self) -> None: + partition = AsyncPartition( + [_create_job(AsyncJobStatus.SKIPPED), _create_job(AsyncJobStatus.FAILED)], + _ANY_STREAM_SLICE, + ) + assert partition.status == AsyncJobStatus.FAILED + def _status_update_per_jobs( status_update_per_jobs: Mapping[AsyncJob, List[AsyncJobStatus]], @@ -358,6 +385,37 @@ def test_given_start_job_raise_when_create_and_get_completed_partitions_then_fre assert job_tracker.try_to_get_intent() + @mock.patch(sleep_mock_target) + def test_given_skipped_when_create_and_get_completed_partitions_then_skip_without_fetching_records( + self, mock_sleep: MagicMock + ) -> None: + job_tracker = JobTracker(1) + self._job_repository.start.return_value = self._job_for_a_slice + self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs( + {self._job_for_a_slice: [AsyncJobStatus.SKIPPED]} + ) + orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker) + + partitions = list(orchestrator.create_and_get_completed_partitions()) + + assert len(partitions) == 0 # skipped partitions are not yielded + assert job_tracker.try_to_get_intent() # budget was freed + self._job_repository.fetch_records.assert_not_called() + + @mock.patch(sleep_mock_target) + def test_given_skipped_does_not_retry(self, mock_sleep: MagicMock) -> None: + self._job_repository.start.return_value = self._job_for_a_slice + self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs( + {self._job_for_a_slice: [AsyncJobStatus.SKIPPED]} + ) + orchestrator = self._orchestrator([_A_STREAM_SLICE]) + + partitions = list(orchestrator.create_and_get_completed_partitions()) + + assert len(partitions) == 0 + # start is called only once — SKIPPED does not trigger a retry + assert self._job_repository.start.call_count == 1 + def _mock_repository(self) -> None: self._job_repository = Mock(spec=AsyncJobRepository) From eaf016af03cb2c6709390eb649d80dfdcf8a0148 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 6 Apr 2026 14:40:33 +0000 Subject: [PATCH 2/2] fix: scope None guard to only optional status fields (skipped) The generic None check could silently skip a required field that was accidentally None. Now only fields in _OPTIONAL_ASYNC_STATUS_FIELDS (currently just 'skipped') are allowed to be None. Required fields (running, completed, failed, timeout) raise ValueError if None. Co-Authored-By: bot_apk --- .../declarative/parsers/model_to_component_factory.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 7e3491e2f..6fb892e04 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3680,6 +3680,8 @@ def _get_state_delegating_stream_model( else model.full_refresh_stream ) + _OPTIONAL_ASYNC_STATUS_FIELDS = {"skipped"} + def _create_async_job_status_mapping( self, model: AsyncJobStatusMapModel, config: Config, **kwargs: Any ) -> Mapping[str, AsyncJobStatus]: @@ -3690,8 +3692,12 @@ def _create_async_job_status_mapping( continue if api_statuses is None: - # Optional fields like 'skipped' may be None when not provided - continue + if cdk_status in self._OPTIONAL_ASYNC_STATUS_FIELDS: + continue + raise ValueError( + f"Required CDK status '{cdk_status}' has no API statuses mapped. " + f"Please provide at least an empty list for required status fields." + ) for status in api_statuses: if status in api_status_to_cdk_status: