Skip to content

Commit a3018cf

Browse files
fix: improve async job retry exhaustion error
1 parent 19a7083 commit a3018cf

3 files changed

Lines changed: 44 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ Newer updates can be found here: [GitHub Release Notes](https://github.com/airby
44

55
# Changelog
66

7+
## 6.5.3
8+
9+
bugfix: Improve low-code async job retry exhaustion error messages and classification.
10+
711
## 6.5.2
812

913
bugfix: Ensure that streams with partition router are not executed concurrently

airbyte_cdk/sources/declarative/async_job/job_orchestrator.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
LOGGER = logging.getLogger("airbyte")
3838
_NO_TIMEOUT = timedelta.max
3939
_API_SIDE_RUNNING_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}
40+
_ASYNC_JOB_TERMINAL_FAILURE_MESSAGE = "Async job failed after exhausting all retry attempts."
41+
_ASYNC_JOB_FINAL_FAILURE_MESSAGE = "Async job partition did not complete successfully."
4042

4143

4244
class AsyncPartition:
@@ -487,11 +489,16 @@ def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
487489
AirbyteTracedException: If at least one job could not be completed.
488490
"""
489491
status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs}
492+
failure_type = (
493+
FailureType.system_error
494+
if any(job.is_creation_failure() for job in partition.jobs)
495+
else FailureType.transient_error
496+
)
490497
self._non_breaking_exceptions.append(
491498
AirbyteTracedException(
492-
message="Async job failed after exhausting all retry attempts.",
499+
message=_ASYNC_JOB_TERMINAL_FAILURE_MESSAGE,
493500
internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
494-
failure_type=FailureType.system_error,
501+
failure_type=failure_type,
495502
)
496503
)
497504

@@ -536,15 +543,25 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
536543
if self._non_breaking_exceptions:
537544
# We emitted a traced message but didn't break on the non-breaking exception. We still need to raise an exception so that the
538545
# caller of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
546+
failure_type = (
547+
FailureType.transient_error
548+
if any(
549+
isinstance(exception, AirbyteTracedException)
550+
and exception.message == _ASYNC_JOB_TERMINAL_FAILURE_MESSAGE
551+
and exception.failure_type == FailureType.transient_error
552+
for exception in self._non_breaking_exceptions
553+
)
554+
else FailureType.system_error
555+
)
539556
raise AirbyteTracedException(
540-
message="One or more async jobs failed after exhausting all retry attempts.",
557+
message=_ASYNC_JOB_FINAL_FAILURE_MESSAGE,
541558
internal_message="\n".join(
542559
[
543560
filter_secrets(exception.__repr__())
544561
for exception in self._non_breaking_exceptions
545562
]
546563
),
547-
failure_type=FailureType.system_error,
564+
failure_type=failure_type,
548565
)
549566

550567
def _handle_non_breaking_error(self, exception: Exception) -> None:

unit_tests/sources/declarative/async_job/test_job_orchestrator.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,25 @@ def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_
344344

345345
assert job_tracker.try_to_get_intent()
346346

347+
@mock.patch(sleep_mock_target)
348+
def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_raise_transient_error(
349+
self, mock_sleep: MagicMock
350+
) -> None:
351+
jobs = [self._an_async_job(str(i), _A_STREAM_SLICE) for i in range(_MAX_NUMBER_OF_ATTEMPTS)]
352+
self._job_repository.start.side_effect = jobs
353+
self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(
354+
{job: [AsyncJobStatus.FAILED] for job in jobs}
355+
)
356+
357+
orchestrator = self._orchestrator([_A_STREAM_SLICE], JobTracker(1))
358+
359+
with pytest.raises(AirbyteTracedException) as exc_info:
360+
list(orchestrator.create_and_get_completed_partitions())
361+
362+
assert exc_info.value.message == "Async job partition did not complete successfully."
363+
assert exc_info.value.failure_type == FailureType.transient_error
364+
assert "At least one job could not be completed" in exc_info.value.internal_message
365+
347366
def given_budget_already_taken_before_start_when_create_and_get_completed_partitions_then_wait_for_budget_to_be_freed(
348367
self,
349368
) -> None:

0 commit comments

Comments
 (0)