Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 2 additions & 19 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,6 @@ def monkey_patched_get_item(self, key): # type: ignore # this interface is a co
requests_cache.SQLiteDict.__getitem__ = monkey_patched_get_item # type: ignore # see the method doc for more information


class MessageRepresentationAirbyteTracedErrors(AirbyteTracedException):
"""
Before the migration to the HttpClient in low-code, the exception raised was
[ReadException](https://github.com/airbytehq/airbyte/blob/8fdd9818ec16e653ba3dd2b167a74b7c07459861/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py#L566).
This has been moved to a AirbyteTracedException. The printing on this is questionable (AirbyteTracedException string representation
shows the internal_message and not the message). We have already discussed moving the AirbyteTracedException string representation to
`message` but the impact is unclear and hard to quantify so we will do it here only for now.
"""

def __str__(self) -> str:
if self.message:
return self.message
elif self.internal_message:
return self.internal_message
return ""


class HttpClient:
_DEFAULT_MAX_RETRY: int = 5
_DEFAULT_MAX_TIME: int = 60 * 10
Expand Down Expand Up @@ -311,7 +294,7 @@ def _send_with_retry(
return response
except BaseBackoffException as e:
self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True)
raise MessageRepresentationAirbyteTracedErrors(
raise AirbyteTracedException(
internal_message=f"Exhausted available request attempts. Exception: {e}",
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
failure_type=e.failure_type or FailureType.system_error,
Expand Down Expand Up @@ -495,7 +478,7 @@ def _handle_error_resolution(
# ensure the exception message is emitted before raised
self._logger.error(error_message)

raise MessageRepresentationAirbyteTracedErrors(
raise AirbyteTracedException(
internal_message=error_message,
message=error_resolution.error_message or error_message,
failure_type=error_resolution.failure_type,
Expand Down
17 changes: 16 additions & 1 deletion airbyte_cdk/utils/traced_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ def __init__(
self._stream_descriptor = stream_descriptor
super().__init__(internal_message)

def __str__(self) -> str:
"""Return the user-facing message, falling back to internal_message."""
if self.message is not None:
return self.message
if self.internal_message is not None:
return self.internal_message
return ""

def as_airbyte_message(
self, stream_descriptor: Optional[StreamDescriptor] = None
) -> AirbyteMessage:
Expand Down Expand Up @@ -112,8 +120,15 @@ def from_exception(
:param exc: the exception that caused the error
:param stream_descriptor: describe the stream from which the exception comes from
"""
if isinstance(exc, AirbyteTracedException):
internal_message = exc.internal_message
# Preserve the original user-facing message if the caller didn't provide one
if "message" not in kwargs:
kwargs["message"] = exc.message
else:
internal_message = str(exc)
return cls(
internal_message=str(exc),
internal_message=internal_message,
exception=exc,
stream_descriptor=stream_descriptor,
*args,
Expand Down
170 changes: 170 additions & 0 deletions unit_tests/utils/test_traced_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,173 @@ def test_given_both_from_exception_and_as_sanitized_airbyte_message_with_stream_
)
message = traced_exc.as_sanitized_airbyte_message(stream_descriptor=_ANOTHER_STREAM_DESCRIPTOR)
assert message.trace.error.stream_descriptor == _A_STREAM_DESCRIPTOR


class TestAirbyteTracedExceptionStr:
"""Tests proving that __str__ returns user-facing message instead of internal_message."""

def test_str_returns_user_facing_message_when_both_set(self) -> None:
exc = AirbyteTracedException(
internal_message="raw API error: 401 Unauthorized",
message="Authentication credentials are invalid.",
)
assert str(exc) == "Authentication credentials are invalid."

def test_str_falls_back_to_internal_message_when_message_is_none(self) -> None:
exc = AirbyteTracedException(internal_message="an internal error")
assert str(exc) == "an internal error"

def test_str_returns_empty_string_when_both_none(self) -> None:
exc = AirbyteTracedException()
assert str(exc) == ""

def test_str_returns_message_when_internal_message_is_none(self) -> None:
exc = AirbyteTracedException(message="A user-friendly error occurred.")
assert str(exc) == "A user-friendly error occurred."

def test_str_used_in_fstring_returns_user_facing_message(self) -> None:
exc = AirbyteTracedException(
internal_message="internal detail",
message="Connection timed out.",
)
assert f"Error: {exc}" == "Error: Connection timed out."

def test_str_used_in_logging_format_returns_user_facing_message(self) -> None:
exc = AirbyteTracedException(
internal_message="socket.timeout: read timed out",
message="Request timed out.",
)
assert "Error: %s" % exc == "Error: Request timed out."

def test_args_still_contains_internal_message(self) -> None:
"""Verify args[0] is still internal_message for traceback formatting."""
exc = AirbyteTracedException(
internal_message="internal detail",
message="user-facing message",
)
assert exc.args[0] == "internal detail"

def test_str_on_subclass_inherits_behavior(self) -> None:
"""Verify subclasses inherit the __str__ override without needing their own."""

class CustomTracedException(AirbyteTracedException):
pass

exc = CustomTracedException(
internal_message="raw error",
message="User-friendly error.",
)
assert str(exc) == "User-friendly error."

def test_str_with_from_exception_factory(self) -> None:
original = ValueError("original error")
exc = AirbyteTracedException.from_exception(
original, message="A validation error occurred."
)
assert str(exc) == "A validation error occurred."
assert exc.internal_message == "original error"

def test_str_with_from_exception_without_message(self) -> None:
original = RuntimeError("runtime failure")
exc = AirbyteTracedException.from_exception(original)
assert str(exc) == "runtime failure"

def test_stack_trace_uses_str_representation(self) -> None:
"""Verify traceback one-liner uses __str__ (user-facing message)."""
exc = AirbyteTracedException(
internal_message="internal detail for traceback",
message="User sees this.",
)
airbyte_message = exc.as_airbyte_message()
assert "User sees this." in airbyte_message.trace.error.stack_trace

def test_str_with_empty_message_does_not_fall_back_to_internal_message(self) -> None:
"""Explicit empty message should be respected and not replaced by internal_message."""
exc = AirbyteTracedException(
internal_message="an internal error that should not be shown to the user",
message="",
)
assert str(exc) == ""

def test_internal_message_preserved_in_trace_error(self) -> None:
"""Verify internal_message is still available in the trace error for debugging."""
exc = AirbyteTracedException(
internal_message="raw API error: 401",
message="Authentication failed.",
)
airbyte_message = exc.as_airbyte_message()
assert airbyte_message.trace.error.internal_message == "raw API error: 401"
assert airbyte_message.trace.error.message == "Authentication failed."


class TestFromExceptionPreservesFields:
"""Tests proving that from_exception preserves both fields when wrapping an AirbyteTracedException."""

def test_from_exception_wrapping_traced_preserves_internal_message(self) -> None:
"""When wrapping an AirbyteTracedException, internal_message should be taken directly, not via str()."""
original = AirbyteTracedException(
internal_message="raw API error: 401 Unauthorized",
message="Authentication failed.",
)
wrapped = AirbyteTracedException.from_exception(original)
assert wrapped.internal_message == "raw API error: 401 Unauthorized"

def test_from_exception_wrapping_traced_preserves_message(self) -> None:
"""When wrapping an AirbyteTracedException without explicit message, the original's message is preserved."""
original = AirbyteTracedException(
internal_message="raw API error",
message="User-friendly error.",
)
wrapped = AirbyteTracedException.from_exception(original)
assert wrapped.message == "User-friendly error."

def test_from_exception_wrapping_traced_caller_message_overrides(self) -> None:
"""When the caller provides an explicit message, it should override the original's message."""
original = AirbyteTracedException(
internal_message="raw API error",
message="Original user message.",
)
wrapped = AirbyteTracedException.from_exception(original, message="Custom wrapper message.")
assert wrapped.message == "Custom wrapper message."
assert wrapped.internal_message == "raw API error"

def test_from_exception_wrapping_traced_with_only_message(self) -> None:
"""When wrapping a traced exception that has message but no internal_message, both fields are preserved."""
original = AirbyteTracedException(
message="User error only.",
)
wrapped = AirbyteTracedException.from_exception(original)
assert wrapped.internal_message is None
assert wrapped.message == "User error only."

def test_from_exception_wrapping_traced_with_only_internal_message(self) -> None:
"""When wrapping a traced exception that has only internal_message, it is preserved correctly."""
original = AirbyteTracedException(
internal_message="internal detail only",
)
wrapped = AirbyteTracedException.from_exception(original)
assert wrapped.internal_message == "internal detail only"
assert wrapped.message is None

def test_from_exception_wrapping_traced_with_neither_field(self) -> None:
"""When wrapping a traced exception with no messages, both remain None."""
original = AirbyteTracedException()
wrapped = AirbyteTracedException.from_exception(original)
assert wrapped.internal_message is None
assert wrapped.message is None

def test_from_exception_wrapping_regular_exception_unchanged(self) -> None:
"""Wrapping a regular exception should still use str(exc) for internal_message."""
original = ValueError("some value error")
wrapped = AirbyteTracedException.from_exception(original)
assert wrapped.internal_message == "some value error"
assert wrapped.message is None

def test_from_exception_wrapping_regular_exception_with_message(self) -> None:
"""Wrapping a regular exception with explicit message kwarg still works."""
original = RuntimeError("runtime failure")
wrapped = AirbyteTracedException.from_exception(
original, message="A runtime error occurred."
)
assert wrapped.internal_message == "runtime failure"
assert wrapped.message == "A runtime error occurred."
Loading