Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions airbyte_cdk/sources/declarative/auth/token_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def _refresh(self) -> None:

@dataclass
class InterpolatedStringTokenProvider(TokenProvider):
"""Provides a token by interpolating a string with config values."""

config: Config
api_token: Union[InterpolatedString, str]
parameters: Mapping[str, Any]
Expand All @@ -80,3 +82,24 @@ def __post_init__(self) -> None:

def get_token(self) -> str:
return str(self._token.eval(self.config))


@dataclass
class InterpolatedSessionTokenProvider(TokenProvider):
"""Provides a token by interpolating a template with the session token.

This allows flexible token formatting, such as "Token {{ session_token }}"
for Django REST Framework APIs that expect "Authorization: Token <value>".
"""

config: Config
api_token: Union[InterpolatedString, str]
session_token_provider: TokenProvider
parameters: Mapping[str, Any]

def __post_init__(self) -> None:
self._token_template = InterpolatedString.create(self.api_token, parameters=self.parameters)

def get_token(self) -> str:
session_token = self.session_token_provider.get_token()
return str(self._token_template.eval(self.config, session_token=session_token))
12 changes: 12 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2067,6 +2067,18 @@ definitions:
field_name: Authorization
- inject_into: request_parameter
field_name: authKey
api_token:
title: API Token Template
description: 'A template for the token value to inject. Use {{ session_token }} to reference the session token. For example, use "Token {{ session_token }}" for APIs that expect "Authorization: Token <token>".'
type: string
default: "{{ session_token }}"
interpolation_context:
- config
- session_token
examples:
- "{{ session_token }}"
- "Token {{ session_token }}"
- "Bearer {{ session_token }}"
SessionTokenRequestBearerAuthenticator:
title: Bearer Authenticator
description: Authenticator for requests using the session token as a standard bearer token.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,16 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel):
],
title="Inject API Key Into Outgoing HTTP Request",
)
api_token: Optional[str] = Field(
"{{ session_token }}",
description='A template for the token value to inject. Use {{ session_token }} to reference the session token. For example, use "Token {{ session_token }}" for APIs that expect "Authorization: Token <token>".',
examples=[
"{{ session_token }}",
"Token {{ session_token }}",
"Bearer {{ session_token }}",
],
title="API Token Template",
)


class JsonSchemaPropertySelector(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
LegacySessionTokenAuthenticator,
)
from airbyte_cdk.sources.declarative.auth.token_provider import (
InterpolatedSessionTokenProvider,
InterpolatedStringTokenProvider,
SessionTokenProvider,
TokenProvider,
Expand Down Expand Up @@ -1169,14 +1170,24 @@ def create_session_token_authenticator(
token_provider=token_provider,
)
else:
# Get the api_token template if specified, default to just the session token
api_token_template = (
getattr(model.request_authentication, "api_token", None) or "{{ session_token }}"
)
final_token_provider: TokenProvider = InterpolatedSessionTokenProvider(
config=config,
api_token=api_token_template,
session_token_provider=token_provider,
parameters=model.parameters or {},
)
return self.create_api_key_authenticator(
ApiKeyAuthenticatorModel(
type="ApiKeyAuthenticator",
api_token="",
inject_into=model.request_authentication.inject_into,
), # type: ignore # $parameters and headers default to None
config=config,
token_provider=token_provider,
token_provider=final_token_provider,
)

@staticmethod
Expand Down
27 changes: 27 additions & 0 deletions unit_tests/sources/declarative/auth/test_token_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from isodate import parse_duration

from airbyte_cdk.sources.declarative.auth.token_provider import (
InterpolatedSessionTokenProvider,
InterpolatedStringTokenProvider,
SessionTokenProvider,
)
Expand Down Expand Up @@ -79,3 +80,29 @@ def test_session_token_provider_ignored_response():
provider.login_requester.send_request.return_value = None
with pytest.raises(ReadException):
provider.get_token()


@pytest.mark.parametrize(
"api_token_template,expected_token",
[
pytest.param("Token {{ session_token }}", "Token my_token", id="token_prefix"),
pytest.param("Bearer {{ session_token }}", "Bearer my_token", id="bearer_prefix"),
pytest.param("{{ session_token }}", "my_token", id="just_session_token"),
pytest.param("Custom-{{ session_token }}", "Custom-my_token", id="custom_prefix"),
pytest.param(
"realm=xyz, token={{ session_token }}",
"realm=xyz, token=my_token",
id="complex_format",
),
],
)
def test_interpolated_session_token_provider(api_token_template, expected_token):
"""Test that InterpolatedSessionTokenProvider correctly interpolates session token."""
underlying_provider = create_session_token_provider()
interpolated_provider = InterpolatedSessionTokenProvider(
config={"some_config": "value"},
api_token=api_token_template,
session_token_provider=underlying_provider,
parameters={},
)
assert interpolated_provider.get_token() == expected_token
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
BearerAuthenticator,
LegacySessionTokenAuthenticator,
)
from airbyte_cdk.sources.declarative.auth.token_provider import SessionTokenProvider
from airbyte_cdk.sources.declarative.auth.token_provider import (
InterpolatedSessionTokenProvider,
SessionTokenProvider,
)
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
Expand Down Expand Up @@ -1841,20 +1844,78 @@ def test_create_request_with_session_authenticator():
)

assert isinstance(selector.authenticator, ApiKeyAuthenticator)
assert isinstance(selector.authenticator.token_provider, SessionTokenProvider)
assert selector.authenticator.token_provider.session_token_path == ["id"]
assert isinstance(selector.authenticator.token_provider.login_requester, HttpRequester)
assert selector.authenticator.token_provider.session_token_path == ["id"]
# Default behavior wraps with InterpolatedSessionTokenProvider using "{{ session_token }}"
assert isinstance(selector.authenticator.token_provider, InterpolatedSessionTokenProvider)
assert selector.authenticator.token_provider.api_token == "{{ session_token }}"
session_token_provider = selector.authenticator.token_provider.session_token_provider
assert isinstance(session_token_provider, SessionTokenProvider)
assert session_token_provider.session_token_path == ["id"]
assert isinstance(session_token_provider.login_requester, HttpRequester)
assert (
selector.authenticator.token_provider.login_requester._url_base.eval(input_config)
session_token_provider.login_requester._url_base.eval(input_config)
== "https://api.sendgrid.com"
)
assert selector.authenticator.token_provider.login_requester.get_request_body_json() == {
assert session_token_provider.login_requester.get_request_body_json() == {
"username": "lists",
"password": "verysecrettoken",
}


def test_create_request_with_session_authenticator_with_api_token_template():
"""Test that api_token wraps the token provider with InterpolatedSessionTokenProvider."""
content = """
requester:
type: HttpRequester
path: "/v3/marketing/lists"
$parameters:
name: 'lists'
url_base: "https://api.sendgrid.com"
authenticator:
type: SessionTokenAuthenticator
decoder:
type: JsonDecoder
expiration_duration: P10D
login_requester:
path: /session
type: HttpRequester
url_base: 'https://api.sendgrid.com'
http_method: POST
request_body_json:
password: '{{ config.apikey }}'
username: '{{ parameters.name }}'
session_token_path:
- id
request_authentication:
type: ApiKey
inject_into:
type: RequestOption
field_name: Authorization
inject_into: header
api_token: "Token {{ session_token }}"
"""
name = "name"
parsed_manifest = YamlDeclarativeSource._parse(content)
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
requester_manifest = transformer.propagate_types_and_parameters(
"", resolved_manifest["requester"], {}
)

selector = factory.create_component(
model_type=HttpRequesterModel,
component_definition=requester_manifest,
config=input_config,
name=name,
decoder=None,
)

assert isinstance(selector.authenticator, ApiKeyAuthenticator)
assert isinstance(selector.authenticator.token_provider, InterpolatedSessionTokenProvider)
assert selector.authenticator.token_provider.api_token == "Token {{ session_token }}"
assert isinstance(
selector.authenticator.token_provider.session_token_provider, SessionTokenProvider
)


def test_given_composite_error_handler_does_not_match_response_then_fallback_on_default_error_handler(
requests_mock,
):
Expand Down