Skip to content

Commit 0e57414

Browse files
devin-ai-integration[bot], agarctfi, octavia-squidington-iii, github-code-quality[bot], tolik0
authored
feat(cdk): Add cursor age validation to StateDelegatingStream (#890)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Alfredo Garcia <alfredo.garcia@hallmark.edu> Co-authored-by: octavia-squidington-iii <contact@airbyte.com> Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> Co-authored-by: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io> Co-authored-by: alfredo.garcia@airbyte.io
1 parent fd21b86 commit 0e57414

File tree

7 files changed

+1078
-27
lines changed

7 files changed

+1078
-27
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3801,6 +3801,22 @@ definitions:
38013801
title: Incremental Stream
38023802
description: Component used to coordinate how records are extracted across stream slices and request pages when the state provided.
38033803
"$ref": "#/definitions/DeclarativeStream"
3804+
api_retention_period:
3805+
title: API Retention Period
3806+
description: |
3807+
The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.
3808+
This is useful for APIs like Stripe Events API which only retain data for 30 days.
3809+
* **PT1H**: 1 hour
3810+
* **P1D**: 1 day
3811+
* **P1W**: 1 week
3812+
* **P1M**: 1 month
3813+
* **P1Y**: 1 year
3814+
* **P30D**: 30 days
3815+
type: string
3816+
examples:
3817+
- "P30D"
3818+
- "P90D"
3819+
- "P1Y"
38043820
$parameters:
38053821
type: object
38063822
additionalProperties: true

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
#
44

55
import copy
6+
import datetime
67
import logging
78
import threading
89
import time
@@ -658,3 +659,21 @@ def get_global_state(
658659
if stream_state and "state" in stream_state
659660
else None
660661
)
662+
663+
def get_cursor_datetime_from_state(
664+
self, stream_state: Mapping[str, Any]
665+
) -> datetime.datetime | None:
666+
"""Extract and parse the cursor datetime from the global cursor in per-partition state.
667+
668+
For per-partition cursors, the global cursor is stored under the "state" key.
669+
This method delegates to the underlying cursor factory to parse the datetime.
670+
671+
Returns None if the global cursor is not present or cannot be parsed.
672+
"""
673+
global_state = stream_state.get(self._GLOBAL_STATE_KEY)
674+
if not global_state or not isinstance(global_state, dict):
675+
return None
676+
677+
# Create a cursor to delegate the parsing
678+
cursor = self._cursor_factory.create(stream_state={}, runtime_lookback_window=None)
679+
return cursor.get_cursor_datetime_from_state(global_state)

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,3 @@
1-
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2-
31
# generated by datamodel-codegen:
42
# filename: declarative_component_schema.yaml
53

@@ -2922,6 +2920,12 @@ class StateDelegatingStream(BaseModel):
29222920
description="Component used to coordinate how records are extracted across stream slices and request pages when the state provided.",
29232921
title="Incremental Stream",
29242922
)
2923+
api_retention_period: Optional[str] = Field(
2924+
None,
2925+
description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n * **PT1H**: 1 hour\n * **P1D**: 1 day\n * **P1W**: 1 week\n * **P1M**: 1 month\n * **P1Y**: 1 year\n * **P30D**: 30 days\n",
2926+
examples=["P30D", "P90D", "P1Y"],
2927+
title="API Retention Period",
2928+
)
29252929
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
29262930

29272931

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 124 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@
1111
import re
1212
from functools import partial
1313
from typing import (
14+
TYPE_CHECKING,
1415
Any,
1516
Callable,
1617
Dict,
@@ -27,6 +28,11 @@
2728
get_type_hints,
2829
)
2930

31+
if TYPE_CHECKING:
32+
from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import (
33+
DatetimeBasedCursor,
34+
)
35+
3036
from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream
3137
from isodate import parse_duration
3238
from pydantic.v1 import BaseModel
@@ -3548,7 +3554,6 @@ def create_state_delegating_stream(
35483554
self,
35493555
model: StateDelegatingStreamModel,
35503556
config: Config,
3551-
has_parent_state: Optional[bool] = None,
35523557
**kwargs: Any,
35533558
) -> DefaultStream:
35543559
if (
@@ -3559,18 +3564,119 @@ def create_state_delegating_stream(
35593564
f"state_delegating_stream, full_refresh_stream name and incremental_stream must have equal names. Instead has {model.name}, {model.full_refresh_stream.name} and {model.incremental_stream.name}."
35603565
)
35613566

3562-
stream_model = self._get_state_delegating_stream_model(
3563-
False if has_parent_state is None else has_parent_state, model
3564-
)
3567+
# Resolve api_retention_period with config context (supports Jinja2 interpolation)
3568+
resolved_retention_period: Optional[str] = None
3569+
if model.api_retention_period:
3570+
interpolated_retention = InterpolatedString.create(
3571+
model.api_retention_period, parameters=model.parameters or {}
3572+
)
3573+
resolved_value = interpolated_retention.eval(config=config)
3574+
if resolved_value:
3575+
resolved_retention_period = str(resolved_value)
3576+
3577+
if resolved_retention_period:
3578+
for stream_model in (model.full_refresh_stream, model.incremental_stream):
3579+
if isinstance(stream_model.incremental_sync, IncrementingCountCursorModel):
3580+
raise ValueError(
3581+
f"Stream '{model.name}' uses IncrementingCountCursor which is not supported "
3582+
f"with api_retention_period. IncrementingCountCursor does not use datetime-based "
3583+
f"cursors, so cursor age validation cannot be performed."
3584+
)
3585+
3586+
stream_state = self._connector_state_manager.get_stream_state(model.name, None)
35653587

3566-
return self._create_component_from_model(stream_model, config=config, **kwargs) # type: ignore[no-any-return] # DeclarativeStream will be created as stream_model is alwyas DeclarativeStreamModel
3588+
if not stream_state:
3589+
return self._create_component_from_model( # type: ignore[no-any-return]
3590+
model.full_refresh_stream, config=config, **kwargs
3591+
)
3592+
3593+
incremental_stream: DefaultStream = self._create_component_from_model(
3594+
model.incremental_stream, config=config, **kwargs
3595+
) # type: ignore[assignment]
3596+
3597+
# Only run cursor age validation for streams that are in the configured
3598+
# catalog (or when no catalog was provided, e.g. during discover / connector
3599+
# builder). Streams not selected by the user but instantiated as parent-stream
3600+
# dependencies must not go through this path because it emits state messages
3601+
# that the destination does not know about, causing "Stream not found" crashes.
3602+
stream_is_in_catalog = (
3603+
not self._stream_name_to_configured_stream # no catalog → validate by default
3604+
or model.name in self._stream_name_to_configured_stream
3605+
)
3606+
if resolved_retention_period and stream_is_in_catalog:
3607+
full_refresh_stream: DefaultStream = self._create_component_from_model(
3608+
model.full_refresh_stream, config=config, **kwargs
3609+
) # type: ignore[assignment]
3610+
if self._is_cursor_older_than_retention_period(
3611+
stream_state,
3612+
full_refresh_stream.cursor,
3613+
incremental_stream.cursor,
3614+
resolved_retention_period,
3615+
model.name,
3616+
):
3617+
# Clear state BEFORE constructing the full_refresh_stream so that
3618+
# its cursor starts from start_date instead of the stale cursor.
3619+
self._connector_state_manager.update_state_for_stream(model.name, None, {})
3620+
state_message = self._connector_state_manager.create_state_message(model.name, None)
3621+
self._message_repository.emit_message(state_message)
3622+
return self._create_component_from_model( # type: ignore[no-any-return]
3623+
model.full_refresh_stream, config=config, **kwargs
3624+
)
3625+
3626+
return incremental_stream
3627+
3628+
@staticmethod
def _is_cursor_older_than_retention_period(
    stream_state: Mapping[str, Any],
    full_refresh_cursor: Cursor,
    incremental_cursor: Cursor,
    api_retention_period: str,
    stream_name: str,
) -> bool:
    """Check if the cursor value in the state is older than the API's retention period.

    The full refresh cursor is asked first for a datetime; if it returns None,
    the incremental cursor is asked. Per the original author's note,
    FinalStateCursor returns now() for a completed full refresh state
    (NO_CURSOR_STATE_KEY), which is always within retention, and returns None
    for other states so the incremental cursor decides.

    Args:
        stream_state: The saved state of the stream.
        full_refresh_cursor: Cursor of the full refresh stream, checked first.
        incremental_cursor: Cursor of the incremental stream, checked second.
        api_retention_period: ISO8601 duration string passed to ``parse_duration``.
        stream_name: Stream name, used only in the warning message.

    Returns:
        True if the cursor is older than the retention period, or if neither
        cursor could produce a datetime (should use full refresh).
        False if the cursor is within the retention period (safe to use incremental).

    Raises:
        Any error raised by ``parse_duration`` for ``api_retention_period`` is
        propagated unchanged.
    """
    retention_duration = parse_duration(api_retention_period)
    retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration

    # Check the full refresh cursor first, then fall back to the incremental one.
    cursor_datetime = full_refresh_cursor.get_cursor_datetime_from_state(stream_state)
    if cursor_datetime is None:
        cursor_datetime = incremental_cursor.get_cursor_datetime_from_state(stream_state)

    if cursor_datetime is None:
        # Neither cursor could parse the state - fall back to full refresh to be safe
        return True

    if cursor_datetime.utcoffset() is None:
        # Comparing a naive datetime with the timezone-aware cutoff raises
        # TypeError, so make the value aware first.
        # NOTE(review): assumes naive cursor values represent UTC - confirm.
        cursor_datetime = cursor_datetime.replace(tzinfo=datetime.timezone.utc)

    if cursor_datetime < retention_cutoff:
        logging.warning(
            f"Stream '{stream_name}' has a cursor value older than "
            f"the API's retention period of {api_retention_period} "
            f"(cutoff: {retention_cutoff.isoformat()}). "
            f"Falling back to full refresh to avoid data loss."
        )
        return True

    return False
35673670

35683671
def _get_state_delegating_stream_model(
3569-
self, has_parent_state: bool, model: StateDelegatingStreamModel
3672+
self,
3673+
model: StateDelegatingStreamModel,
3674+
parent_state: Optional[Mapping[str, Any]] = None,
35703675
) -> DeclarativeStreamModel:
3676+
"""Return the appropriate underlying stream model based on state.

Returns ``model.incremental_stream`` when the connector state manager holds
state for ``model.name`` or when ``parent_state`` is truthy; otherwise
returns ``model.full_refresh_stream``.
"""
35713677
return (
35723678
model.incremental_stream
3573-
if self._connector_state_manager.get_stream_state(model.name, None) or has_parent_state
3679+
if self._connector_state_manager.get_stream_state(model.name, None) or parent_state
35743680
else model.full_refresh_stream
35753681
)
35763682

@@ -3901,17 +4007,13 @@ def create_substream_partition_router(
39014007
def create_parent_stream_config_with_substream_wrapper(
39024008
self, model: ParentStreamConfigModel, config: Config, *, stream_name: str, **kwargs: Any
39034009
) -> Any:
3904-
# getting the parent state
39054010
child_state = self._connector_state_manager.get_stream_state(stream_name, None)
39064011

3907-
# This flag will be used exclusively for StateDelegatingStream when a parent stream is created
3908-
has_parent_state = bool(
3909-
self._connector_state_manager.get_stream_state(stream_name, None)
3910-
if model.incremental_dependency
3911-
else False
4012+
parent_state: Optional[Mapping[str, Any]] = (
4013+
child_state if model.incremental_dependency and child_state else None
39124014
)
39134015
connector_state_manager = self._instantiate_parent_stream_state_manager(
3914-
child_state, config, model, has_parent_state
4016+
child_state, config, model, parent_state
39154017
)
39164018

39174019
substream_factory = ModelToComponentFactory(
@@ -3943,7 +4045,7 @@ def _instantiate_parent_stream_state_manager(
39434045
child_state: MutableMapping[str, Any],
39444046
config: Config,
39454047
model: ParentStreamConfigModel,
3946-
has_parent_state: bool,
4048+
parent_state: Optional[Mapping[str, Any]] = None,
39474049
) -> ConnectorStateManager:
39484050
"""
39494051
With DefaultStream, the state needs to be provided during __init__ of the cursor as opposed to the
@@ -3955,36 +4057,33 @@ def _instantiate_parent_stream_state_manager(
39554057
"""
39564058
if model.incremental_dependency and child_state:
39574059
parent_stream_name = model.stream.name or ""
3958-
parent_state = ConcurrentPerPartitionCursor.get_parent_state(
4060+
extracted_parent_state = ConcurrentPerPartitionCursor.get_parent_state(
39594061
child_state, parent_stream_name
39604062
)
39614063

3962-
if not parent_state:
3963-
# there are two migration cases: state value from child stream or from global state
3964-
parent_state = ConcurrentPerPartitionCursor.get_global_state(
4064+
if not extracted_parent_state:
4065+
extracted_parent_state = ConcurrentPerPartitionCursor.get_global_state(
39654066
child_state, parent_stream_name
39664067
)
39674068

3968-
if not parent_state and not isinstance(parent_state, dict):
4069+
if not extracted_parent_state and not isinstance(extracted_parent_state, dict):
39694070
cursor_values = child_state.values()
39704071
if cursor_values and len(cursor_values) == 1:
3971-
# We assume the child state is a pair `{<cursor_field>: <cursor_value>}` and we will use the
3972-
# cursor value as a parent state.
39734072
incremental_sync_model: Union[
39744073
DatetimeBasedCursorModel,
39754074
IncrementingCountCursorModel,
39764075
] = (
39774076
model.stream.incremental_sync # type: ignore # if we are there, it is because there is incremental_dependency and therefore there is an incremental_sync on the parent stream
39784077
if isinstance(model.stream, DeclarativeStreamModel)
39794078
else self._get_state_delegating_stream_model(
3980-
has_parent_state, model.stream
4079+
model.stream, parent_state=parent_state
39814080
).incremental_sync
39824081
)
39834082
cursor_field = InterpolatedString.create(
39844083
incremental_sync_model.cursor_field,
39854084
parameters=incremental_sync_model.parameters or {},
39864085
).eval(config)
3987-
parent_state = AirbyteStateMessage(
4086+
extracted_parent_state = AirbyteStateMessage(
39884087
type=AirbyteStateType.STREAM,
39894088
stream=AirbyteStreamState(
39904089
stream_descriptor=StreamDescriptor(
@@ -3995,7 +4094,7 @@ def _instantiate_parent_stream_state_manager(
39954094
),
39964095
),
39974096
)
3998-
return ConnectorStateManager([parent_state] if parent_state else [])
4097+
return ConnectorStateManager([extracted_parent_state] if extracted_parent_state else [])
39994098

40004099
return ConnectorStateManager([])
40014100

0 commit comments

Comments
 (0)