diff --git a/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py b/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py index 74840e2d2..78dbb3ae5 100644 --- a/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py +++ b/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py @@ -11,8 +11,14 @@ ErrorResolution, ResponseAction, ) +from airbyte_cdk.sources.streams.http.exceptions import DNSResolutionError DEFAULT_ERROR_MAPPING: Mapping[Union[int, str, Type[Exception]], ErrorResolution] = { + DNSResolutionError: ErrorResolution( + response_action=ResponseAction.RETRY, + failure_type=FailureType.transient_error, + error_message="Temporary DNS resolution error occurred. Retrying...", + ), InvalidSchema: ErrorResolution( response_action=ResponseAction.FAIL, failure_type=FailureType.config_error, diff --git a/airbyte_cdk/sources/streams/http/exceptions.py b/airbyte_cdk/sources/streams/http/exceptions.py index ee4687626..27e3f5d2c 100644 --- a/airbyte_cdk/sources/streams/http/exceptions.py +++ b/airbyte_cdk/sources/streams/http/exceptions.py @@ -59,3 +59,24 @@ class DefaultBackoffException(BaseBackoffException): class RateLimitBackoffException(BaseBackoffException): pass + + +class DNSResolutionError(BaseBackoffException): + """ + Raised when a DNS resolution error occurs. + This is different from InvalidURL which is raised for malformed URLs. + """ + + def __init__( + self, + url: str, + request: requests.PreparedRequest, + response: Optional[Union[requests.Response, Exception]], + error_message: str = "", + ): + self.url = url + super().__init__( + request=request, + response=response, + error_message=error_message or f"Failed to resolve DNS for URL: {url}", + ) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index c4fa86866..22c43ef14 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -36,6 +36,7 @@ ) from airbyte_cdk.sources.streams.http.exceptions import ( DefaultBackoffException, + DNSResolutionError, RateLimitBackoffException, RequestBodyException, UserDefinedBackoffException, @@ -300,6 +301,16 @@ def _send( try: response = self._session.send(request, **request_kwargs) + except requests.ConnectionError as e: + if "Name or service not known" in str(e) or "nodename nor servname provided" in str(e): + assert ( + request.url is not None + ), "Request URL cannot be None for DNS resolution error" + exc = DNSResolutionError( + url=request.url, request=request, response=e, error_message=str(e) + ) + else: + exc = e except requests.RequestException as e: exc = e diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index 40fdb3201..7439eeaae 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -10,6 +10,7 @@ import pytest import requests +from requests.exceptions import ConnectionError, InvalidURL from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode, Type from airbyte_cdk.sources.streams import CheckpointMixin @@ -20,9 +21,13 @@ from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler, HttpStatusErrorHandler -from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction +from airbyte_cdk.sources.streams.http.error_handlers.response_models import ( + FailureType, + ResponseAction, +) from airbyte_cdk.sources.streams.http.exceptions import ( DefaultBackoffException, + DNSResolutionError, RequestBodyException, UserDefinedBackoffException, ) @@ -331,6 +336,53 @@ def test_raise_on_http_errors(mocker, error): assert send_mock.call_count == stream.max_retries + 1 +class StubHttpStreamWithErrorHandler(StubBasicReadHttpStream): + def get_error_handler(self) -> Optional[ErrorHandler]: + return HttpStatusErrorHandler(logging.getLogger()) + + +def test_dns_resolution_error_retry(): + """Test that DNS resolution errors are retried""" + stream = StubHttpStreamWithErrorHandler() + error_handler = stream.get_error_handler() + request = requests.PreparedRequest() + request.url = "https://example.com" + dns_error = DNSResolutionError( + url="https://example.com", request=request, response=Exception("DNS lookup failed") + ) + resolution = error_handler.interpret_response(dns_error) + assert resolution.response_action == ResponseAction.RETRY + assert resolution.failure_type == FailureType.transient_error + + +def test_invalid_url_fails(): + """Test that invalid URLs fail immediately""" + stream = StubHttpStreamWithErrorHandler() + error_handler = stream.get_error_handler() + resolution = error_handler.interpret_response(InvalidURL()) + assert resolution.response_action == ResponseAction.FAIL + assert resolution.failure_type == FailureType.config_error + + +def test_dns_resolution_error_retry_with_connection_error(): + """Test that DNS resolution errors from ConnectionError are properly handled""" + stream = StubHttpStreamWithErrorHandler() + send_mock = MagicMock( + side_effect=requests.ConnectionError( + "HTTPSConnectionPool(host='api.example.com', port=443): " + "Max retries exceeded with url: /v1/data (Caused by " + 'NameResolutionError(": Failed to resolve 'api.example.com' " + '(Name or service not known)"))' + ) + ) + stream._http_client._session.send = send_mock + + with pytest.raises(DefaultBackoffException): + list(stream.read_records(SyncMode.full_refresh)) + assert send_mock.call_count == stream.max_retries + 1 + + class PostHttpStream(StubBasicReadHttpStream): http_method = "POST"