From bd7c4263157f3a34bcec74e928cb67468726ca2f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 14:52:31 +0000 Subject: [PATCH 1/9] feat(cdk): add REFRESH_TOKEN_THEN_RETRY response action for OAuth token refresh This introduces a new ResponseAction type called REFRESH_TOKEN_THEN_RETRY that forces an OAuth token refresh before retrying a request. This is useful when APIs return 401 Unauthorized but the stored token expiry hasn't been reached yet. Changes: - Add REFRESH_TOKEN_THEN_RETRY to ResponseAction enum - Update HttpClient._handle_error_resolution to force token refresh when this action is encountered by directly calling refresh_access_token() - Add REFRESH_TOKEN_THEN_RETRY to declarative_component_schema.yaml - Regenerate Pydantic models from schema - Add unit tests for the new functionality This generalizes the pattern from PR #72309 (HubSpot custom component) into the CDK itself. Co-Authored-By: Daryna Ishchenko --- .../declarative_component_schema.yaml | 2 + .../models/declarative_component_schema.py | 148 ++++++++------ .../http/error_handlers/response_models.py | 1 + .../sources/streams/http/http_client.py | 34 +++- .../sources/streams/http/test_http_client.py | 185 ++++++++++++++++++ 5 files changed, 306 insertions(+), 64 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 977fff2d5..e68318cd4 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2331,6 +2331,7 @@ definitions: - IGNORE - RESET_PAGINATION - RATE_LIMITED + - REFRESH_TOKEN_THEN_RETRY examples: - SUCCESS - FAIL @@ -2338,6 +2339,7 @@ definitions: - IGNORE - RESET_PAGINATION - RATE_LIMITED + - REFRESH_TOKEN_THEN_RETRY failure_type: title: Failure Type description: Failure type of traced exception if a response matches the filter. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 3fccc600f..a1c5c12b7 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -543,6 +541,7 @@ class Action(Enum): IGNORE = "IGNORE" RESET_PAGINATION = "RESET_PAGINATION" RATE_LIMITED = "RATE_LIMITED" + REFRESH_TOKEN_THEN_RETRY = "REFRESH_TOKEN_THEN_RETRY" class FailureType(Enum): @@ -563,6 +562,7 @@ class HttpResponseFilter(BaseModel): "IGNORE", "RESET_PAGINATION", "RATE_LIMITED", + "REFRESH_TOKEN_THEN_RETRY", ], title="Action", ) @@ -928,24 +928,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -963,7 +967,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1467,7 +1473,9 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], + examples=[ + "source_declarative_manifest.components.MyCustomConfigTransformation" + ], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1895,7 +1903,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2122,7 +2132,9 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( None, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2164,10 +2176,12 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", + ) ) @@ -2184,10 +2198,12 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", + ) ) @@ -2212,12 +2228,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( - Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", - ) + error_handlers: List[ + Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] + ] = Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2379,9 +2395,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( - None - ) + streams: Optional[ + List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] + ] = None dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2506,16 +2522,20 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" + ) retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2689,18 +2709,20 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( + Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", + ) ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2872,7 +2894,9 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field(..., description="The stream name.", example=["Users"], title="Name") + name: str = Field( + ..., description="The stream name.", example=["Users"], title="Name" + ) full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2959,13 +2983,17 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( + download_target_extractor: Optional[ + Union[DpathExtractor, CustomRecordExtractor] + ] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", diff --git a/airbyte_cdk/sources/streams/http/error_handlers/response_models.py b/airbyte_cdk/sources/streams/http/error_handlers/response_models.py index 7199d1982..082d580d5 100644 --- a/airbyte_cdk/sources/streams/http/error_handlers/response_models.py +++ b/airbyte_cdk/sources/streams/http/error_handlers/response_models.py @@ -18,6 +18,7 @@ class ResponseAction(Enum): IGNORE = "IGNORE" RESET_PAGINATION = "RESET_PAGINATION" RATE_LIMITED = "RATE_LIMITED" + REFRESH_TOKEN_THEN_RETRY = "REFRESH_TOKEN_THEN_RETRY" @dataclass diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index e9fc5add2..ea1656419 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -102,7 +102,7 @@ def __str__(self) -> str: class HttpClient: _DEFAULT_MAX_RETRY: int = 5 _DEFAULT_MAX_TIME: int = 60 * 10 - _ACTIONS_TO_RETRY_ON = {ResponseAction.RETRY, ResponseAction.RATE_LIMITED} + _ACTIONS_TO_RETRY_ON = {ResponseAction.RETRY, ResponseAction.RATE_LIMITED, ResponseAction.REFRESH_TOKEN_THEN_RETRY} def __init__( self, @@ -452,6 +452,31 @@ def _handle_error_resolution( # backoff retry loop. Adding `\n` to the message and ignore 'end' ensure that few messages are printed at the same time. print(f"{message}\n", end="", flush=True) + # Handle REFRESH_TOKEN_THEN_RETRY: Force refresh the OAuth token before retry + # This is useful when the API returns 401 but the stored token expiry hasn't been reached yet + # Only OAuth authenticators have refresh_access_token method + # Non-OAuth auth types (e.g., BearerAuthenticator) will fall through to normal retry + if error_resolution.response_action == ResponseAction.REFRESH_TOKEN_THEN_RETRY: + if ( + hasattr(self._session, "auth") + and self._session.auth is not None + and hasattr(self._session.auth, "refresh_access_token") + ): + try: + token, expires_in = self._session.auth.refresh_access_token() # type: ignore[union-attr] + self._session.auth.access_token = token # type: ignore[union-attr] + self._session.auth.set_token_expiry_date(expires_in) # type: ignore[union-attr] + self._logger.info("Refreshed OAuth token due to REFRESH_TOKEN_THEN_RETRY response action") + except Exception as refresh_error: + self._logger.warning( + f"Failed to refresh OAuth token: {refresh_error}. Proceeding with retry using existing token." + ) + else: + self._logger.debug( + "REFRESH_TOKEN_THEN_RETRY action received but authenticator does not support token refresh. " + "Proceeding with normal retry." + ) + if error_resolution.response_action == ResponseAction.FAIL: if response is not None: filtered_response_message = filter_secrets( @@ -481,9 +506,10 @@ def _handle_error_resolution( self._logger.info(error_resolution.error_message or log_message) # TODO: Consider dynamic retry count depending on subsequent error codes - elif ( - error_resolution.response_action == ResponseAction.RETRY - or error_resolution.response_action == ResponseAction.RATE_LIMITED + elif error_resolution.response_action in ( + ResponseAction.RETRY, + ResponseAction.RATE_LIMITED, + ResponseAction.REFRESH_TOKEN_THEN_RETRY, ): user_defined_backoff_time = None for backoff_strategy in self._backoff_strategies: diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 3840c70e3..2903dd39e 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -837,3 +837,188 @@ def backoff_time(self, response_or_exception, attempt_count): with pytest.raises(AirbyteTracedException) as e: http_client.send_request(http_method="get", url="https://airbyte.io/", request_kwargs={}) assert e.value.failure_type == expected_failure_type + + +class MockOAuthAuthenticator: + def __init__(self): + self.access_token = "old_token" + self._token_expiry_date = None + self.refresh_called = False + + def refresh_access_token(self): + self.refresh_called = True + return ("new_refreshed_token", "2099-01-01T00:00:00Z") + + def set_token_expiry_date(self, value): + self._token_expiry_date = value + + def __call__(self, request): + request.headers["Authorization"] = f"Bearer {self.access_token}" + return request + + +def test_refresh_token_then_retry_action_refreshes_oauth_token(mocker): + mock_authenticator = MockOAuthAuthenticator() + mocked_session = MagicMock(spec=requests.Session) + mocked_session.auth = mock_authenticator + + http_client = HttpClient( + name="test", + logger=MagicMock(), + error_handler=HttpStatusErrorHandler( + logger=MagicMock(), + error_mapping={ + 401: ErrorResolution( + ResponseAction.REFRESH_TOKEN_THEN_RETRY, + FailureType.transient_error, + "Token expired, refreshing", + ) + }, + ), + session=mocked_session, + ) + + prepared_request = requests.PreparedRequest() + mocked_response = MagicMock(spec=requests.Response) + mocked_response.status_code = 401 + mocked_response.headers = {} + mocked_response.ok = False + mocked_session.send.return_value = mocked_response + + with pytest.raises(DefaultBackoffException): + http_client._send(prepared_request, {}) + + assert mock_authenticator.refresh_called + assert mock_authenticator.access_token == "new_refreshed_token" + assert mock_authenticator._token_expiry_date == "2099-01-01T00:00:00Z" + + +def test_refresh_token_then_retry_action_without_oauth_authenticator_proceeds_with_retry(mocker): + mocked_session = MagicMock(spec=requests.Session) + mocked_session.auth = None + + mocked_logger = MagicMock() + http_client = HttpClient( + name="test", + logger=mocked_logger, + error_handler=HttpStatusErrorHandler( + logger=MagicMock(), + error_mapping={ + 401: ErrorResolution( + ResponseAction.REFRESH_TOKEN_THEN_RETRY, + FailureType.transient_error, + "Token expired, refreshing", + ) + }, + ), + session=mocked_session, + ) + + prepared_request = requests.PreparedRequest() + mocked_response = MagicMock(spec=requests.Response) + mocked_response.status_code = 401 + mocked_response.headers = {} + mocked_response.ok = False + mocked_session.send.return_value = mocked_response + + with pytest.raises(DefaultBackoffException): + http_client._send(prepared_request, {}) + + mocked_logger.debug.assert_called() + + +def test_refresh_token_then_retry_action_handles_refresh_failure_gracefully(mocker): + class FailingOAuthAuthenticator: + def __init__(self): + self.access_token = "old_token" + + def refresh_access_token(self): + raise Exception("Token refresh failed") + + def __call__(self, request): + return request + + mock_authenticator = FailingOAuthAuthenticator() + mocked_session = MagicMock(spec=requests.Session) + mocked_session.auth = mock_authenticator + + mocked_logger = MagicMock() + http_client = HttpClient( + name="test", + logger=mocked_logger, + error_handler=HttpStatusErrorHandler( + logger=MagicMock(), + error_mapping={ + 401: ErrorResolution( + ResponseAction.REFRESH_TOKEN_THEN_RETRY, + FailureType.transient_error, + "Token expired, refreshing", + ) + }, + ), + session=mocked_session, + ) + + prepared_request = requests.PreparedRequest() + mocked_response = MagicMock(spec=requests.Response) + mocked_response.status_code = 401 + mocked_response.headers = {} + mocked_response.ok = False + mocked_session.send.return_value = mocked_response + + with pytest.raises(DefaultBackoffException): + http_client._send(prepared_request, {}) + + mocked_logger.warning.assert_called() + + +@pytest.mark.usefixtures("mock_sleep") +def test_refresh_token_then_retry_action_retries_and_succeeds_after_token_refresh(): + mock_authenticator = MockOAuthAuthenticator() + mocked_session = MagicMock(spec=requests.Session) + mocked_session.auth = mock_authenticator + + valid_response = MagicMock(spec=requests.Response) + valid_response.status_code = 200 + valid_response.ok = True + valid_response.headers = {} + + call_count = 0 + + def update_response(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + retry_response = MagicMock(spec=requests.Response) + retry_response.ok = False + retry_response.status_code = 401 + retry_response.headers = {} + return retry_response + else: + return valid_response + + mocked_session.send.side_effect = update_response + + http_client = HttpClient( + name="test", + logger=MagicMock(), + error_handler=HttpStatusErrorHandler( + logger=MagicMock(), + error_mapping={ + 401: ErrorResolution( + ResponseAction.REFRESH_TOKEN_THEN_RETRY, + FailureType.transient_error, + "Token expired, refreshing", + ) + }, + ), + session=mocked_session, + ) + + prepared_request = requests.PreparedRequest() + returned_response = http_client._send_with_retry(prepared_request, request_kwargs={}) + + assert mock_authenticator.refresh_called + assert mock_authenticator.access_token == "new_refreshed_token" + assert returned_response == valid_response + assert call_count == 2 From e1da9a4776ddd5e624eb9e241f369dc4ce4a4ef5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 14:54:17 +0000 Subject: [PATCH 2/9] style: fix ruff formatting Co-Authored-By: Daryna Ishchenko --- .../models/declarative_component_schema.py | 144 +++++++----------- .../sources/streams/http/http_client.py | 10 +- 2 files changed, 66 insertions(+), 88 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index a1c5c12b7..dbe83967d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -928,28 +928,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -967,9 +963,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1473,9 +1467,7 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=[ - "source_declarative_manifest.components.MyCustomConfigTransformation" - ], + examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1903,9 +1895,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2132,9 +2122,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( None, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2176,12 +2164,10 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", ) @@ -2198,12 +2184,10 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", ) @@ -2228,12 +2212,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[ - Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] - ] = Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", + error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( + Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", + ) ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2395,9 +2379,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[ - List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] - ] = None + streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( + None + ) dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2522,20 +2506,16 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" - ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2709,20 +2689,18 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( - Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", - ) + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2894,9 +2872,7 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field( - ..., description="The stream name.", example=["Users"], title="Name" - ) + name: str = Field(..., description="The stream name.", example=["Users"], title="Name") full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2983,17 +2959,13 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[ - Union[DpathExtractor, CustomRecordExtractor] - ] = Field( + download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index ea1656419..30b9ab6ab 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -102,7 +102,11 @@ def __str__(self) -> str: class HttpClient: _DEFAULT_MAX_RETRY: int = 5 _DEFAULT_MAX_TIME: int = 60 * 10 - _ACTIONS_TO_RETRY_ON = {ResponseAction.RETRY, ResponseAction.RATE_LIMITED, ResponseAction.REFRESH_TOKEN_THEN_RETRY} + _ACTIONS_TO_RETRY_ON = { + ResponseAction.RETRY, + ResponseAction.RATE_LIMITED, + ResponseAction.REFRESH_TOKEN_THEN_RETRY, + } def __init__( self, @@ -466,7 +470,9 @@ def _handle_error_resolution( token, expires_in = self._session.auth.refresh_access_token() # type: ignore[union-attr] self._session.auth.access_token = token # type: ignore[union-attr] self._session.auth.set_token_expiry_date(expires_in) # type: ignore[union-attr] - self._logger.info("Refreshed OAuth token due to REFRESH_TOKEN_THEN_RETRY response action") + self._logger.info( + "Refreshed OAuth token due to REFRESH_TOKEN_THEN_RETRY response action" + ) except Exception as refresh_error: self._logger.warning( f"Failed to refresh OAuth token: {refresh_error}. Proceeding with retry using existing token." From b7218ad2c1e73c22c929b11869ea172398d62613 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 16:35:02 +0000 Subject: [PATCH 3/9] fix: change log level from debug to warning for non-OAuth authenticator message Co-Authored-By: Daryna Ishchenko --- airbyte_cdk/sources/streams/http/http_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 30b9ab6ab..39dc3c746 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -478,7 +478,7 @@ def _handle_error_resolution( f"Failed to refresh OAuth token: {refresh_error}. Proceeding with retry using existing token." ) else: - self._logger.debug( + self._logger.warning( "REFRESH_TOKEN_THEN_RETRY action received but authenticator does not support token refresh. " "Proceeding with normal retry." ) From b49ae8d94858d738bab48a22db5f779e1fabe3a4 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 16:36:20 +0000 Subject: [PATCH 4/9] test: update test to check for warning log level instead of debug Co-Authored-By: Daryna Ishchenko --- unit_tests/sources/streams/http/test_http_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 2903dd39e..5e5312350 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -924,7 +924,7 @@ def test_refresh_token_then_retry_action_without_oauth_authenticator_proceeds_wi with pytest.raises(DefaultBackoffException): http_client._send(prepared_request, {}) - mocked_logger.debug.assert_called() + mocked_logger.warning.assert_called() def test_refresh_token_then_retry_action_handles_refresh_failure_gracefully(mocker): From 7e4481dbd63dbe8fb8269b584597229f514f0fd5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 16:45:36 +0000 Subject: [PATCH 5/9] fix: use extended unpacking to handle both 2 and 3 value returns from refresh_access_token Co-Authored-By: Daryna Ishchenko --- airbyte_cdk/sources/streams/http/http_client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 39dc3c746..0d6b658e4 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -467,7 +467,9 @@ def _handle_error_resolution( and hasattr(self._session.auth, "refresh_access_token") ): try: - token, expires_in = self._session.auth.refresh_access_token() # type: ignore[union-attr] + # Use extended unpacking to handle both 2-tuple (AbstractOauth2Authenticator) + # and 3-tuple (Oauth2Authenticator which also returns refresh_token) returns + token, expires_in, *_ = self._session.auth.refresh_access_token() # type: ignore[union-attr] self._session.auth.access_token = token # type: ignore[union-attr] self._session.auth.set_token_expiry_date(expires_in) # type: ignore[union-attr] self._logger.info( From b947c40a762f6a58eaaa771f919a5e2c0682422d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 16:52:27 +0000 Subject: [PATCH 6/9] feat: add special handling for SingleUseRefreshTokenOauth2Authenticator to persist new refresh token Co-Authored-By: Daryna Ishchenko --- .../sources/streams/http/http_client.py | 22 +++++++-- .../sources/streams/http/test_http_client.py | 47 +++++++++++++++++++ 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 0d6b658e4..73304a1f9 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -50,6 +50,9 @@ rate_limit_default_backoff_handler, user_defined_backoff_handler, ) +from airbyte_cdk.sources.streams.http.requests_native_auth import ( + SingleUseRefreshTokenOauth2Authenticator, +) from airbyte_cdk.sources.utils.types import JsonType from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH @@ -467,11 +470,20 @@ def _handle_error_resolution( and hasattr(self._session.auth, "refresh_access_token") ): try: - # Use extended unpacking to handle both 2-tuple (AbstractOauth2Authenticator) - # and 3-tuple (Oauth2Authenticator which also returns refresh_token) returns - token, expires_in, *_ = self._session.auth.refresh_access_token() # type: ignore[union-attr] - self._session.auth.access_token = token # type: ignore[union-attr] - self._session.auth.set_token_expiry_date(expires_in) # type: ignore[union-attr] + if isinstance(self._session.auth, SingleUseRefreshTokenOauth2Authenticator): + # For single-use refresh tokens, we must persist the new refresh token + # and emit a control message to update the connector config + token, expires_in, new_refresh_token = self._session.auth.refresh_access_token() + self._session.auth.access_token = token + self._session.auth.set_refresh_token(new_refresh_token) + self._session.auth.set_token_expiry_date(expires_in) + self._session.auth._emit_control_message() + else: + # Use extended unpacking to handle both 2-tuple (AbstractOauth2Authenticator) + # and 3-tuple (Oauth2Authenticator which also returns refresh_token) returns + token, expires_in, *_ = self._session.auth.refresh_access_token() # type: ignore[union-attr] + self._session.auth.access_token = token # type: ignore[union-attr] + self._session.auth.set_token_expiry_date(expires_in) # type: ignore[union-attr] self._logger.info( "Refreshed OAuth token due to REFRESH_TOKEN_THEN_RETRY response action" ) diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 5e5312350..df6e28c08 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -972,6 +972,53 @@ def __call__(self, request): mocked_logger.warning.assert_called() +def test_refresh_token_then_retry_action_with_single_use_refresh_token_authenticator(mocker): + from airbyte_cdk.sources.streams.http.requests_native_auth import ( + SingleUseRefreshTokenOauth2Authenticator, + ) + + mock_authenticator = MagicMock(spec=SingleUseRefreshTokenOauth2Authenticator) + mock_authenticator.refresh_access_token.return_value = ( + "new_access_token", + "2099-01-01T00:00:00Z", + "new_refresh_token", + ) + + mocked_session = MagicMock(spec=requests.Session) + mocked_session.auth = mock_authenticator + + http_client = HttpClient( + name="test", + logger=MagicMock(), + error_handler=HttpStatusErrorHandler( + logger=MagicMock(), + error_mapping={ + 401: ErrorResolution( + ResponseAction.REFRESH_TOKEN_THEN_RETRY, + FailureType.transient_error, + "Token expired, refreshing", + ) + }, + ), + session=mocked_session, + ) + + prepared_request = requests.PreparedRequest() + mocked_response = MagicMock(spec=requests.Response) + mocked_response.status_code = 401 + mocked_response.headers = {} + mocked_response.ok = False + mocked_session.send.return_value = mocked_response + + with pytest.raises(DefaultBackoffException): + http_client._send(prepared_request, {}) + + mock_authenticator.refresh_access_token.assert_called_once() + mock_authenticator.set_refresh_token.assert_called_once_with("new_refresh_token") + mock_authenticator.set_token_expiry_date.assert_called_once_with("2099-01-01T00:00:00Z") + mock_authenticator._emit_control_message.assert_called_once() + + @pytest.mark.usefixtures("mock_sleep") def test_refresh_token_then_retry_action_retries_and_succeeds_after_token_refresh(): mock_authenticator = MockOAuthAuthenticator() From e6c03a3ca37c15a8bb4af44d102452449a4a4b30 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 16:53:52 +0000 Subject: [PATCH 7/9] style: fix ruff formatting Co-Authored-By: Daryna Ishchenko --- airbyte_cdk/sources/streams/http/http_client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 73304a1f9..d8be7172a 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -473,7 +473,9 @@ def _handle_error_resolution( if isinstance(self._session.auth, SingleUseRefreshTokenOauth2Authenticator): # For single-use refresh tokens, we must persist the new refresh token # and emit a control message to update the connector config - token, expires_in, new_refresh_token = self._session.auth.refresh_access_token() + token, expires_in, new_refresh_token = ( + self._session.auth.refresh_access_token() + ) self._session.auth.access_token = token self._session.auth.set_refresh_token(new_refresh_token) self._session.auth.set_token_expiry_date(expires_in) From 5699bdd92d9b97001cf70808ef2932570d46c05d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 16:58:35 +0000 Subject: [PATCH 8/9] refactor: move token refresh logic to refresh_and_set_access_token method in authenticator Co-Authored-By: Daryna Ishchenko --- .../sources/streams/http/http_client.py | 24 +++---------------- .../requests_native_auth/abstract_oauth.py | 15 +++++++++--- .../http/requests_native_auth/oauth.py | 24 ++++++++++++------- .../sources/streams/http/test_http_client.py | 20 ++++------------ 4 files changed, 36 insertions(+), 47 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index d8be7172a..e7a5715ac 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -50,9 +50,6 @@ rate_limit_default_backoff_handler, user_defined_backoff_handler, ) -from airbyte_cdk.sources.streams.http.requests_native_auth import ( - SingleUseRefreshTokenOauth2Authenticator, -) from airbyte_cdk.sources.utils.types import JsonType from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH @@ -461,31 +458,16 @@ def _handle_error_resolution( # Handle REFRESH_TOKEN_THEN_RETRY: Force refresh the OAuth token before retry # This is useful when the API returns 401 but the stored token expiry hasn't been reached yet - # Only OAuth authenticators have refresh_access_token method + # Only OAuth authenticators have refresh_and_set_access_token method # Non-OAuth auth types (e.g., BearerAuthenticator) will fall through to normal retry if error_resolution.response_action == ResponseAction.REFRESH_TOKEN_THEN_RETRY: if ( hasattr(self._session, "auth") and self._session.auth is not None - and hasattr(self._session.auth, "refresh_access_token") + and hasattr(self._session.auth, "refresh_and_set_access_token") ): try: - if isinstance(self._session.auth, SingleUseRefreshTokenOauth2Authenticator): - # For single-use refresh tokens, we must persist the new refresh token - # and emit a control message to update the connector config - token, expires_in, new_refresh_token = ( - self._session.auth.refresh_access_token() - ) - self._session.auth.access_token = token - self._session.auth.set_refresh_token(new_refresh_token) - self._session.auth.set_token_expiry_date(expires_in) - self._session.auth._emit_control_message() - else: - # Use extended unpacking to handle both 2-tuple (AbstractOauth2Authenticator) - # and 3-tuple (Oauth2Authenticator which also returns refresh_token) returns - token, expires_in, *_ = self._session.auth.refresh_access_token() # type: ignore[union-attr] - self._session.auth.access_token = token # type: ignore[union-attr] - self._session.auth.set_token_expiry_date(expires_in) # type: ignore[union-attr] + self._session.auth.refresh_and_set_access_token() # type: ignore[union-attr] self._logger.info( "Refreshed OAuth token due to REFRESH_TOKEN_THEN_RETRY response action" ) diff --git a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py index 3b4aa9844..48c5bba73 100644 --- a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py +++ b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py @@ -88,12 +88,21 @@ def get_auth_header(self) -> Mapping[str, Any]: def get_access_token(self) -> str: """Returns the access token""" if self.token_has_expired(): - token, expires_in = self.refresh_access_token() - self.access_token = token - self.set_token_expiry_date(expires_in) + self.refresh_and_set_access_token() return self.access_token + def refresh_and_set_access_token(self) -> None: + """Force refresh the access token and update internal state. + + This method refreshes the access token regardless of whether it has expired, + and updates the internal token and expiry date. Subclasses may override this + to handle additional state updates (e.g., persisting new refresh tokens). + """ + token, expires_in = self.refresh_access_token() + self.access_token = token + self.set_token_expiry_date(expires_in) + def token_has_expired(self) -> bool: """Returns True if the token is expired""" return ab_datetime_now() > self.get_token_expiry_date() diff --git a/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py b/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py index a2932c294..cb64eb3e3 100644 --- a/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py +++ b/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py @@ -318,20 +318,28 @@ def token_has_expired(self) -> bool: def get_access_token(self) -> str: """Retrieve new access and refresh token if the access token has expired. - The new refresh token is persisted with the set_refresh_token function + + The new refresh token is persisted with the set_refresh_token function. + Returns: str: The current access_token, updated if it was previously expired. """ if self.token_has_expired(): - new_access_token, access_token_expires_in, new_refresh_token = ( - self.refresh_access_token() - ) - self.access_token = new_access_token - self.set_refresh_token(new_refresh_token) - self.set_token_expiry_date(access_token_expires_in) - self._emit_control_message() + self.refresh_and_set_access_token() return self.access_token + def refresh_and_set_access_token(self) -> None: + """Force refresh the access token and update internal state. + + For single-use refresh tokens, this also persists the new refresh token + and emits a control message to update the connector config. + """ + new_access_token, access_token_expires_in, new_refresh_token = self.refresh_access_token() + self.access_token = new_access_token + self.set_refresh_token(new_refresh_token) + self.set_token_expiry_date(access_token_expires_in) + self._emit_control_message() + def refresh_access_token(self) -> Tuple[str, AirbyteDateTime, str]: # type: ignore[override] """ Refreshes the access token by making a handled request and extracting the necessary token information. diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index df6e28c08..ea245c2fb 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -845,12 +845,10 @@ def __init__(self): self._token_expiry_date = None self.refresh_called = False - def refresh_access_token(self): + def refresh_and_set_access_token(self): self.refresh_called = True - return ("new_refreshed_token", "2099-01-01T00:00:00Z") - - def set_token_expiry_date(self, value): - self._token_expiry_date = value + self.access_token = "new_refreshed_token" + self._token_expiry_date = "2099-01-01T00:00:00Z" def __call__(self, request): request.headers["Authorization"] = f"Bearer {self.access_token}" @@ -932,7 +930,7 @@ class FailingOAuthAuthenticator: def __init__(self): self.access_token = "old_token" - def refresh_access_token(self): + def refresh_and_set_access_token(self): raise Exception("Token refresh failed") def __call__(self, request): @@ -978,11 +976,6 @@ def test_refresh_token_then_retry_action_with_single_use_refresh_token_authentic ) mock_authenticator = MagicMock(spec=SingleUseRefreshTokenOauth2Authenticator) - mock_authenticator.refresh_access_token.return_value = ( - "new_access_token", - "2099-01-01T00:00:00Z", - "new_refresh_token", - ) mocked_session = MagicMock(spec=requests.Session) mocked_session.auth = mock_authenticator @@ -1013,10 +1006,7 @@ def test_refresh_token_then_retry_action_with_single_use_refresh_token_authentic with pytest.raises(DefaultBackoffException): http_client._send(prepared_request, {}) - mock_authenticator.refresh_access_token.assert_called_once() - mock_authenticator.set_refresh_token.assert_called_once_with("new_refresh_token") - mock_authenticator.set_token_expiry_date.assert_called_once_with("2099-01-01T00:00:00Z") - mock_authenticator._emit_control_message.assert_called_once() + mock_authenticator.refresh_and_set_access_token.assert_called_once() @pytest.mark.usefixtures("mock_sleep") From 9bd072c245bb32d02a768dd69557e05f8b6c6cfa Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 2 Feb 2026 14:54:05 +0000 Subject: [PATCH 9/9] style: restore copyright header in declarative_component_schema.py Co-Authored-By: Daryna Ishchenko --- .../sources/declarative/models/declarative_component_schema.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index dbe83967d..5d2f0521f 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,3 +1,5 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + # generated by datamodel-codegen: # filename: declarative_component_schema.yaml