Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 2 additions & 19 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,6 @@ def monkey_patched_get_item(self, key): # type: ignore # this interface is a co
requests_cache.SQLiteDict.__getitem__ = monkey_patched_get_item # type: ignore # see the method doc for more information


class MessageRepresentationAirbyteTracedErrors(AirbyteTracedException):
"""
Before the migration to the HttpClient in low-code, the exception raised was
[ReadException](https://github.com/airbytehq/airbyte/blob/8fdd9818ec16e653ba3dd2b167a74b7c07459861/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py#L566).
This has been moved to a AirbyteTracedException. The printing on this is questionable (AirbyteTracedException string representation
shows the internal_message and not the message). We have already discussed moving the AirbyteTracedException string representation to
`message` but the impact is unclear and hard to quantify so we will do it here only for now.
"""

def __str__(self) -> str:
if self.message:
return self.message
elif self.internal_message:
return self.internal_message
return ""


class HttpClient:
_DEFAULT_MAX_RETRY: int = 5
_DEFAULT_MAX_TIME: int = 60 * 10
Expand Down Expand Up @@ -311,7 +294,7 @@ def _send_with_retry(
return response
except BaseBackoffException as e:
self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True)
raise MessageRepresentationAirbyteTracedErrors(
raise AirbyteTracedException(
internal_message=f"Exhausted available request attempts. Exception: {e}",
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
failure_type=e.failure_type or FailureType.system_error,
Expand Down Expand Up @@ -495,7 +478,7 @@ def _handle_error_resolution(
# ensure the exception message is emitted before raised
self._logger.error(error_message)

raise MessageRepresentationAirbyteTracedErrors(
raise AirbyteTracedException(
internal_message=error_message,
message=error_resolution.error_message or error_message,
failure_type=error_resolution.failure_type,
Expand Down
9 changes: 9 additions & 0 deletions airbyte_cdk/utils/traced_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ def __init__(
self._stream_descriptor = stream_descriptor
super().__init__(internal_message)

def __str__(self) -> str:
"""Return the user-facing message, falling back to internal_message."""

if self.message:
return self.message
elif self.internal_message:
return self.internal_message
return ""
Comment thread
sophiecuiy marked this conversation as resolved.

def as_airbyte_message(
self, stream_descriptor: Optional[StreamDescriptor] = None
) -> AirbyteMessage:
Expand Down
89 changes: 89 additions & 0 deletions unit_tests/utils/test_traced_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,92 @@ def test_given_both_from_exception_and_as_sanitized_airbyte_message_with_stream_
)
message = traced_exc.as_sanitized_airbyte_message(stream_descriptor=_ANOTHER_STREAM_DESCRIPTOR)
assert message.trace.error.stream_descriptor == _A_STREAM_DESCRIPTOR


class TestAirbyteTracedExceptionStr:
"""Tests proving that __str__ returns user-facing message instead of internal_message."""

def test_str_returns_user_facing_message_when_both_set(self) -> None:
exc = AirbyteTracedException(
internal_message="raw API error: 401 Unauthorized",
message="Authentication credentials are invalid.",
)
assert str(exc) == "Authentication credentials are invalid."

def test_str_falls_back_to_internal_message_when_message_is_none(self) -> None:
exc = AirbyteTracedException(internal_message="an internal error")
assert str(exc) == "an internal error"

def test_str_returns_empty_string_when_both_none(self) -> None:
exc = AirbyteTracedException()
assert str(exc) == ""

def test_str_returns_message_when_internal_message_is_none(self) -> None:
exc = AirbyteTracedException(message="A user-friendly error occurred.")
assert str(exc) == "A user-friendly error occurred."

def test_str_used_in_fstring_returns_user_facing_message(self) -> None:
exc = AirbyteTracedException(
internal_message="internal detail",
message="Connection timed out.",
)
assert f"Error: {exc}" == "Error: Connection timed out."

def test_str_used_in_logging_format_returns_user_facing_message(self) -> None:
exc = AirbyteTracedException(
internal_message="socket.timeout: read timed out",
message="Request timed out.",
)
assert "Error: %s" % exc == "Error: Request timed out."

def test_args_still_contains_internal_message(self) -> None:
"""Verify args[0] is still internal_message for traceback formatting."""
exc = AirbyteTracedException(
internal_message="internal detail",
message="user-facing message",
)
assert exc.args[0] == "internal detail"

def test_str_on_subclass_inherits_behavior(self) -> None:
"""Verify subclasses inherit the __str__ override without needing their own."""

class CustomTracedException(AirbyteTracedException):
pass

exc = CustomTracedException(
internal_message="raw error",
message="User-friendly error.",
)
assert str(exc) == "User-friendly error."

def test_str_with_from_exception_factory(self) -> None:
original = ValueError("original error")
exc = AirbyteTracedException.from_exception(
original, message="A validation error occurred."
)
assert str(exc) == "A validation error occurred."
assert exc.internal_message == "original error"

def test_str_with_from_exception_without_message(self) -> None:
original = RuntimeError("runtime failure")
exc = AirbyteTracedException.from_exception(original)
assert str(exc) == "runtime failure"

def test_stack_trace_uses_str_representation(self) -> None:
"""Verify traceback one-liner uses __str__ (user-facing message)."""
exc = AirbyteTracedException(
internal_message="internal detail for traceback",
message="User sees this.",
)
airbyte_message = exc.as_airbyte_message()
assert "User sees this." in airbyte_message.trace.error.stack_trace

def test_internal_message_preserved_in_trace_error(self) -> None:
"""Verify internal_message is still available in the trace error for debugging."""
exc = AirbyteTracedException(
internal_message="raw API error: 401",
message="Authentication failed.",
)
airbyte_message = exc.as_airbyte_message()
assert airbyte_message.trace.error.internal_message == "raw API error: 401"
assert airbyte_message.trace.error.message == "Authentication failed."
Comment thread
sophiecuiy marked this conversation as resolved.
Loading