Skip to content

Commit e747af9

Browse files
devin-ai-integration[bot] and bot_apk committed
feat: add SKIPPED async job status to skip jobs with no data to return
Add a new terminal SKIPPED status to AsyncJobStatus that allows connectors to indicate a job should be silently skipped (no records fetched, no retries, no errors). This is useful for APIs like Amazon SP-API where CANCELLED means 'no data to return' and retrying is wasteful.

Changes:
- Add SKIPPED to AsyncJobStatus enum (terminal status)
- Add _process_skipped_partition to AsyncJobOrchestrator (frees job budget, no yield)
- Add SKIPPED handling in partition status aggregation
- Add optional 'skipped' key to AsyncJobStatusMap schema (backward-compatible)
- Add 'skipped' mapping in model_to_component_factory
- Handle None values in _create_async_job_status_mapping for optional fields
- Add unit tests for SKIPPED partition status and orchestrator behavior

Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 0494fd4 commit e747af9

6 files changed

Lines changed: 95 additions & 2 deletions

File tree

airbyte_cdk/sources/declarative/async_job/job_orchestrator.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,17 @@ def stream_slice(self) -> StreamSlice:
9191
@property
9292
def status(self) -> AsyncJobStatus:
9393
"""
94-
Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Else, it means everything is completed.
94+
Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Else, it means everything is completed
95+
or skipped. A partition is SKIPPED only when all jobs are SKIPPED (or a mix of COMPLETED and SKIPPED).
9596
"""
9697
statuses = set(map(lambda job: job.status(), self.jobs))
9798
if statuses == {AsyncJobStatus.COMPLETED}:
9899
return AsyncJobStatus.COMPLETED
100+
elif statuses == {AsyncJobStatus.SKIPPED}:
101+
return AsyncJobStatus.SKIPPED
102+
elif statuses <= {AsyncJobStatus.COMPLETED, AsyncJobStatus.SKIPPED}:
103+
# Mix of completed and skipped — treat as completed so records are fetched for the completed jobs
104+
return AsyncJobStatus.COMPLETED
99105
elif AsyncJobStatus.FAILED in statuses:
100106
return AsyncJobStatus.FAILED
101107
elif AsyncJobStatus.TIMED_OUT in statuses:
@@ -149,6 +155,7 @@ class AsyncJobOrchestrator:
149155
AsyncJobStatus.FAILED,
150156
AsyncJobStatus.RUNNING,
151157
AsyncJobStatus.TIMED_OUT,
158+
AsyncJobStatus.SKIPPED,
152159
}
153160
_RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}
154161

@@ -341,6 +348,19 @@ def _process_completed_partition(self, partition: AsyncPartition) -> None:
341348
for job in partition.jobs:
342349
self._job_tracker.remove_job(job.api_job_id())
343350

351+
def _process_skipped_partition(self, partition: AsyncPartition) -> None:
352+
"""
353+
Process a skipped partition. The API indicated there is no data to return for this job
354+
(e.g. Amazon SP-API CANCELLED status means no data to report). We clean up the job
355+
allocation without fetching any records or raising errors.
356+
"""
357+
job_ids = list(map(lambda job: job.api_job_id(), {job for job in partition.jobs}))
358+
LOGGER.info(
359+
f"The following jobs for stream slice {partition.stream_slice} have been skipped (no data to return): {job_ids}."
360+
)
361+
for job in partition.jobs:
362+
self._job_tracker.remove_job(job.api_job_id())
363+
344364
def _process_running_partitions_and_yield_completed_ones(
345365
self,
346366
) -> Generator[AsyncPartition, Any, None]:
@@ -359,6 +379,8 @@ def _process_running_partitions_and_yield_completed_ones(
359379
case AsyncJobStatus.COMPLETED:
360380
self._process_completed_partition(partition)
361381
yield partition
382+
case AsyncJobStatus.SKIPPED:
383+
self._process_skipped_partition(partition)
362384
case AsyncJobStatus.RUNNING:
363385
current_running_partitions.append(partition)
364386
case _ if partition.has_reached_max_attempt():

airbyte_cdk/sources/declarative/async_job/status.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class AsyncJobStatus(Enum):
1111
COMPLETED = ("COMPLETED", _TERMINAL)
1212
FAILED = ("FAILED", _TERMINAL)
1313
TIMED_OUT = ("TIMED_OUT", _TERMINAL)
14+
SKIPPED = ("SKIPPED", _TERMINAL)
1415

1516
def __init__(self, value: str, is_terminal: bool) -> None:
1617
self._value = value

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3997,6 +3997,11 @@ definitions:
39973997
type: array
39983998
items:
39993999
type: string
4000+
skipped:
4001+
description: "Statuses that indicate the job was skipped because there is no data to return. Jobs with these statuses will not be retried and no records will be fetched."
4002+
type: array
4003+
items:
4004+
type: string
40004005
AsyncRetriever:
40014006
title: Asynchronous Retriever
40024007
description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router."

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,7 @@ class AsyncJobStatusMap(BaseModel):
12481248
completed: List[str]
12491249
failed: List[str]
12501250
timeout: List[str]
1251+
skipped: Optional[List[str]] = None
12511252

12521253

12531254
class ValueType(Enum):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3689,6 +3689,10 @@ def _create_async_job_status_mapping(
36893689
# This is an element of the dict because of the typing of the CDK but it is not a CDK status
36903690
continue
36913691

3692+
if api_statuses is None:
3693+
# Optional fields like 'skipped' may be None when not provided
3694+
continue
3695+
36923696
for status in api_statuses:
36933697
if status in api_status_to_cdk_status:
36943698
raise ValueError(
@@ -3707,6 +3711,8 @@ def _get_async_job_status(self, status: str) -> AsyncJobStatus:
37073711
return AsyncJobStatus.FAILED
37083712
case "timeout":
37093713
return AsyncJobStatus.TIMED_OUT
3714+
case "skipped":
3715+
return AsyncJobStatus.SKIPPED
37103716
case _:
37113717
raise ValueError(f"Unsupported CDK status {status}")
37123718

unit_tests/sources/declarative/async_job/test_job_orchestrator.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,39 @@ def test_given_running_and_completed_jobs_when_status_then_return_running(self)
5555
)
5656
assert partition.status == AsyncJobStatus.RUNNING
5757

58-
def test_given_only_completed_jobs_when_status_then_return_running(self) -> None:
58+
def test_given_only_completed_jobs_when_status_then_return_completed(self) -> None:
5959
partition = AsyncPartition(
6060
[_create_job(AsyncJobStatus.COMPLETED) for _ in range(10)], _ANY_STREAM_SLICE
6161
)
6262
assert partition.status == AsyncJobStatus.COMPLETED
6363

64+
def test_given_only_skipped_jobs_when_status_then_return_skipped(self) -> None:
65+
partition = AsyncPartition(
66+
[_create_job(AsyncJobStatus.SKIPPED) for _ in range(3)], _ANY_STREAM_SLICE
67+
)
68+
assert partition.status == AsyncJobStatus.SKIPPED
69+
70+
def test_given_completed_and_skipped_jobs_when_status_then_return_completed(self) -> None:
71+
partition = AsyncPartition(
72+
[_create_job(AsyncJobStatus.COMPLETED), _create_job(AsyncJobStatus.SKIPPED)],
73+
_ANY_STREAM_SLICE,
74+
)
75+
assert partition.status == AsyncJobStatus.COMPLETED
76+
77+
def test_given_skipped_and_running_jobs_when_status_then_return_running(self) -> None:
78+
partition = AsyncPartition(
79+
[_create_job(AsyncJobStatus.SKIPPED), _create_job(AsyncJobStatus.RUNNING)],
80+
_ANY_STREAM_SLICE,
81+
)
82+
assert partition.status == AsyncJobStatus.RUNNING
83+
84+
def test_given_skipped_and_failed_jobs_when_status_then_return_failed(self) -> None:
85+
partition = AsyncPartition(
86+
[_create_job(AsyncJobStatus.SKIPPED), _create_job(AsyncJobStatus.FAILED)],
87+
_ANY_STREAM_SLICE,
88+
)
89+
assert partition.status == AsyncJobStatus.FAILED
90+
6491

6592
def _status_update_per_jobs(
6693
status_update_per_jobs: Mapping[AsyncJob, List[AsyncJobStatus]],
@@ -358,6 +385,37 @@ def test_given_start_job_raise_when_create_and_get_completed_partitions_then_fre
358385

359386
assert job_tracker.try_to_get_intent()
360387

388+
@mock.patch(sleep_mock_target)
389+
def test_given_skipped_when_create_and_get_completed_partitions_then_skip_without_fetching_records(
390+
self, mock_sleep: MagicMock
391+
) -> None:
392+
job_tracker = JobTracker(1)
393+
self._job_repository.start.return_value = self._job_for_a_slice
394+
self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(
395+
{self._job_for_a_slice: [AsyncJobStatus.SKIPPED]}
396+
)
397+
orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker)
398+
399+
partitions = list(orchestrator.create_and_get_completed_partitions())
400+
401+
assert len(partitions) == 0 # skipped partitions are not yielded
402+
assert job_tracker.try_to_get_intent() # budget was freed
403+
self._job_repository.fetch_records.assert_not_called()
404+
405+
@mock.patch(sleep_mock_target)
406+
def test_given_skipped_does_not_retry(self, mock_sleep: MagicMock) -> None:
407+
self._job_repository.start.return_value = self._job_for_a_slice
408+
self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(
409+
{self._job_for_a_slice: [AsyncJobStatus.SKIPPED]}
410+
)
411+
orchestrator = self._orchestrator([_A_STREAM_SLICE])
412+
413+
partitions = list(orchestrator.create_and_get_completed_partitions())
414+
415+
assert len(partitions) == 0
416+
# start is called only once — SKIPPED does not trigger a retry
417+
assert self._job_repository.start.call_count == 1
418+
361419
def _mock_repository(self) -> None:
362420
self._job_repository = Mock(spec=AsyncJobRepository)
363421

0 commit comments

Comments (0)