Skip to content

Commit b00c20c

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
fix(async-job): propagate wrapped FailureType on async job aggregation
Replace hardcoded system_error in AsyncJobOrchestrator's aggregated failure with the highest-precedence FailureType among wrapped non-breaking exceptions (config_error > transient_error > system_error). The user-facing message is chosen per FailureType to stay deterministic; underlying failure-type counts and exception reprs are moved into internal_message. Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 1256a1f commit b00c20c

2 files changed

Lines changed: 106 additions & 3 deletions

File tree

airbyte_cdk/sources/declarative/async_job/job_orchestrator.py

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from datetime import timedelta
99
from typing import (
1010
Any,
11+
Dict,
1112
Generator,
1213
Generic,
1314
Iterable,
@@ -38,6 +39,29 @@
3839
_NO_TIMEOUT = timedelta.max
3940
_API_SIDE_RUNNING_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}
4041

42+
# Precedence used to aggregate the `FailureType` of many non-breaking
43+
# exceptions into a single value. A `config_error` means the user must act
44+
# before retries can succeed, so it dominates. `transient_error` is next
45+
# (retryable). `system_error` is the fallback for genuine internal failures.
46+
_FAILURE_TYPE_PRECEDENCE: Tuple[FailureType, ...] = (
47+
FailureType.config_error,
48+
FailureType.transient_error,
49+
FailureType.system_error,
50+
)
51+
52+
# Deterministic, aggregation-friendly user-facing messages per dominant
53+
# `FailureType`. Counts and raw exception reprs go into `internal_message`
54+
# so that the `message` field stays stable as a log aggregation key.
55+
_ASYNC_JOB_FAILURE_MESSAGE_BY_TYPE: Mapping[FailureType, str] = {
56+
FailureType.config_error: (
57+
"Async jobs failed because the source API rejected the request as unauthorized or forbidden."
58+
),
59+
FailureType.transient_error: (
60+
"Async jobs failed after exhausting retries for source API rate limit or transient errors."
61+
),
62+
FailureType.system_error: "Async jobs failed after exhausting retry attempts.",
63+
}
64+
4165

4266
class AsyncPartition:
4367
"""
@@ -481,16 +505,56 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
481505
if self._non_breaking_exceptions:
482506
# We emitted traced message but we didn't break on non_breaking_exception. We still need to raise an exception so that the
483507
# call of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
508+
failure_type = self._aggregate_failure_type(self._non_breaking_exceptions)
509+
failure_counts = self._count_failure_types(self._non_breaking_exceptions)
510+
summary = ", ".join(
511+
f"{ft.value}={failure_counts[ft]}"
512+
for ft in _FAILURE_TYPE_PRECEDENCE
513+
if ft in failure_counts
514+
)
484515
raise AirbyteTracedException(
485-
message="One or more async jobs failed after exhausting all retry attempts.",
516+
message=_ASYNC_JOB_FAILURE_MESSAGE_BY_TYPE[failure_type],
486517
internal_message="\n".join(
487-
[
518+
[f"Underlying failure breakdown: {summary}."]
519+
+ [
488520
filter_secrets(exception.__repr__())
489521
for exception in self._non_breaking_exceptions
490522
]
491523
),
492-
failure_type=FailureType.system_error,
524+
failure_type=failure_type,
525+
)
526+
527+
@staticmethod
528+
def _aggregate_failure_type(exceptions: List[Exception]) -> FailureType:
529+
"""Return the highest-precedence `FailureType` across `exceptions`.
530+
531+
Non-`AirbyteTracedException` exceptions are treated as `system_error`
532+
(matching `AirbyteTracedException`'s default). The precedence order
533+
is `config_error` > `transient_error` > `system_error`.
534+
"""
535+
types_present: Set[FailureType] = {
536+
exc.failure_type
537+
if isinstance(exc, AirbyteTracedException) and exc.failure_type is not None
538+
else FailureType.system_error
539+
for exc in exceptions
540+
}
541+
for failure_type in _FAILURE_TYPE_PRECEDENCE:
542+
if failure_type in types_present:
543+
return failure_type
544+
return FailureType.system_error
545+
546+
@staticmethod
547+
def _count_failure_types(exceptions: List[Exception]) -> Dict[FailureType, int]:
548+
"""Return a count of each `FailureType` observed in `exceptions`."""
549+
counts: Dict[FailureType, int] = {}
550+
for exc in exceptions:
551+
failure_type = (
552+
exc.failure_type
553+
if isinstance(exc, AirbyteTracedException) and exc.failure_type is not None
554+
else FailureType.system_error
493555
)
556+
counts[failure_type] = counts.get(failure_type, 0) + 1
557+
return counts
494558

495559
def _handle_non_breaking_error(self, exception: Exception) -> None:
496560
LOGGER.error(f"Failed to start the Job: {exception}, traceback: {traceback.format_exc()}")

unit_tests/sources/declarative/async_job/test_job_orchestrator.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,45 @@ def test_given_exception_when_start_job_and_skip_this_exception(
298298
assert self._message_repository.emit_message.call_count == 3 # one for each traced message
299299
assert exception.failure_type == FailureType.system_error # type: ignore # exception should be of type AirbyteTracedException
300300

301+
def test_aggregate_failure_type_gives_config_error_highest_precedence(self) -> None:
302+
exceptions: List[Exception] = [
303+
AirbyteTracedException("a", failure_type=FailureType.transient_error),
304+
AirbyteTracedException("b", failure_type=FailureType.config_error),
305+
AirbyteTracedException("c"),
306+
ValueError("d"),
307+
]
308+
assert AsyncJobOrchestrator._aggregate_failure_type(exceptions) == FailureType.config_error
309+
310+
def test_aggregate_failure_type_prefers_transient_over_system(self) -> None:
311+
exceptions: List[Exception] = [
312+
AirbyteTracedException("a"),
313+
AirbyteTracedException("b", failure_type=FailureType.transient_error),
314+
ValueError("c"),
315+
]
316+
assert (
317+
AsyncJobOrchestrator._aggregate_failure_type(exceptions) == FailureType.transient_error
318+
)
319+
320+
def test_aggregate_failure_type_defaults_to_system_error(self) -> None:
321+
exceptions: List[Exception] = [
322+
ValueError("a"),
323+
AirbyteTracedException("b"),
324+
]
325+
assert AsyncJobOrchestrator._aggregate_failure_type(exceptions) == FailureType.system_error
326+
327+
def test_count_failure_types_counts_traced_and_plain_exceptions(self) -> None:
328+
exceptions: List[Exception] = [
329+
AirbyteTracedException("a", failure_type=FailureType.transient_error),
330+
AirbyteTracedException("b", failure_type=FailureType.transient_error),
331+
AirbyteTracedException("c"),
332+
ValueError("d"),
333+
]
334+
counts = AsyncJobOrchestrator._count_failure_types(exceptions)
335+
assert counts == {
336+
FailureType.transient_error: 2,
337+
FailureType.system_error: 2,
338+
}
339+
301340
@mock.patch(sleep_mock_target)
302341
def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget(
303342
self, mock_sleep: MagicMock

0 commit comments

Comments
 (0)