Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,17 @@ def stream_slice(self) -> StreamSlice:
@property
def status(self) -> AsyncJobStatus:
"""
Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Else, it means everything is completed.
Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Else, it means everything is completed
or skipped. A partition is SKIPPED only when all jobs are SKIPPED (or a mix of COMPLETED and SKIPPED).
"""
statuses = set(map(lambda job: job.status(), self.jobs))
if statuses == {AsyncJobStatus.COMPLETED}:
return AsyncJobStatus.COMPLETED
elif statuses == {AsyncJobStatus.SKIPPED}:
return AsyncJobStatus.SKIPPED
elif statuses <= {AsyncJobStatus.COMPLETED, AsyncJobStatus.SKIPPED}:
# Mix of completed and skipped — treat as completed so records are fetched for the completed jobs
return AsyncJobStatus.COMPLETED
elif AsyncJobStatus.FAILED in statuses:
return AsyncJobStatus.FAILED
elif AsyncJobStatus.TIMED_OUT in statuses:
Expand Down Expand Up @@ -149,6 +155,7 @@ class AsyncJobOrchestrator:
AsyncJobStatus.FAILED,
AsyncJobStatus.RUNNING,
AsyncJobStatus.TIMED_OUT,
AsyncJobStatus.SKIPPED,
}
_RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}

Expand Down Expand Up @@ -341,6 +348,19 @@ def _process_completed_partition(self, partition: AsyncPartition) -> None:
for job in partition.jobs:
self._job_tracker.remove_job(job.api_job_id())

def _process_skipped_partition(self, partition: AsyncPartition) -> None:
    """
    Process a skipped partition. The API indicated there is no data to return for this job
    (e.g. Amazon SP-API CANCELLED status means no data to report). We clean up the job
    allocation without fetching any records or raising errors.

    :param partition: the partition whose jobs all resolved to SKIPPED
    """
    # Fix: the original built the ids via `list(map(lambda ..., {job for job in partition.jobs}))`.
    # The intermediate set comprehension was redundant (it only re-wrapped the jobs) and made the
    # logged id order nondeterministic; a direct comprehension is the idiomatic, ordered equivalent.
    job_ids = [job.api_job_id() for job in partition.jobs]
    LOGGER.info(
        f"The following jobs for stream slice {partition.stream_slice} have been skipped (no data to return): {job_ids}."
    )
    # Release each job's slot so the JobTracker budget can be reused by subsequent jobs.
    for job in partition.jobs:
        self._job_tracker.remove_job(job.api_job_id())

def _process_running_partitions_and_yield_completed_ones(
self,
) -> Generator[AsyncPartition, Any, None]:
Expand All @@ -359,6 +379,8 @@ def _process_running_partitions_and_yield_completed_ones(
case AsyncJobStatus.COMPLETED:
self._process_completed_partition(partition)
yield partition
case AsyncJobStatus.SKIPPED:
self._process_skipped_partition(partition)
case AsyncJobStatus.RUNNING:
current_running_partitions.append(partition)
case _ if partition.has_reached_max_attempt():
Expand Down
1 change: 1 addition & 0 deletions airbyte_cdk/sources/declarative/async_job/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class AsyncJobStatus(Enum):
COMPLETED = ("COMPLETED", _TERMINAL)
FAILED = ("FAILED", _TERMINAL)
TIMED_OUT = ("TIMED_OUT", _TERMINAL)
SKIPPED = ("SKIPPED", _TERMINAL)

def __init__(self, value: str, is_terminal: bool) -> None:
self._value = value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3997,6 +3997,11 @@ definitions:
type: array
items:
type: string
skipped:
description: "Statuses that indicate the job was skipped because there is no data to return. Jobs with these statuses will not be retried and no records will be fetched."
type: array
items:
type: string
AsyncRetriever:
title: Asynchronous Retriever
description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,7 @@ class AsyncJobStatusMap(BaseModel):
completed: List[str]
failed: List[str]
timeout: List[str]
skipped: Optional[List[str]] = None


class ValueType(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3680,6 +3680,8 @@ def _get_state_delegating_stream_model(
else model.full_refresh_stream
)

_OPTIONAL_ASYNC_STATUS_FIELDS = {"skipped"}

def _create_async_job_status_mapping(
self, model: AsyncJobStatusMapModel, config: Config, **kwargs: Any
) -> Mapping[str, AsyncJobStatus]:
Expand All @@ -3689,6 +3691,14 @@ def _create_async_job_status_mapping(
# This is an element of the dict because of the typing of the CDK but it is not a CDK status
continue

if api_statuses is None:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Fixed in eaf016a — the None guard is now scoped to only the optional skipped field via _OPTIONAL_ASYNC_STATUS_FIELDS = {"skipped"}. If a required field (running, completed, failed, timeout) is ever accidentally None, it will now raise a ValueError instead of being silently skipped.

if cdk_status in self._OPTIONAL_ASYNC_STATUS_FIELDS:
continue
raise ValueError(
f"Required CDK status '{cdk_status}' has no API statuses mapped. "
f"Please provide at least an empty list for required status fields."
)

for status in api_statuses:
if status in api_status_to_cdk_status:
raise ValueError(
Expand All @@ -3707,6 +3717,8 @@ def _get_async_job_status(self, status: str) -> AsyncJobStatus:
return AsyncJobStatus.FAILED
case "timeout":
return AsyncJobStatus.TIMED_OUT
case "skipped":
return AsyncJobStatus.SKIPPED
case _:
raise ValueError(f"Unsupported CDK status {status}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,39 @@ def test_given_running_and_completed_jobs_when_status_then_return_running(self)
)
assert partition.status == AsyncJobStatus.RUNNING

def test_given_only_completed_jobs_when_status_then_return_completed(self) -> None:
    """A partition whose jobs are all COMPLETED reports COMPLETED."""
    completed_jobs = [_create_job(AsyncJobStatus.COMPLETED) for _ in range(10)]
    partition = AsyncPartition(completed_jobs, _ANY_STREAM_SLICE)

    assert partition.status == AsyncJobStatus.COMPLETED

def test_given_only_skipped_jobs_when_status_then_return_skipped(self) -> None:
    """A partition whose jobs are all SKIPPED reports SKIPPED as a whole."""
    skipped_jobs = [_create_job(AsyncJobStatus.SKIPPED) for _ in range(3)]
    partition = AsyncPartition(skipped_jobs, _ANY_STREAM_SLICE)

    assert partition.status == AsyncJobStatus.SKIPPED

def test_given_completed_and_skipped_jobs_when_status_then_return_completed(self) -> None:
    """A COMPLETED/SKIPPED mix reports COMPLETED so records are still fetched for the completed jobs."""
    mixed_statuses = (AsyncJobStatus.COMPLETED, AsyncJobStatus.SKIPPED)
    partition = AsyncPartition(
        [_create_job(status) for status in mixed_statuses],
        _ANY_STREAM_SLICE,
    )

    assert partition.status == AsyncJobStatus.COMPLETED

def test_given_skipped_and_running_jobs_when_status_then_return_running(self) -> None:
    """While any job is still RUNNING, a SKIPPED sibling must not make the partition terminal."""
    statuses = (AsyncJobStatus.SKIPPED, AsyncJobStatus.RUNNING)
    partition = AsyncPartition(
        [_create_job(status) for status in statuses],
        _ANY_STREAM_SLICE,
    )

    assert partition.status == AsyncJobStatus.RUNNING

def test_given_skipped_and_failed_jobs_when_status_then_return_failed(self) -> None:
    """FAILED takes priority over SKIPPED: one failure marks the whole partition failed."""
    statuses = (AsyncJobStatus.SKIPPED, AsyncJobStatus.FAILED)
    partition = AsyncPartition(
        [_create_job(status) for status in statuses],
        _ANY_STREAM_SLICE,
    )

    assert partition.status == AsyncJobStatus.FAILED


def _status_update_per_jobs(
status_update_per_jobs: Mapping[AsyncJob, List[AsyncJobStatus]],
Expand Down Expand Up @@ -358,6 +385,37 @@ def test_given_start_job_raise_when_create_and_get_completed_partitions_then_fre

assert job_tracker.try_to_get_intent()

@mock.patch(sleep_mock_target)
def test_given_skipped_when_create_and_get_completed_partitions_then_skip_without_fetching_records(
    self, mock_sleep: MagicMock
) -> None:
    """A SKIPPED job frees its tracker budget and its partition is neither yielded nor fetched."""
    job_tracker = JobTracker(1)
    self._job_repository.start.return_value = self._job_for_a_slice
    self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(
        {self._job_for_a_slice: [AsyncJobStatus.SKIPPED]}
    )
    orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker)

    yielded_partitions = list(orchestrator.create_and_get_completed_partitions())

    # Skipped partitions are never surfaced to the caller...
    assert not yielded_partitions
    # ...their job-tracker budget is released...
    assert job_tracker.try_to_get_intent()
    # ...and no record fetch is ever attempted for them.
    self._job_repository.fetch_records.assert_not_called()

@mock.patch(sleep_mock_target)
def test_given_skipped_does_not_retry(self, mock_sleep: MagicMock) -> None:
    """SKIPPED is terminal: the orchestrator must not restart a skipped job."""
    self._job_repository.start.return_value = self._job_for_a_slice
    self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(
        {self._job_for_a_slice: [AsyncJobStatus.SKIPPED]}
    )
    orchestrator = self._orchestrator([_A_STREAM_SLICE])

    yielded_partitions = list(orchestrator.create_and_get_completed_partitions())

    assert not yielded_partitions
    # A single start call proves the SKIPPED status did not trigger a retry.
    assert self._job_repository.start.call_count == 1

def _mock_repository(self) -> None:
self._job_repository = Mock(spec=AsyncJobRepository)

Expand Down
Loading