Skip to content

Commit d806dc8

Browse files
Merge main into devin/1769515185-thread-safe-oauth-refresh and resolve conflicts
Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
2 parents c881893 + d3c9419 commit d806dc8

16 files changed

Lines changed: 598 additions & 49 deletions

File tree

airbyte_cdk/sources/declarative/auth/token_provider.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ def _refresh(self) -> None:
7171

7272
@dataclass
7373
class InterpolatedStringTokenProvider(TokenProvider):
74+
"""Provides a token by interpolating a string with config values."""
75+
7476
config: Config
7577
api_token: Union[InterpolatedString, str]
7678
parameters: Mapping[str, Any]
@@ -80,3 +82,24 @@ def __post_init__(self) -> None:
8082

8183
def get_token(self) -> str:
8284
return str(self._token.eval(self.config))
85+
86+
87+
@dataclass
88+
class InterpolatedSessionTokenProvider(TokenProvider):
89+
"""Provides a token by interpolating a template with the session token.
90+
91+
This allows flexible token formatting, such as "Token {{ session_token }}"
92+
for Django REST Framework APIs that expect "Authorization: Token <value>".
93+
"""
94+
95+
config: Config
96+
api_token: Union[InterpolatedString, str]
97+
session_token_provider: TokenProvider
98+
parameters: Mapping[str, Any]
99+
100+
def __post_init__(self) -> None:
101+
self._token_template = InterpolatedString.create(self.api_token, parameters=self.parameters)
102+
103+
def get_token(self) -> str:
104+
session_token = self.session_token_provider.get_token()
105+
return str(self._token_template.eval(self.config, session_token=session_token))

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,9 +512,16 @@ definitions:
512512
page_size:
513513
title: Page Size
514514
description: The number of records to include in each pages.
515-
type: integer
515+
anyOf:
516+
- type: integer
517+
title: Number of Records
518+
- type: string
519+
title: Interpolated Value
520+
interpolation_context:
521+
- config
516522
examples:
517523
- 100
524+
- "{{ config['page_size'] }}"
518525
stop_condition:
519526
title: Stop Condition
520527
description: Template string evaluating when to stop paginating.
@@ -2067,6 +2074,18 @@ definitions:
20672074
field_name: Authorization
20682075
- inject_into: request_parameter
20692076
field_name: authKey
2077+
api_token:
2078+
title: API Token Template
2079+
description: 'A template for the token value to inject. Use {{ session_token }} to reference the session token. For example, use "Token {{ session_token }}" for APIs that expect "Authorization: Token <token>".'
2080+
type: string
2081+
default: "{{ session_token }}"
2082+
interpolation_context:
2083+
- config
2084+
- session_token
2085+
examples:
2086+
- "{{ session_token }}"
2087+
- "Token {{ session_token }}"
2088+
- "Bearer {{ session_token }}"
20702089
SessionTokenRequestBearerAuthenticator:
20712090
title: Bearer Authenticator
20722091
description: Authenticator for requests using the session token as a standard bearer token.
@@ -2312,13 +2331,15 @@ definitions:
23122331
- IGNORE
23132332
- RESET_PAGINATION
23142333
- RATE_LIMITED
2334+
- REFRESH_TOKEN_THEN_RETRY
23152335
examples:
23162336
- SUCCESS
23172337
- FAIL
23182338
- RETRY
23192339
- IGNORE
23202340
- RESET_PAGINATION
23212341
- RATE_LIMITED
2342+
- REFRESH_TOKEN_THEN_RETRY
23222343
failure_type:
23232344
title: Failure Type
23242345
description: Failure type of traced exception if a response matches the filter.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,10 @@ class CursorPagination(BaseModel):
114114
],
115115
title="Cursor Value",
116116
)
117-
page_size: Optional[int] = Field(
117+
page_size: Optional[Union[int, str]] = Field(
118118
None,
119119
description="The number of records to include in each pages.",
120-
examples=[100],
120+
examples=[100, "{{ config['page_size'] }}"],
121121
title="Page Size",
122122
)
123123
stop_condition: Optional[str] = Field(
@@ -543,6 +543,7 @@ class Action(Enum):
543543
IGNORE = "IGNORE"
544544
RESET_PAGINATION = "RESET_PAGINATION"
545545
RATE_LIMITED = "RATE_LIMITED"
546+
REFRESH_TOKEN_THEN_RETRY = "REFRESH_TOKEN_THEN_RETRY"
546547

547548

548549
class FailureType(Enum):
@@ -563,6 +564,7 @@ class HttpResponseFilter(BaseModel):
563564
"IGNORE",
564565
"RESET_PAGINATION",
565566
"RATE_LIMITED",
567+
"REFRESH_TOKEN_THEN_RETRY",
566568
],
567569
title="Action",
568570
)
@@ -2057,6 +2059,16 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel):
20572059
],
20582060
title="Inject API Key Into Outgoing HTTP Request",
20592061
)
2062+
api_token: Optional[str] = Field(
2063+
"{{ session_token }}",
2064+
description='A template for the token value to inject. Use {{ session_token }} to reference the session token. For example, use "Token {{ session_token }}" for APIs that expect "Authorization: Token <token>".',
2065+
examples=[
2066+
"{{ session_token }}",
2067+
"Token {{ session_token }}",
2068+
"Bearer {{ session_token }}",
2069+
],
2070+
title="API Token Template",
2071+
)
20602072

20612073

20622074
class JsonSchemaPropertySelector(BaseModel):
@@ -2741,7 +2753,7 @@ class HttpRequester(BaseModelWithDeprecations):
27412753
)
27422754
use_cache: Optional[bool] = Field(
27432755
False,
2744-
description="Enables stream requests caching. This field is automatically set by the CDK.",
2756+
description="Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled. Only set this to false if you are certain that caching should be disabled, as it may negatively impact performance when the same data is needed multiple times (e.g., for scroll-based pagination APIs where caching causes duplicate records).",
27452757
title="Use Cache",
27462758
)
27472759
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
LegacySessionTokenAuthenticator,
6868
)
6969
from airbyte_cdk.sources.declarative.auth.token_provider import (
70+
InterpolatedSessionTokenProvider,
7071
InterpolatedStringTokenProvider,
7172
SessionTokenProvider,
7273
TokenProvider,
@@ -1169,14 +1170,24 @@ def create_session_token_authenticator(
11691170
token_provider=token_provider,
11701171
)
11711172
else:
1173+
# Get the api_token template if specified, default to just the session token
1174+
api_token_template = (
1175+
getattr(model.request_authentication, "api_token", None) or "{{ session_token }}"
1176+
)
1177+
final_token_provider: TokenProvider = InterpolatedSessionTokenProvider(
1178+
config=config,
1179+
api_token=api_token_template,
1180+
session_token_provider=token_provider,
1181+
parameters=model.parameters or {},
1182+
)
11721183
return self.create_api_key_authenticator(
11731184
ApiKeyAuthenticatorModel(
11741185
type="ApiKeyAuthenticator",
11751186
api_token="",
11761187
inject_into=model.request_authentication.inject_into,
11771188
), # type: ignore # $parameters and headers default to None
11781189
config=config,
1179-
token_provider=token_provider,
1190+
token_provider=final_token_provider,
11801191
)
11811192

11821193
@staticmethod
@@ -1554,14 +1565,18 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
15541565
f"Expected {model_type.__name__} component, but received {incrementing_count_cursor_model.__class__.__name__}"
15551566
)
15561567

1557-
interpolated_start_value = (
1558-
InterpolatedString.create(
1559-
incrementing_count_cursor_model.start_value, # type: ignore
1568+
start_value: Union[int, str, None] = incrementing_count_cursor_model.start_value
1569+
# Pydantic Union type coercion can convert int 0 to string '0' depending on Union order.
1570+
# We need to handle both int and str representations of numeric values.
1571+
# Evaluate the InterpolatedString and convert to int for the ConcurrentCursor.
1572+
if start_value is not None:
1573+
interpolated_start_value = InterpolatedString.create(
1574+
str(start_value), # Ensure we pass a string to InterpolatedString.create
15601575
parameters=incrementing_count_cursor_model.parameters or {},
15611576
)
1562-
if incrementing_count_cursor_model.start_value
1563-
else 0
1564-
)
1577+
evaluated_start_value: int = int(interpolated_start_value.eval(config=config))
1578+
else:
1579+
evaluated_start_value = 0
15651580

15661581
cursor_field = self._get_catalog_defined_cursor_field(
15671582
stream_name=stream_name,
@@ -1593,7 +1608,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
15931608
connector_state_converter=connector_state_converter,
15941609
cursor_field=cursor_field,
15951610
slice_boundary_fields=None,
1596-
start=interpolated_start_value, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
1611+
start=evaluated_start_value, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
15971612
end_provider=connector_state_converter.get_end_provider(), # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
15981613
)
15991614

@@ -1745,10 +1760,16 @@ def create_cursor_pagination(
17451760
self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder))
17461761
)
17471762

1763+
# Pydantic v1 Union type coercion can convert int to string depending on Union order.
1764+
# If page_size is a string that represents an integer (not an interpolation), convert it back.
1765+
page_size = model.page_size
1766+
if isinstance(page_size, str) and page_size.isdigit():
1767+
page_size = int(page_size)
1768+
17481769
return CursorPaginationStrategy(
17491770
cursor_value=model.cursor_value,
17501771
decoder=decoder_to_use,
1751-
page_size=model.page_size,
1772+
page_size=page_size,
17521773
stop_condition=model.stop_condition,
17531774
config=config,
17541775
parameters=model.parameters or {},
@@ -2917,8 +2938,14 @@ def create_offset_increment(
29172938
else None
29182939
)
29192940

2941+
# Pydantic v1 Union type coercion can convert int to string depending on Union order.
2942+
# If page_size is a string that represents an integer (not an interpolation), convert it back.
2943+
page_size = model.page_size
2944+
if isinstance(page_size, str) and page_size.isdigit():
2945+
page_size = int(page_size)
2946+
29202947
return OffsetIncrement(
2921-
page_size=model.page_size,
2948+
page_size=page_size,
29222949
config=config,
29232950
decoder=decoder_to_use,
29242951
extractor=extractor,
@@ -2930,8 +2957,14 @@ def create_offset_increment(
29302957
def create_page_increment(
29312958
model: PageIncrementModel, config: Config, **kwargs: Any
29322959
) -> PageIncrement:
2960+
# Pydantic v1 Union type coercion can convert int to string depending on Union order.
2961+
# If page_size is a string that represents an integer (not an interpolation), convert it back.
2962+
page_size = model.page_size
2963+
if isinstance(page_size, str) and page_size.isdigit():
2964+
page_size = int(page_size)
2965+
29332966
return PageIncrement(
2934-
page_size=model.page_size,
2967+
page_size=page_size,
29352968
config=config,
29362969
start_from_page=model.start_from_page or 0,
29372970
inject_on_first_request=model.inject_on_first_request or False,

airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
@dataclass
2424
class CursorPaginationStrategy(PaginationStrategy):
2525
"""
26-
Pagination strategy that evaluates an interpolated string to define the next page token
26+
Pagination strategy that evaluates an interpolated string to define the next page token.
2727
2828
Attributes:
29-
page_size (Optional[int]): the number of records to request
29+
page_size (Optional[Union[str, int]]): the number of records to request
3030
cursor_value (Union[InterpolatedString, str]): template string evaluating to the cursor value
3131
config (Config): connection config
3232
stop_condition (Optional[InterpolatedBoolean]): template string evaluating when to stop paginating
@@ -36,7 +36,7 @@ class CursorPaginationStrategy(PaginationStrategy):
3636
cursor_value: Union[InterpolatedString, str]
3737
config: Config
3838
parameters: InitVar[Mapping[str, Any]]
39-
page_size: Optional[int] = None
39+
page_size: Optional[Union[str, int]] = None
4040
stop_condition: Optional[Union[InterpolatedBoolean, str]] = None
4141
decoder: Decoder = field(
4242
default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
@@ -54,6 +54,14 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
5454
else:
5555
self._stop_condition = self.stop_condition
5656

57+
if isinstance(self.page_size, int) or (self.page_size is None):
58+
self._page_size = self.page_size
59+
else:
60+
page_size = InterpolatedString(self.page_size, parameters=parameters).eval(self.config)
61+
if not isinstance(page_size, int):
62+
raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
63+
self._page_size = page_size
64+
5765
@property
5866
def initial_token(self) -> Optional[Any]:
5967
"""
@@ -95,4 +103,4 @@ def next_page_token(
95103
return token if token else None
96104

97105
def get_page_size(self) -> Optional[int]:
98-
return self.page_size
106+
return self._page_size

airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@
77
from typing import Any, Dict, List, Literal, Optional, Union
88

99
import dpath
10-
from pydantic.v1 import AnyUrl, BaseModel, Field
10+
from pydantic.v1 import AnyUrl, BaseModel, Field, validator
1111

1212
from airbyte_cdk import OneOfOptionConfig
1313
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
1414
from airbyte_cdk.sources.specs.transfer_modes import DeliverPermissions
1515
from airbyte_cdk.sources.utils import schema_helpers
16+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_try_parse
1617

1718

1819
class DeliverRecords(BaseModel):
@@ -53,13 +54,39 @@ class AbstractFileBasedSpec(BaseModel):
5354
start_date: Optional[str] = Field(
5455
title="Start Date",
5556
description="UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.",
56-
examples=["2021-01-01T00:00:00.000000Z"],
57+
examples=[
58+
"2021-01-01",
59+
"2021-01-01T00:00:00Z",
60+
"2021-01-01T00:00:00.000Z",
61+
"2021-01-01T00:00:00.000000Z",
62+
],
5763
format="date-time",
58-
pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$",
59-
pattern_descriptor="YYYY-MM-DDTHH:mm:ss.SSSSSSZ",
64+
pattern=r"^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|[+-][0-9]{2}:[0-9]{2})?)?$",
65+
pattern_descriptor="YYYY-MM-DD, YYYY-MM-DDTHH:mm:ssZ, or YYYY-MM-DDTHH:mm:ss.SSSSSSZ",
6066
order=1,
6167
)
6268

69+
@validator("start_date", pre=True)
70+
def validate_start_date(
71+
cls, # noqa: N805 # Pydantic validators use cls, not self
72+
v: Optional[str],
73+
) -> Optional[str]:
74+
"""Validate that start_date is a parseable datetime string.
75+
76+
Uses ab_datetime_try_parse which accepts any common ISO8601/RFC3339 format,
77+
including formats with or without microseconds (e.g., both
78+
'2021-01-01T00:00:00Z' and '2021-01-01T00:00:00.000000Z' are valid).
79+
"""
80+
if v is None:
81+
return v
82+
parsed = ab_datetime_try_parse(v)
83+
if parsed is None:
84+
raise ValueError(
85+
f"'{v}' is not a valid datetime string. "
86+
"Please use a format like '2021-01-01T00:00:00Z' or '2021-01-01T00:00:00.000000Z'."
87+
)
88+
return v
89+
6390
streams: List[FileBasedStreamConfig] = Field(
6491
title="The list of streams to sync",
6592
description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',

airbyte_cdk/sources/streams/http/error_handlers/response_models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class ResponseAction(Enum):
1818
IGNORE = "IGNORE"
1919
RESET_PAGINATION = "RESET_PAGINATION"
2020
RATE_LIMITED = "RATE_LIMITED"
21+
REFRESH_TOKEN_THEN_RETRY = "REFRESH_TOKEN_THEN_RETRY"
2122

2223

2324
@dataclass

0 commit comments

Comments
 (0)