Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
cf01a75
feat(cdk): Add cursor age validation to StateDelegatingStream
devin-ai-integration[bot] Feb 2, 2026
67bc5c8
chore: re-trigger CI
devin-ai-integration[bot] Feb 2, 2026
45772f4
Merge branch 'main' into devin/1770066385-state-delegating-stream-cur…
agarctfi Feb 3, 2026
1edeedd
Auto-fix lint and format issues
Feb 3, 2026
61d8d5d
Potential fix for pull request finding 'Unused import'
agarctfi Feb 3, 2026
21da112
Potential fix for pull request finding 'Unused import'
agarctfi Feb 3, 2026
0e33418
fix: Address Copilot review comments
devin-ai-integration[bot] Feb 3, 2026
324344f
fix: Correct ruff format for assert statement
devin-ai-integration[bot] Feb 3, 2026
da8a5a5
fix: Convert cursor_value to str for type safety
devin-ai-integration[bot] Feb 3, 2026
37e046e
fix: Format long line for ruff compliance
devin-ai-integration[bot] Feb 3, 2026
dceb70d
Potential fix for pull request finding 'Unused import'
agarctfi Feb 3, 2026
c14f963
refactor: Move incremental_sync check to _get_state_delegating_stream…
devin-ai-integration[bot] Feb 3, 2026
86d5ea6
fix: Return True (full refresh) when cursor is invalid/unparseable
devin-ai-integration[bot] Feb 3, 2026
567ca7a
fix: Parse cursor from both full_refresh_stream and incremental_stream
devin-ai-integration[bot] Feb 3, 2026
be72c5c
feat: Add support for per-partition state and IncrementingCountCursor…
devin-ai-integration[bot] Feb 4, 2026
2b54cc5
feat: Add get_cursor_datetime_from_state method to cursor classes
devin-ai-integration[bot] Feb 5, 2026
f199583
feat: Add get_cursor_datetime_from_state to concurrent cursor classes
devin-ai-integration[bot] Feb 9, 2026
fbda39f
fix: Fix MyPy type errors in ConcurrentCursor.get_cursor_datetime_fro…
devin-ai-integration[bot] Feb 9, 2026
a2d4b56
refactor: Wire factory to use cursor class get_cursor_datetime_from_s…
devin-ai-integration[bot] Feb 18, 2026
1defe9e
fix: Fix ruff format and mypy errors in model_to_component_factory
devin-ai-integration[bot] Feb 18, 2026
a017dff
fix: Skip retention check for concurrent state format
devin-ai-integration[bot] Feb 18, 2026
d3e76d4
fix: Skip retention check for IncrementingCountCursor instead of rais…
devin-ai-integration[bot] Feb 18, 2026
d31c26b
fix: Return False (skip) when no datetime-based cursors found for ret…
devin-ai-integration[bot] Feb 18, 2026
653022b
fix: Remove unused pytest import
devin-ai-integration[bot] Feb 18, 2026
43dc47e
fix: Raise ValueError for unparseable cursor datetime when api_retent…
devin-ai-integration[bot] Feb 18, 2026
1531b39
refactor: Use stream cursor for retention period check, remove legacy…
devin-ai-integration[bot] Feb 18, 2026
b4c24c6
fix: Try both full_refresh and incremental cursors for state parsing
devin-ai-integration[bot] Feb 18, 2026
67f9e60
fix: Remove per-partition state fallback, let cursor classes handle s…
devin-ai-integration[bot] Feb 18, 2026
8608b5f
fix: Re-add _get_state_delegating_stream_model and fix ruff format
devin-ai-integration[bot] Feb 18, 2026
8faa0ae
Revert "fix: Re-add _get_state_delegating_stream_model and fix ruff f…
devin-ai-integration[bot] Feb 18, 2026
ea7a757
fix: ruff format long lines in create_state_delegating_stream
devin-ai-integration[bot] Feb 18, 2026
714c667
fix: Restore _get_state_delegating_stream_model and fix MyPy errors
devin-ai-integration[bot] Feb 18, 2026
16a895e
fix: Handle FinalStateCursor gracefully and detect final-state for re…
devin-ai-integration[bot] Feb 19, 2026
bddc671
refactor: Move FinalStateCursor handling to cursor classes, replace h…
devin-ai-integration[bot] Feb 19, 2026
8828eea
refactor: Clean NO_CURSOR_STATE_KEY from ConcurrentCursor, add tests …
devin-ai-integration[bot] Feb 19, 2026
6b65b7a
style: Fix ruff format issues in factory and test files
devin-ai-integration[bot] Feb 19, 2026
17f857a
fix: Raise error for incompatible cursor types with api_retention_period
devin-ai-integration[bot] Feb 19, 2026
1163395
refactor: Simplify cursor age validation per brianjlai's review
devin-ai-integration[bot] Feb 19, 2026
acd7156
fix: Use Cursor type instead of Any for cursor parameter
devin-ai-integration[bot] Feb 19, 2026
8afe8e1
fix: Clear state when falling back to full refresh due to stale cursor
devin-ai-integration[bot] Feb 20, 2026
2a4f385
style: Fix ruff format issues in state clearing code
devin-ai-integration[bot] Feb 20, 2026
e4f71ff
fix: Implement tolik0's FinalStateCursor feedback with NO_CURSOR_STAT…
devin-ai-integration[bot] Feb 23, 2026
9340d3c
fix: Update FinalStateCursor test to match new behavior per tolik0's …
devin-ai-integration[bot] Feb 23, 2026
e021f58
style: Fix ruff format issues in test file
devin-ai-integration[bot] Feb 23, 2026
1dcc8ab
refactor: Remove early return for NO_CURSOR_STATE_KEY per tolik0's re…
devin-ai-integration[bot] Feb 23, 2026
6d95923
fix: Remove unused NO_CURSOR_STATE_KEY import
devin-ai-integration[bot] Feb 23, 2026
a3a2073
fix: Update FinalStateCursor test to match actual ConcurrentCursor be…
devin-ai-integration[bot] Feb 23, 2026
020d2f5
fix: Skip state emission for streams not in configured catalog
devin-ai-integration[bot] Feb 25, 2026
21bb2a9
refactor: Move catalog check to skip entire retention validation for …
devin-ai-integration[bot] Feb 25, 2026
2a2459d
style: Fix ruff format issue in create_state_delegating_stream
devin-ai-integration[bot] Feb 25, 2026
39a2aee
fix: Clear state BEFORE constructing full_refresh_stream in stale cur…
devin-ai-integration[bot] Mar 2, 2026
9021315
Revert "fix: Clear state BEFORE constructing full_refresh_stream in s…
agarctfi Mar 2, 2026
5c6c88b
fix: cursor age validation to clear state before constructing full re…
agarctfi Mar 2, 2026
99119a9
Feat: Add interpolation support for API Retention Period
agarctfi Mar 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3752,6 +3752,22 @@ definitions:
title: Incremental Stream
description: Component used to coordinate how records are extracted across stream slices and request pages when the state provided.
"$ref": "#/definitions/DeclarativeStream"
api_retention_period:
title: API Retention Period
description: |
The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.
This is useful for APIs like Stripe Events API which only retain data for 30 days.
* **PT1H**: 1 hour
* **P1D**: 1 day
* **P1W**: 1 week
* **P1M**: 1 month
* **P1Y**: 1 year
* **P30D**: 30 days
Comment thread
agarctfi marked this conversation as resolved.
Outdated
type: string
examples:
- "P30D"
- "P90D"
- "P1Y"
$parameters:
type: object
additionalProperties: true
Expand Down
152 changes: 92 additions & 60 deletions airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -930,24 +928,28 @@ class OAuthConfigSpecification(BaseModel):
class Config:
extra = Extra.allow

oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
)
)
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
)
)
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
None,
Expand All @@ -965,7 +967,9 @@ class Config:
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
examples=[
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
],
title="OAuth input specification",
)
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
Expand Down Expand Up @@ -1469,7 +1473,9 @@ class CustomConfigTransformation(BaseModel):
class_name: str = Field(
...,
description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_<name>.<package>.<class_name>`.",
examples=["source_declarative_manifest.components.MyCustomConfigTransformation"],
examples=[
"source_declarative_manifest.components.MyCustomConfigTransformation"
],
)
parameters: Optional[Dict[str, Any]] = Field(
None,
Expand Down Expand Up @@ -1897,7 +1903,9 @@ class OAuthAuthenticator(BaseModel):
scopes: Optional[List[str]] = Field(
None,
description="List of scopes that should be granted to the access token.",
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
examples=[
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
],
title="Scopes",
)
token_expiry_date: Optional[str] = Field(
Expand Down Expand Up @@ -2124,7 +2132,9 @@ class RecordSelector(BaseModel):
description="Responsible for filtering records to be emitted by the Source.",
title="Record Filter",
)
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
schema_normalization: Optional[
Union[SchemaNormalization, CustomSchemaNormalization]
] = Field(
None,
description="Responsible for normalization according to the schema.",
title="Schema Normalization",
Expand Down Expand Up @@ -2166,10 +2176,12 @@ class DpathValidator(BaseModel):
],
title="Field Path",
)
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field(
...,
description="The condition that the specified config value will be evaluated against",
title="Validation Strategy",
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = (
Field(
...,
description="The condition that the specified config value will be evaluated against",
title="Validation Strategy",
)
)


Expand All @@ -2186,10 +2198,12 @@ class PredicateValidator(BaseModel):
],
title="Value",
)
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field(
...,
description="The validation strategy to apply to the value.",
title="Validation Strategy",
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = (
Field(
...,
description="The validation strategy to apply to the value.",
title="Validation Strategy",
)
)


Expand All @@ -2214,12 +2228,12 @@ class ConfigAddFields(BaseModel):

class CompositeErrorHandler(BaseModel):
type: Literal["CompositeErrorHandler"]
error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = (
Field(
...,
description="List of error handlers to iterate on to determine how to handle a failed response.",
title="Error Handlers",
)
error_handlers: List[
Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]
] = Field(
...,
description="List of error handlers to iterate on to determine how to handle a failed response.",
title="Error Handlers",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

Expand Down Expand Up @@ -2381,9 +2395,9 @@ class Config:

type: Literal["DeclarativeSource"]
check: Union[CheckStream, CheckDynamicStream]
streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = (
None
)
streams: Optional[
List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]
] = None
dynamic_streams: List[DynamicDeclarativeStream]
version: str = Field(
...,
Expand Down Expand Up @@ -2508,16 +2522,20 @@ class Config:
extra = Extra.allow

type: Literal["DeclarativeStream"]
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
name: Optional[str] = Field(
"", description="The stream name.", example=["Users"], title="Name"
)
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field(
None,
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = (
Field(
None,
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
)
)
primary_key: Optional[PrimaryKey] = Field("", title="Primary Key")
schema_loader: Optional[
Expand Down Expand Up @@ -2691,18 +2709,20 @@ class HttpRequester(BaseModelWithDeprecations):
description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.",
title="Query Properties",
)
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field(
None,
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
examples=[
{"unit": "day"},
{
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
},
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
{"sort_by[asc]": "updated_at"},
],
title="Query Parameters",
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = (
Field(
None,
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
examples=[
{"unit": "day"},
{
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
},
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
{"sort_by[asc]": "updated_at"},
],
title="Query Parameters",
)
)
request_headers: Optional[Union[Dict[str, str], str]] = Field(
None,
Expand Down Expand Up @@ -2874,7 +2894,9 @@ class QueryProperties(BaseModel):

class StateDelegatingStream(BaseModel):
type: Literal["StateDelegatingStream"]
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
name: str = Field(
..., description="The stream name.", example=["Users"], title="Name"
)
full_refresh_stream: DeclarativeStream = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
Expand All @@ -2885,6 +2907,12 @@ class StateDelegatingStream(BaseModel):
description="Component used to coordinate how records are extracted across stream slices and request pages when the state provided.",
title="Incremental Stream",
)
api_retention_period: Optional[str] = Field(
None,
description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n * **PT1H**: 1 hour\n * **P1D**: 1 day\n * **P1W**: 1 week\n * **P1M**: 1 month\n * **P1Y**: 1 year\n * **P30D**: 30 days\n",
examples=["P30D", "P90D", "P1Y"],
title="API Retention Period",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down Expand Up @@ -2961,13 +2989,17 @@ class AsyncRetriever(BaseModel):
status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
..., description="Responsible for fetching the actual status of the async job."
)
download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field(
download_target_extractor: Optional[
Union[DpathExtractor, CustomRecordExtractor]
] = Field(
None,
description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
)
download_extractor: Optional[
Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor]
] = Field(None, description="Responsible for fetching the records from provided urls.")
] = Field(
None, description="Responsible for fetching the records from provided urls."
)
creation_requester: Union[HttpRequester, CustomRequester] = Field(
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
Expand Down
Loading
Loading