From 7de61c4b791e40008372e22e8ede4d4405285a60 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 14 Jan 2026 00:06:44 +0000 Subject: [PATCH 1/6] fix(oauth): exclude client credentials from body when Authorization header is present Some OAuth providers (like Gong) require client credentials to be sent ONLY via Basic Authentication header, not in the request body. Previously, the CDK always included client_id and client_secret in the body, causing 400 errors when the Authorization header was also present. This change automatically detects when an Authorization header is configured in refresh_request_headers and excludes client credentials from the body. Added unit tests to verify: - Credentials excluded from body when Authorization header present - Credentials included in body when no Authorization header (default behavior) Co-Authored-By: aldo.gonzalez@airbyte.io --- .../requests_native_auth/abstract_oauth.py | 23 ++++++-- .../sources/declarative/auth/test_oauth.py | 58 +++++++++++++++++++ 2 files changed, 76 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py index ed7a45d49..15288da9b 100644 --- a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py +++ b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py @@ -100,17 +100,30 @@ def token_has_expired(self) -> bool: def build_refresh_request_body(self) -> Mapping[str, Any]: """ - Returns the request body to set on the refresh request + Returns the request body to set on the refresh request. - Override to define additional parameters + Override to define additional parameters. + + If refresh_request_headers contains an Authorization header (e.g., Basic auth), + client credentials are excluded from the body to avoid sending them in both places. + Some OAuth providers (like Gong) require credentials ONLY in the header and reject + requests that include them in both the header and body. """ + # Check if credentials are being sent via Authorization header + headers = self.get_refresh_request_headers() + credentials_in_header = headers and "Authorization" in headers + payload: MutableMapping[str, Any] = { self.get_grant_type_name(): self.get_grant_type(), - self.get_client_id_name(): self.get_client_id(), - self.get_client_secret_name(): self.get_client_secret(), - self.get_refresh_token_name(): self.get_refresh_token(), } + # Only include client credentials in body if not already in header + if not credentials_in_header: + payload[self.get_client_id_name()] = self.get_client_id() + payload[self.get_client_secret_name()] = self.get_client_secret() + + payload[self.get_refresh_token_name()] = self.get_refresh_token() + if self.get_scopes(): payload["scopes"] = self.get_scopes() diff --git a/unit_tests/sources/declarative/auth/test_oauth.py b/unit_tests/sources/declarative/auth/test_oauth.py index bc616e5d2..e5e15a035 100644 --- a/unit_tests/sources/declarative/auth/test_oauth.py +++ b/unit_tests/sources/declarative/auth/test_oauth.py @@ -111,6 +111,64 @@ def test_refresh_request_headers(self): headers = oauth.build_refresh_request_headers() assert headers is None + def test_refresh_request_body_excludes_credentials_when_authorization_header_present(self): + """ + When refresh_request_headers contains an Authorization header (e.g., Basic auth), + client_id and client_secret should be excluded from the request body. + + This is required by OAuth providers like Gong that expect credentials ONLY in the + Authorization header and reject requests that include them in both places. + """ + oauth = DeclarativeOauth2Authenticator( + token_refresh_endpoint="{{ config['refresh_endpoint'] }}", + client_id="{{ config['client_id'] }}", + client_secret="{{ config['client_secret'] }}", + refresh_token="{{ parameters['refresh_token'] }}", + config=config, + token_expiry_date="{{ config['token_expiry_date'] }}", + refresh_request_headers={ + "Authorization": "Basic {{ [config['client_id'], config['client_secret']] | join(':') | base64encode }}", + "Content-Type": "application/x-www-form-urlencoded", + }, + parameters=parameters, + grant_type="{{ config['grant_type'] }}", + ) + body = oauth.build_refresh_request_body() + expected = { + "grant_type": "some_grant_type", + "refresh_token": "some_refresh_token", + } + assert body == expected + assert "client_id" not in body + assert "client_secret" not in body + + def test_refresh_request_body_includes_credentials_when_no_authorization_header(self): + """ + When refresh_request_headers does NOT contain an Authorization header, + client_id and client_secret should be included in the request body (default behavior). + """ + oauth = DeclarativeOauth2Authenticator( + token_refresh_endpoint="{{ config['refresh_endpoint'] }}", + client_id="{{ config['client_id'] }}", + client_secret="{{ config['client_secret'] }}", + refresh_token="{{ parameters['refresh_token'] }}", + config=config, + token_expiry_date="{{ config['token_expiry_date'] }}", + refresh_request_headers={ + "Content-Type": "application/x-www-form-urlencoded", + }, + parameters=parameters, + grant_type="{{ config['grant_type'] }}", + ) + body = oauth.build_refresh_request_body() + expected = { + "grant_type": "some_grant_type", + "client_id": "some_client_id", + "client_secret": "some_client_secret", + "refresh_token": "some_refresh_token", + } + assert body == expected + def test_refresh_with_encode_config_params(self): oauth = DeclarativeOauth2Authenticator( token_refresh_endpoint="{{ config['refresh_endpoint'] }}", From 65927ec84b4fb81cad01a5b8e7ff6e50ed09cf99 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 14 Jan 2026 01:36:03 +0000 Subject: [PATCH 2/6] feat(oauth): add use_client_credentials_in_refresh option for Gong-style OAuth This adds a new configuration option to the OAuthAuthenticator that allows excluding client credentials (client_id and client_secret) from the refresh token request body entirely. Some OAuth providers like Gong have unique requirements where the token refresh endpoint only needs the refresh_token parameter and does NOT require client credentials at all. Changes: - Added use_client_credentials_in_refresh field to declarative schema - Updated abstract_oauth.py to check this option in build_refresh_request_body() - Added use_client_credentials_in_refresh() method to DeclarativeOauth2Authenticator - Updated ModelToComponentFactory to pass the option from manifest - Added unit tests for the new functionality Co-Authored-By: aldo.gonzalez@airbyte.io --- airbyte_cdk/sources/declarative/auth/oauth.py | 4 + .../declarative_component_schema.yaml | 5 + .../models/declarative_component_schema.py | 153 +++++++++++------- .../parsers/model_to_component_factory.py | 3 + .../requests_native_auth/abstract_oauth.py | 25 ++- .../sources/declarative/auth/test_oauth.py | 57 +++++++ 6 files changed, 180 insertions(+), 67 deletions(-) diff --git a/airbyte_cdk/sources/declarative/auth/oauth.py b/airbyte_cdk/sources/declarative/auth/oauth.py index e1ad84e09..315aba6bd 100644 --- a/airbyte_cdk/sources/declarative/auth/oauth.py +++ b/airbyte_cdk/sources/declarative/auth/oauth.py @@ -78,6 +78,7 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAut refresh_token_error_status_codes: Tuple[int, ...] = () refresh_token_error_key: str = "" refresh_token_error_values: Tuple[str, ...] = () + _use_client_credentials_in_refresh: bool = True def __post_init__(self, parameters: Mapping[str, Any]) -> None: super().__init__( @@ -247,6 +248,9 @@ def get_refresh_request_body(self) -> Mapping[str, Any]: def get_refresh_request_headers(self) -> Mapping[str, Any]: return self._refresh_request_headers.eval(self.config) + def use_client_credentials_in_refresh(self) -> bool: + return self._use_client_credentials_in_refresh + def get_token_expiry_date(self) -> AirbyteDateTime: if not self._has_access_token_been_initialized(): return AirbyteDateTime.from_datetime(datetime.min) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index e04a82c0d..9786e314f 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1531,6 +1531,11 @@ definitions: description: Enable using profile assertion as a flow for OAuth authorization. type: boolean default: false + use_client_credentials_in_refresh: + title: Use Client Credentials In Refresh + description: When enabled (default), client_id and client_secret are included in the refresh token request body. Set to false for OAuth implementations like Gong that require only the refresh_token in the request body without client credentials. + type: boolean + default: true $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index b78a07021..20b6338c5 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -928,24 +926,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -963,7 +965,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1467,7 +1471,9 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], + examples=[ + "source_declarative_manifest.components.MyCustomConfigTransformation" + ], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1895,7 +1901,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1943,6 +1951,11 @@ class OAuthAuthenticator(BaseModel): description="Enable using profile assertion as a flow for OAuth authorization.", title="Use Profile Assertion", ) + use_client_credentials_in_refresh: Optional[bool] = Field( + True, + description="When enabled (default), client_id and client_secret are included in the refresh token request body. Set to false for OAuth implementations like Gong that require only the refresh_token in the request body without client credentials.", + title="Use Client Credentials In Refresh", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2112,7 +2125,9 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( None, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2154,10 +2169,12 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", + ) ) @@ -2174,10 +2191,12 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", + ) ) @@ -2202,12 +2221,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( - Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", - ) + error_handlers: List[ + Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] + ] = Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2369,9 +2388,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( - None - ) + streams: Optional[ + List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] + ] = None dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2496,16 +2515,20 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" + ) retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2679,18 +2702,20 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( + Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", + ) ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2741,7 +2766,7 @@ class HttpRequester(BaseModelWithDeprecations): ) use_cache: Optional[bool] = Field( False, - description="Enables stream requests caching. This field is automatically set by the CDK.", + description="Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled. Only set this to false if you are certain that caching should be disabled, as it may negatively impact performance when the same data is needed multiple times (e.g., for scroll-based pagination APIs where caching causes duplicate records).", title="Use Cache", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2862,7 +2887,9 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field(..., description="The stream name.", example=["Users"], title="Name") + name: str = Field( + ..., description="The stream name.", example=["Users"], title="Name" + ) full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2949,13 +2976,17 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( + download_target_extractor: Optional[ + Union[DpathExtractor, CustomRecordExtractor] + ] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d83a05442..027c87944 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2833,6 +2833,9 @@ def create_oauth_authenticator( refresh_token_error_status_codes=refresh_token_error_status_codes, refresh_token_error_key=refresh_token_error_key, refresh_token_error_values=refresh_token_error_values, + _use_client_credentials_in_refresh=model.use_client_credentials_in_refresh + if model.use_client_credentials_in_refresh is not None + else True, ) @staticmethod diff --git a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py index 15288da9b..966162db9 100644 --- a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py +++ b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py @@ -104,21 +104,26 @@ def build_refresh_request_body(self) -> Mapping[str, Any]: Override to define additional parameters. - If refresh_request_headers contains an Authorization header (e.g., Basic auth), - client credentials are excluded from the body to avoid sending them in both places. - Some OAuth providers (like Gong) require credentials ONLY in the header and reject - requests that include them in both the header and body. + Client credentials (client_id and client_secret) are excluded from the body when: + 1. refresh_request_headers contains an Authorization header (e.g., Basic auth), OR + 2. use_client_credentials_in_refresh() returns False (for APIs like Gong that don't + require client credentials in the refresh request at all) """ # Check if credentials are being sent via Authorization header headers = self.get_refresh_request_headers() credentials_in_header = headers and "Authorization" in headers + # Check if client credentials should be included in refresh request + include_client_credentials = ( + self.use_client_credentials_in_refresh() and not credentials_in_header + ) + payload: MutableMapping[str, Any] = { self.get_grant_type_name(): self.get_grant_type(), } - # Only include client credentials in body if not already in header - if not credentials_in_header: + # Only include client credentials in body if configured to do so and not in header + if include_client_credentials: payload[self.get_client_id_name()] = self.get_client_id() payload[self.get_client_secret_name()] = self.get_client_secret() @@ -513,6 +518,14 @@ def get_grant_type(self) -> str: def get_grant_type_name(self) -> str: """Returns grant_type specified name for requesting access_token""" + def use_client_credentials_in_refresh(self) -> bool: + """Returns whether to include client credentials in the refresh token request body. + + Override to return False for OAuth implementations (like Gong) that don't require + client_id and client_secret in the refresh request. + """ + return True + @property @abstractmethod def access_token(self) -> str: diff --git a/unit_tests/sources/declarative/auth/test_oauth.py b/unit_tests/sources/declarative/auth/test_oauth.py index e5e15a035..2b6885c9d 100644 --- a/unit_tests/sources/declarative/auth/test_oauth.py +++ b/unit_tests/sources/declarative/auth/test_oauth.py @@ -169,6 +169,63 @@ def test_refresh_request_body_includes_credentials_when_no_authorization_header( } assert body == expected + def test_refresh_request_body_excludes_credentials_when_use_client_credentials_in_refresh_is_false( + self, + ): + """ + When use_client_credentials_in_refresh is set to False, client_id and client_secret + should be excluded from the request body entirely. + + This is required by OAuth providers like Gong that don't require client credentials + in the refresh token request at all - only the refresh_token is needed. + """ + oauth = DeclarativeOauth2Authenticator( + token_refresh_endpoint="{{ config['refresh_endpoint'] }}", + client_id="{{ config['client_id'] }}", + client_secret="{{ config['client_secret'] }}", + refresh_token="{{ parameters['refresh_token'] }}", + config=config, + token_expiry_date="{{ config['token_expiry_date'] }}", + parameters=parameters, + grant_type="{{ config['grant_type'] }}", + _use_client_credentials_in_refresh=False, + ) + body = oauth.build_refresh_request_body() + expected = { + "grant_type": "some_grant_type", + "refresh_token": "some_refresh_token", + } + assert body == expected + assert "client_id" not in body + assert "client_secret" not in body + + def test_refresh_request_body_includes_credentials_when_use_client_credentials_in_refresh_is_true( + self, + ): + """ + When use_client_credentials_in_refresh is True (default), client_id and client_secret + should be included in the request body. + """ + oauth = DeclarativeOauth2Authenticator( + token_refresh_endpoint="{{ config['refresh_endpoint'] }}", + client_id="{{ config['client_id'] }}", + client_secret="{{ config['client_secret'] }}", + refresh_token="{{ parameters['refresh_token'] }}", + config=config, + token_expiry_date="{{ config['token_expiry_date'] }}", + parameters=parameters, + grant_type="{{ config['grant_type'] }}", + _use_client_credentials_in_refresh=True, + ) + body = oauth.build_refresh_request_body() + expected = { + "grant_type": "some_grant_type", + "client_id": "some_client_id", + "client_secret": "some_client_secret", + "refresh_token": "some_refresh_token", + } + assert body == expected + def test_refresh_with_encode_config_params(self): oauth = DeclarativeOauth2Authenticator( token_refresh_endpoint="{{ config['refresh_endpoint'] }}", From 499d3eeb0e8933fdde3b7e8563976c9d3d231431 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 14 Jan 2026 01:37:38 +0000 Subject: [PATCH 3/6] style: fix ruff formatting in generated models Co-Authored-By: aldo.gonzalez@airbyte.io --- .../models/declarative_component_schema.py | 144 +++++++----------- 1 file changed, 58 insertions(+), 86 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 20b6338c5..5afa8da73 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -926,28 +926,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -965,9 +961,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1471,9 +1465,7 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=[ - "source_declarative_manifest.components.MyCustomConfigTransformation" - ], + examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1901,9 +1893,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2125,9 +2115,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( None, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2169,12 +2157,10 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", ) @@ -2191,12 +2177,10 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", ) @@ -2221,12 +2205,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[ - Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] - ] = Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", + error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( + Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", + ) ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2388,9 +2372,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[ - List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] - ] = None + streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( + None + ) dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2515,20 +2499,16 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" - ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2702,20 +2682,18 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( - Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", - ) + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2887,9 +2865,7 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field( - ..., description="The stream name.", example=["Users"], title="Name" - ) + name: str = Field(..., description="The stream name.", example=["Users"], title="Name") full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2976,17 +2952,13 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[ - Union[DpathExtractor, CustomRecordExtractor] - ] = Field( + download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", From 0828d04be9008fda77fe19b47bf9b9f1612747b2 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 14 Jan 2026 16:14:52 +0000 Subject: [PATCH 4/6] refactor(oauth): remove use_client_credentials_in_refresh option The core fix (excluding credentials from body when Authorization header is present) is sufficient for Gong and similar OAuth providers. The additional use_client_credentials_in_refresh option was added during confusion about Gong's requirements and is no longer needed. This simplifies the implementation while maintaining the correct behavior: - When refresh_request_headers contains an Authorization header, client_id and client_secret are automatically excluded from the request body - This is required by OAuth providers like Gong that expect credentials ONLY in the Authorization header Co-Authored-By: aldo.gonzalez@airbyte.io --- airbyte_cdk/sources/declarative/auth/oauth.py | 4 -- .../declarative_component_schema.yaml | 5 -- .../models/declarative_component_schema.py | 5 -- .../parsers/model_to_component_factory.py | 3 - .../requests_native_auth/abstract_oauth.py | 22 ++----- .../sources/declarative/auth/test_oauth.py | 57 ------------------- 6 files changed, 6 insertions(+), 90 deletions(-) diff --git a/airbyte_cdk/sources/declarative/auth/oauth.py b/airbyte_cdk/sources/declarative/auth/oauth.py index 315aba6bd..e1ad84e09 100644 --- a/airbyte_cdk/sources/declarative/auth/oauth.py +++ b/airbyte_cdk/sources/declarative/auth/oauth.py @@ -78,7 +78,6 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAut refresh_token_error_status_codes: Tuple[int, ...] = () refresh_token_error_key: str = "" refresh_token_error_values: Tuple[str, ...] = () - _use_client_credentials_in_refresh: bool = True def __post_init__(self, parameters: Mapping[str, Any]) -> None: super().__init__( @@ -248,9 +247,6 @@ def get_refresh_request_body(self) -> Mapping[str, Any]: def get_refresh_request_headers(self) -> Mapping[str, Any]: return self._refresh_request_headers.eval(self.config) - def use_client_credentials_in_refresh(self) -> bool: - return self._use_client_credentials_in_refresh - def get_token_expiry_date(self) -> AirbyteDateTime: if not self._has_access_token_been_initialized(): return AirbyteDateTime.from_datetime(datetime.min) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 9786e314f..e04a82c0d 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1531,11 +1531,6 @@ definitions: description: Enable using profile assertion as a flow for OAuth authorization. type: boolean default: false - use_client_credentials_in_refresh: - title: Use Client Credentials In Refresh - description: When enabled (default), client_id and client_secret are included in the refresh token request body. Set to false for OAuth implementations like Gong that require only the refresh_token in the request body without client credentials. - type: boolean - default: true $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5afa8da73..73c003bac 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1941,11 +1941,6 @@ class OAuthAuthenticator(BaseModel): description="Enable using profile assertion as a flow for OAuth authorization.", title="Use Profile Assertion", ) - use_client_credentials_in_refresh: Optional[bool] = Field( - True, - description="When enabled (default), client_id and client_secret are included in the refresh token request body. Set to false for OAuth implementations like Gong that require only the refresh_token in the request body without client credentials.", - title="Use Client Credentials In Refresh", - ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 027c87944..d83a05442 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2833,9 +2833,6 @@ def create_oauth_authenticator( refresh_token_error_status_codes=refresh_token_error_status_codes, refresh_token_error_key=refresh_token_error_key, refresh_token_error_values=refresh_token_error_values, - _use_client_credentials_in_refresh=model.use_client_credentials_in_refresh - if model.use_client_credentials_in_refresh is not None - else True, ) @staticmethod diff --git a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py index 966162db9..3b4aa9844 100644 --- a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py +++ b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py @@ -104,19 +104,17 @@ def build_refresh_request_body(self) -> Mapping[str, Any]: Override to define additional parameters. - Client credentials (client_id and client_secret) are excluded from the body when: - 1. refresh_request_headers contains an Authorization header (e.g., Basic auth), OR - 2. use_client_credentials_in_refresh() returns False (for APIs like Gong that don't - require client credentials in the refresh request at all) + Client credentials (client_id and client_secret) are excluded from the body when + refresh_request_headers contains an Authorization header (e.g., Basic auth). + This is required by OAuth providers like Gong that expect credentials ONLY in the + Authorization header and reject requests that include them in both places. """ # Check if credentials are being sent via Authorization header headers = self.get_refresh_request_headers() credentials_in_header = headers and "Authorization" in headers - # Check if client credentials should be included in refresh request - include_client_credentials = ( - self.use_client_credentials_in_refresh() and not credentials_in_header - ) + # Only include client credentials in body if not already in header + include_client_credentials = not credentials_in_header payload: MutableMapping[str, Any] = { self.get_grant_type_name(): self.get_grant_type(), @@ -518,14 +516,6 @@ def get_grant_type(self) -> str: def get_grant_type_name(self) -> str: """Returns grant_type specified name for requesting access_token""" - def use_client_credentials_in_refresh(self) -> bool: - """Returns whether to include client credentials in the refresh token request body. - - Override to return False for OAuth implementations (like Gong) that don't require - client_id and client_secret in the refresh request. - """ - return True - @property @abstractmethod def access_token(self) -> str: diff --git a/unit_tests/sources/declarative/auth/test_oauth.py b/unit_tests/sources/declarative/auth/test_oauth.py index 2b6885c9d..e5e15a035 100644 --- a/unit_tests/sources/declarative/auth/test_oauth.py +++ b/unit_tests/sources/declarative/auth/test_oauth.py @@ -169,63 +169,6 @@ def test_refresh_request_body_includes_credentials_when_no_authorization_header( } assert body == expected - def test_refresh_request_body_excludes_credentials_when_use_client_credentials_in_refresh_is_false( - self, - ): - """ - When use_client_credentials_in_refresh is set to False, client_id and client_secret - should be excluded from the request body entirely. - - This is required by OAuth providers like Gong that don't require client credentials - in the refresh token request at all - only the refresh_token is needed. - """ - oauth = DeclarativeOauth2Authenticator( - token_refresh_endpoint="{{ config['refresh_endpoint'] }}", - client_id="{{ config['client_id'] }}", - client_secret="{{ config['client_secret'] }}", - refresh_token="{{ parameters['refresh_token'] }}", - config=config, - token_expiry_date="{{ config['token_expiry_date'] }}", - parameters=parameters, - grant_type="{{ config['grant_type'] }}", - _use_client_credentials_in_refresh=False, - ) - body = oauth.build_refresh_request_body() - expected = { - "grant_type": "some_grant_type", - "refresh_token": "some_refresh_token", - } - assert body == expected - assert "client_id" not in body - assert "client_secret" not in body - - def test_refresh_request_body_includes_credentials_when_use_client_credentials_in_refresh_is_true( - self, - ): - """ - When use_client_credentials_in_refresh is True (default), client_id and client_secret - should be included in the request body. - """ - oauth = DeclarativeOauth2Authenticator( - token_refresh_endpoint="{{ config['refresh_endpoint'] }}", - client_id="{{ config['client_id'] }}", - client_secret="{{ config['client_secret'] }}", - refresh_token="{{ parameters['refresh_token'] }}", - config=config, - token_expiry_date="{{ config['token_expiry_date'] }}", - parameters=parameters, - grant_type="{{ config['grant_type'] }}", - _use_client_credentials_in_refresh=True, - ) - body = oauth.build_refresh_request_body() - expected = { - "grant_type": "some_grant_type", - "client_id": "some_client_id", - "client_secret": "some_client_secret", - "refresh_token": "some_refresh_token", - } - assert body == expected - def test_refresh_with_encode_config_params(self): oauth = DeclarativeOauth2Authenticator( token_refresh_endpoint="{{ config['refresh_endpoint'] }}", From 516965646dab72bb8dd15b7961c573f0db4b3cb6 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 14 Jan 2026 21:44:30 +0000 Subject: [PATCH 5/6] fix(oauth): always emit control message to stdout for token persistence Control messages for config updates (like refreshed tokens) must be printed directly to stdout so the platform can process them immediately. Previously, when using InMemoryMessageRepository, control messages were only queued but never output to stdout, causing single-use refresh tokens to not be persisted. This fix ensures emit_configuration_as_airbyte_control_message() is always called to print to stdout, while also emitting to the message repository for any additional processing. Co-Authored-By: aldo.gonzalez@airbyte.io --- .../http/requests_native_auth/oauth.py | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py b/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py index 0ca6f6b3a..a2932c294 100644 --- a/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py +++ b/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py @@ -383,27 +383,24 @@ def _emit_control_message(self) -> None: """ Emits a control message based on the connector configuration. - This method checks if the message repository is not a NoopMessageRepository. - If it is not, it emits a message using the message repository. Otherwise, - it falls back to emitting the configuration as an Airbyte control message - directly to the console for backward compatibility. + Control messages for config updates (like refreshed tokens) must be printed directly + to stdout so the platform can process them immediately. The message repository is + also used to queue the message for any additional processing. Note: - The function `emit_configuration_as_airbyte_control_message` has been deprecated - in favor of the package `airbyte_cdk.sources.message`. - - Raises: - TypeError: If the argument types are incorrect. + The function `emit_configuration_as_airbyte_control_message` prints directly to + stdout, which is required for the platform to detect and persist config changes. """ - # FIXME emit_configuration_as_airbyte_control_message as been deprecated in favor of package airbyte_cdk.sources.message - # Usually, a class shouldn't care about the implementation details but to keep backward compatibility where we print the - # message directly in the console, this is needed + # Always emit to stdout so the platform can process the config update immediately. + # This is critical for single-use refresh tokens where the new token must be persisted + # before subsequent operations try to use the old (now invalid) token. + emit_configuration_as_airbyte_control_message(self._connector_config) # type: ignore[arg-type] + + # Also emit to the message repository for any additional processing (e.g., logging) if not isinstance(self._message_repository, NoopMessageRepository): self._message_repository.emit_message( create_connector_config_control_message(self._connector_config) # type: ignore[arg-type] ) - else: - emit_configuration_as_airbyte_control_message(self._connector_config) # type: ignore[arg-type] @property def _message_repository(self) -> MessageRepository: From 75cc1f4db5136f4db2882f09a7713011cd8301c0 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 15 Jan 2026 00:53:56 +0000 Subject: [PATCH 6/6] chore: revert unrelated changes to declarative_component_schema.py Co-Authored-By: aldo.gonzalez@airbyte.io --- .../declarative/models/declarative_component_schema.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 73c003bac..b78a07021 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,3 +1,5 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -2739,7 +2741,7 @@ class HttpRequester(BaseModelWithDeprecations): ) use_cache: Optional[bool] = Field( False, - description="Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled. Only set this to false if you are certain that caching should be disabled, as it may negatively impact performance when the same data is needed multiple times (e.g., for scroll-based pagination APIs where caching causes duplicate records).", + description="Enables stream requests caching. This field is automatically set by the CDK.", title="Use Cache", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")