Skip to content

Commit cf01a75

Browse files
feat(cdk): Add cursor age validation to StateDelegatingStream
This adds an optional api_retention_period field to StateDelegatingStream that validates whether a cursor is within an API's data retention window before switching from full refresh to incremental sync. When the cursor value is older than the retention period, the connector automatically falls back to a full refresh to avoid data loss. This is useful for APIs like Stripe Events API which only retain data for 30 days. Key changes: - Add api_retention_period field to StateDelegatingStream schema (ISO8601 duration) - Implement cursor age validation in model_to_component_factory.py - Emit warning log when falling back to full refresh due to stale cursor - Add unit tests for cursor age validation Fixes: airbytehq/oncall#11103 Co-Authored-By: unknown <>
1 parent d3c9419 commit cf01a75

4 files changed

Lines changed: 325 additions & 64 deletions

File tree

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3752,6 +3752,22 @@ definitions:
37523752
title: Incremental Stream
37533753
description: Component used to coordinate how records are extracted across stream slices and request pages when the state provided.
37543754
"$ref": "#/definitions/DeclarativeStream"
3755+
api_retention_period:
3756+
title: API Retention Period
3757+
description: |
3758+
The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.
3759+
This is useful for APIs like Stripe Events API which only retain data for 30 days.
3760+
* **PT1H**: 1 hour
3761+
* **P1D**: 1 day
3762+
* **P1W**: 1 week
3763+
* **P1M**: 1 month
3764+
* **P1Y**: 1 year
3765+
* **P30D**: 30 days
3766+
type: string
3767+
examples:
3768+
- "P30D"
3769+
- "P90D"
3770+
- "P1Y"
37553771
$parameters:
37563772
type: object
37573773
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 92 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2-
31
# generated by datamodel-codegen:
42
# filename: declarative_component_schema.yaml
53

@@ -930,24 +928,28 @@ class OAuthConfigSpecification(BaseModel):
930928
class Config:
931929
extra = Extra.allow
932930

933-
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
934-
None,
935-
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
936-
examples=[
937-
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
938-
{
939-
"app_id": {
940-
"type": "string",
941-
"path_in_connector_config": ["info", "app_id"],
942-
}
943-
},
944-
],
945-
title="OAuth user input",
931+
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
932+
Field(
933+
None,
934+
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
935+
examples=[
936+
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
937+
{
938+
"app_id": {
939+
"type": "string",
940+
"path_in_connector_config": ["info", "app_id"],
941+
}
942+
},
943+
],
944+
title="OAuth user input",
945+
)
946946
)
947-
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
948-
None,
949-
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
950-
title="DeclarativeOAuth Connector Specification",
947+
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
948+
Field(
949+
None,
950+
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
951+
title="DeclarativeOAuth Connector Specification",
952+
)
951953
)
952954
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
953955
None,
@@ -965,7 +967,9 @@ class Config:
965967
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
966968
None,
967969
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
968-
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
970+
examples=[
971+
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
972+
],
969973
title="OAuth input specification",
970974
)
971975
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1469,7 +1473,9 @@ class CustomConfigTransformation(BaseModel):
14691473
class_name: str = Field(
14701474
...,
14711475
description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_<name>.<package>.<class_name>`.",
1472-
examples=["source_declarative_manifest.components.MyCustomConfigTransformation"],
1476+
examples=[
1477+
"source_declarative_manifest.components.MyCustomConfigTransformation"
1478+
],
14731479
)
14741480
parameters: Optional[Dict[str, Any]] = Field(
14751481
None,
@@ -1897,7 +1903,9 @@ class OAuthAuthenticator(BaseModel):
18971903
scopes: Optional[List[str]] = Field(
18981904
None,
18991905
description="List of scopes that should be granted to the access token.",
1900-
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
1906+
examples=[
1907+
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
1908+
],
19011909
title="Scopes",
19021910
)
19031911
token_expiry_date: Optional[str] = Field(
@@ -2124,7 +2132,9 @@ class RecordSelector(BaseModel):
21242132
description="Responsible for filtering records to be emitted by the Source.",
21252133
title="Record Filter",
21262134
)
2127-
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
2135+
schema_normalization: Optional[
2136+
Union[SchemaNormalization, CustomSchemaNormalization]
2137+
] = Field(
21282138
None,
21292139
description="Responsible for normalization according to the schema.",
21302140
title="Schema Normalization",
@@ -2166,10 +2176,12 @@ class DpathValidator(BaseModel):
21662176
],
21672177
title="Field Path",
21682178
)
2169-
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field(
2170-
...,
2171-
description="The condition that the specified config value will be evaluated against",
2172-
title="Validation Strategy",
2179+
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = (
2180+
Field(
2181+
...,
2182+
description="The condition that the specified config value will be evaluated against",
2183+
title="Validation Strategy",
2184+
)
21732185
)
21742186

21752187

@@ -2186,10 +2198,12 @@ class PredicateValidator(BaseModel):
21862198
],
21872199
title="Value",
21882200
)
2189-
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field(
2190-
...,
2191-
description="The validation strategy to apply to the value.",
2192-
title="Validation Strategy",
2201+
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = (
2202+
Field(
2203+
...,
2204+
description="The validation strategy to apply to the value.",
2205+
title="Validation Strategy",
2206+
)
21932207
)
21942208

21952209

@@ -2214,12 +2228,12 @@ class ConfigAddFields(BaseModel):
22142228

22152229
class CompositeErrorHandler(BaseModel):
22162230
type: Literal["CompositeErrorHandler"]
2217-
error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = (
2218-
Field(
2219-
...,
2220-
description="List of error handlers to iterate on to determine how to handle a failed response.",
2221-
title="Error Handlers",
2222-
)
2231+
error_handlers: List[
2232+
Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]
2233+
] = Field(
2234+
...,
2235+
description="List of error handlers to iterate on to determine how to handle a failed response.",
2236+
title="Error Handlers",
22232237
)
22242238
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
22252239

@@ -2381,9 +2395,9 @@ class Config:
23812395

23822396
type: Literal["DeclarativeSource"]
23832397
check: Union[CheckStream, CheckDynamicStream]
2384-
streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = (
2385-
None
2386-
)
2398+
streams: Optional[
2399+
List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]
2400+
] = None
23872401
dynamic_streams: List[DynamicDeclarativeStream]
23882402
version: str = Field(
23892403
...,
@@ -2508,16 +2522,20 @@ class Config:
25082522
extra = Extra.allow
25092523

25102524
type: Literal["DeclarativeStream"]
2511-
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
2525+
name: Optional[str] = Field(
2526+
"", description="The stream name.", example=["Users"], title="Name"
2527+
)
25122528
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
25132529
...,
25142530
description="Component used to coordinate how records are extracted across stream slices and request pages.",
25152531
title="Retriever",
25162532
)
2517-
incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field(
2518-
None,
2519-
description="Component used to fetch data incrementally based on a time field in the data.",
2520-
title="Incremental Sync",
2533+
incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = (
2534+
Field(
2535+
None,
2536+
description="Component used to fetch data incrementally based on a time field in the data.",
2537+
title="Incremental Sync",
2538+
)
25212539
)
25222540
primary_key: Optional[PrimaryKey] = Field("", title="Primary Key")
25232541
schema_loader: Optional[
@@ -2691,18 +2709,20 @@ class HttpRequester(BaseModelWithDeprecations):
26912709
description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.",
26922710
title="Query Properties",
26932711
)
2694-
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field(
2695-
None,
2696-
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
2697-
examples=[
2698-
{"unit": "day"},
2699-
{
2700-
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
2701-
},
2702-
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
2703-
{"sort_by[asc]": "updated_at"},
2704-
],
2705-
title="Query Parameters",
2712+
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = (
2713+
Field(
2714+
None,
2715+
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
2716+
examples=[
2717+
{"unit": "day"},
2718+
{
2719+
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
2720+
},
2721+
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
2722+
{"sort_by[asc]": "updated_at"},
2723+
],
2724+
title="Query Parameters",
2725+
)
27062726
)
27072727
request_headers: Optional[Union[Dict[str, str], str]] = Field(
27082728
None,
@@ -2874,7 +2894,9 @@ class QueryProperties(BaseModel):
28742894

28752895
class StateDelegatingStream(BaseModel):
28762896
type: Literal["StateDelegatingStream"]
2877-
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
2897+
name: str = Field(
2898+
..., description="The stream name.", example=["Users"], title="Name"
2899+
)
28782900
full_refresh_stream: DeclarativeStream = Field(
28792901
...,
28802902
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
@@ -2885,6 +2907,12 @@ class StateDelegatingStream(BaseModel):
28852907
description="Component used to coordinate how records are extracted across stream slices and request pages when the state provided.",
28862908
title="Incremental Stream",
28872909
)
2910+
api_retention_period: Optional[str] = Field(
2911+
None,
2912+
description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n * **PT1H**: 1 hour\n * **P1D**: 1 day\n * **P1W**: 1 week\n * **P1M**: 1 month\n * **P1Y**: 1 year\n * **P30D**: 30 days\n",
2913+
examples=["P30D", "P90D", "P1Y"],
2914+
title="API Retention Period",
2915+
)
28882916
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
28892917

28902918

@@ -2961,13 +2989,17 @@ class AsyncRetriever(BaseModel):
29612989
status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
29622990
..., description="Responsible for fetching the actual status of the async job."
29632991
)
2964-
download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field(
2992+
download_target_extractor: Optional[
2993+
Union[DpathExtractor, CustomRecordExtractor]
2994+
] = Field(
29652995
None,
29662996
description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
29672997
)
29682998
download_extractor: Optional[
29692999
Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor]
2970-
] = Field(None, description="Responsible for fetching the records from provided urls.")
3000+
] = Field(
3001+
None, description="Responsible for fetching the records from provided urls."
3002+
)
29713003
creation_requester: Union[HttpRequester, CustomRequester] = Field(
29723004
...,
29733005
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",

0 commit comments

Comments
 (0)