From 1557365d7da5e7b16140ecfde1683eb35c1944d8 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 27 Jan 2026 15:53:12 +0000 Subject: [PATCH 1/4] feat(cdk): add token_prefix support to SessionTokenAuthenticator This adds a token_prefix field to SessionTokenRequestApiKeyAuthenticator that allows prepending a custom prefix to the session token when injecting it into requests. This is useful for APIs that require a specific prefix before the token, such as Django REST Framework APIs that expect 'Authorization: Token ' format instead of the standard Bearer format. Changes: - Add token_prefix field to SessionTokenRequestApiKeyAuthenticator in schema - Create PrefixedTokenProvider wrapper class in token_provider.py - Update factory to wrap token provider with prefix when token_prefix is set - Regenerate Pydantic models from schema - Add unit tests for PrefixedTokenProvider Closes #884 Co-Authored-By: Ryan Waskewich --- .../declarative/auth/token_provider.py | 17 ++ .../declarative_component_schema.yaml | 8 + .../models/declarative_component_schema.py | 154 +++++++++++------- .../parsers/model_to_component_factory.py | 11 +- .../declarative/auth/test_token_provider.py | 20 +++ 5 files changed, 148 insertions(+), 62 deletions(-) diff --git a/airbyte_cdk/sources/declarative/auth/token_provider.py b/airbyte_cdk/sources/declarative/auth/token_provider.py index c4bae02f1..861d2e8f7 100644 --- a/airbyte_cdk/sources/declarative/auth/token_provider.py +++ b/airbyte_cdk/sources/declarative/auth/token_provider.py @@ -71,6 +71,8 @@ def _refresh(self) -> None: @dataclass class InterpolatedStringTokenProvider(TokenProvider): + """Provides a token by interpolating a string with config values.""" + config: Config api_token: Union[InterpolatedString, str] parameters: Mapping[str, Any] @@ -80,3 +82,18 @@ def __post_init__(self) -> None: def get_token(self) -> str: return str(self._token.eval(self.config)) + + +@dataclass +class PrefixedTokenProvider(TokenProvider): + """Wraps a TokenProvider and prepends a prefix to the token value. + + This is useful for APIs that require a specific prefix before the token, + such as Django REST Framework APIs that expect "Token " format. + """ + + token_provider: TokenProvider + prefix: str + + def get_token(self) -> str: + return f"{self.prefix}{self.token_provider.get_token()}" diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index e04a82c0d..a91b69f74 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2067,6 +2067,14 @@ definitions: field_name: Authorization - inject_into: request_parameter field_name: authKey + token_prefix: + title: Token Prefix + description: 'A prefix to prepend to the session token when injecting it into requests. For example, use "Token " (with trailing space) for APIs that expect "Authorization: Token ".' + type: string + default: "" + examples: + - "Token " + - "Bearer " SessionTokenRequestBearerAuthenticator: title: Bearer Authenticator description: Authenticator for requests using the session token as a standard bearer token. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index b78a07021..346f68e76 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -928,24 +926,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -963,7 +965,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1467,7 +1471,9 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], + examples=[ + "source_declarative_manifest.components.MyCustomConfigTransformation" + ], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1895,7 +1901,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2057,6 +2065,12 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel): ], title="Inject API Key Into Outgoing HTTP Request", ) + token_prefix: Optional[str] = Field( + "", + description='A prefix to prepend to the session token when injecting it into requests. For example, use "Token " (with trailing space) for APIs that expect "Authorization: Token ".', + examples=["Token ", "Bearer "], + title="Token Prefix", + ) class JsonSchemaPropertySelector(BaseModel): @@ -2112,7 +2126,9 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( None, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2154,10 +2170,12 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", + ) ) @@ -2174,10 +2192,12 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", + ) ) @@ -2202,12 +2222,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( - Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", - ) + error_handlers: List[ + Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] + ] = Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2369,9 +2389,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( - None - ) + streams: Optional[ + List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] + ] = None dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2496,16 +2516,20 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" + ) retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2679,18 +2703,20 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( + Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", + ) ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2741,7 +2767,7 @@ class HttpRequester(BaseModelWithDeprecations): ) use_cache: Optional[bool] = Field( False, - description="Enables stream requests caching. This field is automatically set by the CDK.", + description="Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled. Only set this to false if you are certain that caching should be disabled, as it may negatively impact performance when the same data is needed multiple times (e.g., for scroll-based pagination APIs where caching causes duplicate records).", title="Use Cache", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2862,7 +2888,9 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field(..., description="The stream name.", example=["Users"], title="Name") + name: str = Field( + ..., description="The stream name.", example=["Users"], title="Name" + ) full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2949,13 +2977,17 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( + download_target_extractor: Optional[ + Union[DpathExtractor, CustomRecordExtractor] + ] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d83a05442..f276ec683 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -68,6 +68,7 @@ ) from airbyte_cdk.sources.declarative.auth.token_provider import ( InterpolatedStringTokenProvider, + PrefixedTokenProvider, SessionTokenProvider, TokenProvider, ) @@ -1169,6 +1170,14 @@ def create_session_token_authenticator( token_provider=token_provider, ) else: + # Get the token_prefix if specified, wrap the token provider if needed + token_prefix = getattr(model.request_authentication, "token_prefix", None) or "" + final_token_provider: TokenProvider = token_provider + if token_prefix: + final_token_provider = PrefixedTokenProvider( + token_provider=token_provider, + prefix=token_prefix, + ) return self.create_api_key_authenticator( ApiKeyAuthenticatorModel( type="ApiKeyAuthenticator", @@ -1176,7 +1185,7 @@ def create_session_token_authenticator( inject_into=model.request_authentication.inject_into, ), # type: ignore # $parameters and headers default to None config=config, - token_provider=token_provider, + token_provider=final_token_provider, ) @staticmethod diff --git a/unit_tests/sources/declarative/auth/test_token_provider.py b/unit_tests/sources/declarative/auth/test_token_provider.py index 2958cf04b..b546560ff 100644 --- a/unit_tests/sources/declarative/auth/test_token_provider.py +++ b/unit_tests/sources/declarative/auth/test_token_provider.py @@ -10,6 +10,7 @@ from airbyte_cdk.sources.declarative.auth.token_provider import ( InterpolatedStringTokenProvider, + PrefixedTokenProvider, SessionTokenProvider, ) from airbyte_cdk.sources.declarative.exceptions import ReadException @@ -79,3 +80,22 @@ def test_session_token_provider_ignored_response(): provider.login_requester.send_request.return_value = None with pytest.raises(ReadException): provider.get_token() + + +@pytest.mark.parametrize( + "prefix,expected_token", + [ + pytest.param("Token ", "Token my_token", id="token_prefix"), + pytest.param("Bearer ", "Bearer my_token", id="bearer_prefix"), + pytest.param("", "my_token", id="empty_prefix"), + pytest.param("Custom-", "Custom-my_token", id="custom_prefix"), + ], +) +def test_prefixed_token_provider(prefix, expected_token): + """Test that PrefixedTokenProvider correctly prepends prefix to token.""" + underlying_provider = create_session_token_provider() + prefixed_provider = PrefixedTokenProvider( + token_provider=underlying_provider, + prefix=prefix, + ) + assert prefixed_provider.get_token() == expected_token From 812f8ab31c8065552bee822055f6c97552ce038e Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 27 Jan 2026 15:55:31 +0000 Subject: [PATCH 2/4] style: format generated model file Co-Authored-By: Ryan Waskewich --- .../models/declarative_component_schema.py | 144 +++++++----------- 1 file changed, 58 insertions(+), 86 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 346f68e76..2c0c1608d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -926,28 +926,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -965,9 +961,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1471,9 +1465,7 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=[ - "source_declarative_manifest.components.MyCustomConfigTransformation" - ], + examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1901,9 +1893,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2126,9 +2116,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( None, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2170,12 +2158,10 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", ) @@ -2192,12 +2178,10 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", ) @@ -2222,12 +2206,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[ - Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] - ] = Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", + error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( + Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", + ) ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2389,9 +2373,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[ - List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] - ] = None + streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( + None + ) dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2516,20 +2500,16 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" - ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2703,20 +2683,18 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( - Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", - ) + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2888,9 +2866,7 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field( - ..., description="The stream name.", example=["Users"], title="Name" - ) + name: str = Field(..., description="The stream name.", example=["Users"], title="Name") full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2977,17 +2953,13 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[ - Union[DpathExtractor, CustomRecordExtractor] - ] = Field( + download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", From 1c3609040c0c38ff9535d62c0ec61fdbcdab6019 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 27 Jan 2026 16:57:54 +0000 Subject: [PATCH 3/4] test: add factory wiring test for token_prefix Co-Authored-By: Ryan Waskewich --- .../test_model_to_component_factory.py | 58 ++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 0c3ea0a78..466bac65f 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -42,7 +42,10 @@ BearerAuthenticator, LegacySessionTokenAuthenticator, ) -from airbyte_cdk.sources.declarative.auth.token_provider import SessionTokenProvider +from airbyte_cdk.sources.declarative.auth.token_provider import ( + PrefixedTokenProvider, + SessionTokenProvider, +) from airbyte_cdk.sources.declarative.checks import CheckStream from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime @@ -1855,6 +1858,59 @@ def test_create_request_with_session_authenticator(): } +def test_create_request_with_session_authenticator_with_token_prefix(): + """Test that token_prefix wraps the token provider with PrefixedTokenProvider.""" + content = """ +requester: + type: HttpRequester + path: "/v3/marketing/lists" + $parameters: + name: 'lists' + url_base: "https://api.sendgrid.com" + authenticator: + type: SessionTokenAuthenticator + decoder: + type: JsonDecoder + expiration_duration: P10D + login_requester: + path: /session + type: HttpRequester + url_base: 'https://api.sendgrid.com' + http_method: POST + request_body_json: + password: '{{ config.apikey }}' + username: '{{ parameters.name }}' + session_token_path: + - id + request_authentication: + type: ApiKey + inject_into: + type: RequestOption + field_name: Authorization + inject_into: header + token_prefix: "Token " + """ + name = "name" + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + requester_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["requester"], {} + ) + + selector = factory.create_component( + model_type=HttpRequesterModel, + component_definition=requester_manifest, + config=input_config, + name=name, + decoder=None, + ) + + assert isinstance(selector.authenticator, ApiKeyAuthenticator) + assert isinstance(selector.authenticator.token_provider, PrefixedTokenProvider) + assert selector.authenticator.token_provider.prefix == "Token " + assert isinstance(selector.authenticator.token_provider.token_provider, SessionTokenProvider) + + def test_given_composite_error_handler_does_not_match_response_then_fallback_on_default_error_handler( requests_mock, ): From a1f173f04ff24d752c55dc9d2544415b471bd251 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 27 Jan 2026 17:28:44 +0000 Subject: [PATCH 4/4] refactor(cdk): replace token_prefix with interpolation approach for SessionTokenAuthenticator This replaces the token_prefix approach with a more flexible interpolation approach using api_token template. Users can now use Jinja templates like 'Token {{ session_token }}' for Django REST Framework APIs. Changes: - Replace PrefixedTokenProvider with InterpolatedSessionTokenProvider - Update schema to use api_token field instead of token_prefix - Default api_token is '{{ session_token }}' for backward compatibility - Update factory to always wrap with InterpolatedSessionTokenProvider - Update tests to reflect new approach Co-Authored-By: Ryan Waskewich --- .../declarative/auth/token_provider.py | 20 +++++++----- .../declarative_component_schema.yaml | 16 ++++++---- .../models/declarative_component_schema.py | 18 +++++++---- .../parsers/model_to_component_factory.py | 20 ++++++------ .../declarative/auth/test_token_provider.py | 31 ++++++++++++------- .../test_model_to_component_factory.py | 31 +++++++++++-------- 6 files changed, 83 insertions(+), 53 deletions(-) diff --git a/airbyte_cdk/sources/declarative/auth/token_provider.py b/airbyte_cdk/sources/declarative/auth/token_provider.py index 861d2e8f7..f3abd8529 100644 --- a/airbyte_cdk/sources/declarative/auth/token_provider.py +++ b/airbyte_cdk/sources/declarative/auth/token_provider.py @@ -85,15 +85,21 @@ def get_token(self) -> str: @dataclass -class PrefixedTokenProvider(TokenProvider): - """Wraps a TokenProvider and prepends a prefix to the token value. +class InterpolatedSessionTokenProvider(TokenProvider): + """Provides a token by interpolating a template with the session token. - This is useful for APIs that require a specific prefix before the token, - such as Django REST Framework APIs that expect "Token " format. + This allows flexible token formatting, such as "Token {{ session_token }}" + for Django REST Framework APIs that expect "Authorization: Token ". """ - token_provider: TokenProvider - prefix: str + config: Config + api_token: Union[InterpolatedString, str] + session_token_provider: TokenProvider + parameters: Mapping[str, Any] + + def __post_init__(self) -> None: + self._token_template = InterpolatedString.create(self.api_token, parameters=self.parameters) def get_token(self) -> str: - return f"{self.prefix}{self.token_provider.get_token()}" + session_token = self.session_token_provider.get_token() + return str(self._token_template.eval(self.config, session_token=session_token)) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index a91b69f74..07b9b7b9c 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2067,14 +2067,18 @@ definitions: field_name: Authorization - inject_into: request_parameter field_name: authKey - token_prefix: - title: Token Prefix - description: 'A prefix to prepend to the session token when injecting it into requests. For example, use "Token " (with trailing space) for APIs that expect "Authorization: Token ".' + api_token: + title: API Token Template + description: 'A template for the token value to inject. Use {{ session_token }} to reference the session token. For example, use "Token {{ session_token }}" for APIs that expect "Authorization: Token ".' type: string - default: "" + default: "{{ session_token }}" + interpolation_context: + - config + - session_token examples: - - "Token " - - "Bearer " + - "{{ session_token }}" + - "Token {{ session_token }}" + - "Bearer {{ session_token }}" SessionTokenRequestBearerAuthenticator: title: Bearer Authenticator description: Authenticator for requests using the session token as a standard bearer token. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 2c0c1608d..f725a7573 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,3 +1,5 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -2055,11 +2057,15 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel): ], title="Inject API Key Into Outgoing HTTP Request", ) - token_prefix: Optional[str] = Field( - "", - description='A prefix to prepend to the session token when injecting it into requests. For example, use "Token " (with trailing space) for APIs that expect "Authorization: Token ".', - examples=["Token ", "Bearer "], - title="Token Prefix", + api_token: Optional[str] = Field( + "{{ session_token }}", + description='A template for the token value to inject. Use {{ session_token }} to reference the session token. For example, use "Token {{ session_token }}" for APIs that expect "Authorization: Token ".', + examples=[ + "{{ session_token }}", + "Token {{ session_token }}", + "Bearer {{ session_token }}", + ], + title="API Token Template", ) @@ -2745,7 +2751,7 @@ class HttpRequester(BaseModelWithDeprecations): ) use_cache: Optional[bool] = Field( False, - description="Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled. Only set this to false if you are certain that caching should be disabled, as it may negatively impact performance when the same data is needed multiple times (e.g., for scroll-based pagination APIs where caching causes duplicate records).", + description="Enables stream requests caching. This field is automatically set by the CDK.", title="Use Cache", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index f276ec683..fe96833e0 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -67,8 +67,8 @@ LegacySessionTokenAuthenticator, ) from airbyte_cdk.sources.declarative.auth.token_provider import ( + InterpolatedSessionTokenProvider, InterpolatedStringTokenProvider, - PrefixedTokenProvider, SessionTokenProvider, TokenProvider, ) @@ -1170,14 +1170,16 @@ def create_session_token_authenticator( token_provider=token_provider, ) else: - # Get the token_prefix if specified, wrap the token provider if needed - token_prefix = getattr(model.request_authentication, "token_prefix", None) or "" - final_token_provider: TokenProvider = token_provider - if token_prefix: - final_token_provider = PrefixedTokenProvider( - token_provider=token_provider, - prefix=token_prefix, - ) + # Get the api_token template if specified, default to just the session token + api_token_template = ( + getattr(model.request_authentication, "api_token", None) or "{{ session_token }}" + ) + final_token_provider: TokenProvider = InterpolatedSessionTokenProvider( + config=config, + api_token=api_token_template, + session_token_provider=token_provider, + parameters=model.parameters or {}, + ) return self.create_api_key_authenticator( ApiKeyAuthenticatorModel( type="ApiKeyAuthenticator", diff --git a/unit_tests/sources/declarative/auth/test_token_provider.py b/unit_tests/sources/declarative/auth/test_token_provider.py index b546560ff..076d39741 100644 --- a/unit_tests/sources/declarative/auth/test_token_provider.py +++ b/unit_tests/sources/declarative/auth/test_token_provider.py @@ -9,8 +9,8 @@ from isodate import parse_duration from airbyte_cdk.sources.declarative.auth.token_provider import ( + InterpolatedSessionTokenProvider, InterpolatedStringTokenProvider, - PrefixedTokenProvider, SessionTokenProvider, ) from airbyte_cdk.sources.declarative.exceptions import ReadException @@ -83,19 +83,26 @@ def test_session_token_provider_ignored_response(): @pytest.mark.parametrize( - "prefix,expected_token", + "api_token_template,expected_token", [ - pytest.param("Token ", "Token my_token", id="token_prefix"), - pytest.param("Bearer ", "Bearer my_token", id="bearer_prefix"), - pytest.param("", "my_token", id="empty_prefix"), - pytest.param("Custom-", "Custom-my_token", id="custom_prefix"), + pytest.param("Token {{ session_token }}", "Token my_token", id="token_prefix"), + pytest.param("Bearer {{ session_token }}", "Bearer my_token", id="bearer_prefix"), + pytest.param("{{ session_token }}", "my_token", id="just_session_token"), + pytest.param("Custom-{{ session_token }}", "Custom-my_token", id="custom_prefix"), + pytest.param( + "realm=xyz, token={{ session_token }}", + "realm=xyz, token=my_token", + id="complex_format", + ), ], ) -def test_prefixed_token_provider(prefix, expected_token): - """Test that PrefixedTokenProvider correctly prepends prefix to token.""" +def test_interpolated_session_token_provider(api_token_template, expected_token): + """Test that InterpolatedSessionTokenProvider correctly interpolates session token.""" underlying_provider = create_session_token_provider() - prefixed_provider = PrefixedTokenProvider( - token_provider=underlying_provider, - prefix=prefix, + interpolated_provider = InterpolatedSessionTokenProvider( + config={"some_config": "value"}, + api_token=api_token_template, + session_token_provider=underlying_provider, + parameters={}, ) - assert prefixed_provider.get_token() == expected_token + assert interpolated_provider.get_token() == expected_token diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 466bac65f..a5da89e4b 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -43,7 +43,7 @@ LegacySessionTokenAuthenticator, ) from airbyte_cdk.sources.declarative.auth.token_provider import ( - PrefixedTokenProvider, + InterpolatedSessionTokenProvider, SessionTokenProvider, ) from airbyte_cdk.sources.declarative.checks import CheckStream @@ -1844,22 +1844,25 @@ def test_create_request_with_session_authenticator(): ) assert isinstance(selector.authenticator, ApiKeyAuthenticator) - assert isinstance(selector.authenticator.token_provider, SessionTokenProvider) - assert selector.authenticator.token_provider.session_token_path == ["id"] - assert isinstance(selector.authenticator.token_provider.login_requester, HttpRequester) - assert selector.authenticator.token_provider.session_token_path == ["id"] + # Default behavior wraps with InterpolatedSessionTokenProvider using "{{ session_token }}" + assert isinstance(selector.authenticator.token_provider, InterpolatedSessionTokenProvider) + assert selector.authenticator.token_provider.api_token == "{{ session_token }}" + session_token_provider = selector.authenticator.token_provider.session_token_provider + assert isinstance(session_token_provider, SessionTokenProvider) + assert session_token_provider.session_token_path == ["id"] + assert isinstance(session_token_provider.login_requester, HttpRequester) assert ( - selector.authenticator.token_provider.login_requester._url_base.eval(input_config) + session_token_provider.login_requester._url_base.eval(input_config) == "https://api.sendgrid.com" ) - assert selector.authenticator.token_provider.login_requester.get_request_body_json() == { + assert session_token_provider.login_requester.get_request_body_json() == { "username": "lists", "password": "verysecrettoken", } -def test_create_request_with_session_authenticator_with_token_prefix(): - """Test that token_prefix wraps the token provider with PrefixedTokenProvider.""" +def test_create_request_with_session_authenticator_with_api_token_template(): + """Test that api_token wraps the token provider with InterpolatedSessionTokenProvider.""" content = """ requester: type: HttpRequester @@ -1888,7 +1891,7 @@ def test_create_request_with_session_authenticator_with_token_prefix(): type: RequestOption field_name: Authorization inject_into: header - token_prefix: "Token " + api_token: "Token {{ session_token }}" """ name = "name" parsed_manifest = YamlDeclarativeSource._parse(content) @@ -1906,9 +1909,11 @@ def test_create_request_with_session_authenticator_with_token_prefix(): ) assert isinstance(selector.authenticator, ApiKeyAuthenticator) - assert isinstance(selector.authenticator.token_provider, PrefixedTokenProvider) - assert selector.authenticator.token_provider.prefix == "Token " - assert isinstance(selector.authenticator.token_provider.token_provider, SessionTokenProvider) + assert isinstance(selector.authenticator.token_provider, InterpolatedSessionTokenProvider) + assert selector.authenticator.token_provider.api_token == "Token {{ session_token }}" + assert isinstance( + selector.authenticator.token_provider.session_token_provider, SessionTokenProvider + ) def test_given_composite_error_handler_does_not_match_response_then_fallback_on_default_error_handler(