Skip to content

Commit acafc75

Browse files
devin-ai-integration[bot]bot_apk
andauthored
fix(cdk): reclassify async job failure from config_error to system_error and add user-facing message (#961)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: bot_apk <apk@cognition.ai>
1 parent 0e57414 commit acafc75

File tree

2 files changed

+5
-4
lines changed

2 files changed

+5
-4
lines changed

airbyte_cdk/sources/declarative/async_job/job_orchestrator.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -434,8 +434,9 @@ def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
434434
status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs}
435435
self._non_breaking_exceptions.append(
436436
AirbyteTracedException(
437+
message="Async job failed after exhausting all retry attempts.",
437438
internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
438-
failure_type=FailureType.config_error,
439+
failure_type=FailureType.system_error,
439440
)
440441
)
441442

@@ -481,14 +482,14 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
481482
# We emitted traced message but we didn't break on non_breaking_exception. We still need to raise an exception so that the
482483
# call of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
483484
raise AirbyteTracedException(
484-
message=None,
485+
message="One or more async jobs failed after exhausting all retry attempts.",
485486
internal_message="\n".join(
486487
[
487488
filter_secrets(exception.__repr__())
488489
for exception in self._non_breaking_exceptions
489490
]
490491
),
491-
failure_type=FailureType.config_error,
492+
failure_type=FailureType.system_error,
492493
)
493494

494495
def _handle_non_breaking_error(self, exception: Exception) -> None:

unit_tests/sources/declarative/async_job/test_job_orchestrator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ def test_given_exception_when_start_job_and_skip_this_exception(
296296

297297
assert len(partitions) == 1 # only _job_for_another_slice has succeeded
298298
assert self._message_repository.emit_message.call_count == 3 # one for each traced message
299-
assert exception.failure_type == FailureType.config_error # type: ignore # exception should be of type AirbyteTracedException
299+
assert exception.failure_type == FailureType.system_error # type: ignore # exception should be of type AirbyteTracedException
300300

301301
@mock.patch(sleep_mock_target)
302302
def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget(

0 commit comments

Comments
 (0)