Skip to content

Commit b4c24c6

Browse files
fix: Try both full_refresh and incremental cursors for state parsing
The state format may match either the full refresh or incremental cursor, so we need to try both when checking cursor age against retention period. Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
1 parent 1531b39 commit b4c24c6

1 file changed

Lines changed: 29 additions & 21 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3583,49 +3583,57 @@ def create_state_delegating_stream(
35833583
incremental_stream: DefaultStream = self._create_component_from_model(model.incremental_stream, config=config, **kwargs) # type: ignore[assignment]
35843584

35853585
if model.api_retention_period and stream_state:
3586-
cursor = incremental_stream.cursor
3586+
full_refresh_stream: DefaultStream = self._create_component_from_model(model.full_refresh_stream, config=config, **kwargs) # type: ignore[assignment]
3587+
cursors = [full_refresh_stream.cursor, incremental_stream.cursor]
35873588
if self._is_cursor_older_than_retention_period(
3588-
stream_state, cursor, model.api_retention_period, model.name
3589+
stream_state, cursors, model.api_retention_period, model.name
35893590
):
3590-
return self._create_component_from_model(model.full_refresh_stream, config=config, **kwargs) # type: ignore[no-any-return]
3591+
return full_refresh_stream
35913592

35923593
return incremental_stream
35933594

35943595
@staticmethod
35953596
def _is_cursor_older_than_retention_period(
35963597
stream_state: Mapping[str, Any],
3597-
cursor: Any,
3598+
cursors: list[Any],
35983599
api_retention_period: str,
35993600
stream_name: str,
36003601
) -> bool:
36013602
"""Check if the cursor value in the state is older than the API's retention period.
36023603
3603-
Delegates cursor datetime extraction to the cursor instance via
3604-
get_cursor_datetime_from_state.
3604+
Tries each cursor's get_cursor_datetime_from_state to extract the cursor datetime,
3605+
since the state format may match either the full refresh or incremental cursor.
36053606
36063607
Returns True if the cursor is older than the retention period (should use full refresh).
36073608
Returns False if the cursor is within the retention period (safe to use incremental).
36083609
"""
3609-
if not hasattr(cursor, "get_cursor_datetime_from_state"):
3610-
raise SystemError(
3611-
f"Stream '{stream_name}' cursor type '{type(cursor).__name__}' does not have "
3612-
f"get_cursor_datetime_from_state method. Cursor age validation with "
3613-
f"api_retention_period is not supported for this cursor type."
3614-
)
3610+
cursor_datetime: datetime.datetime | None = None
3611+
3612+
for cursor in cursors:
3613+
if not hasattr(cursor, "get_cursor_datetime_from_state"):
3614+
raise SystemError(
3615+
f"Stream '{stream_name}' cursor type '{type(cursor).__name__}' does not have "
3616+
f"get_cursor_datetime_from_state method. Cursor age validation with "
3617+
f"api_retention_period is not supported for this cursor type."
3618+
)
36153619

3616-
try:
3617-
cursor_datetime = cursor.get_cursor_datetime_from_state(stream_state)
3618-
except NotImplementedError:
3619-
raise SystemError(
3620-
f"Stream '{stream_name}' cursor type '{type(cursor).__name__}' does not implement "
3621-
f"get_cursor_datetime_from_state. Cursor age validation with "
3622-
f"api_retention_period is not supported for this cursor type."
3623-
)
3620+
try:
3621+
cursor_datetime = cursor.get_cursor_datetime_from_state(stream_state)
3622+
except NotImplementedError:
3623+
raise SystemError(
3624+
f"Stream '{stream_name}' cursor type '{type(cursor).__name__}' does not implement "
3625+
f"get_cursor_datetime_from_state. Cursor age validation with "
3626+
f"api_retention_period is not supported for this cursor type."
3627+
)
3628+
3629+
if cursor_datetime is not None:
3630+
break
36243631

3625-
if cursor_datetime is None:
36263632
global_state = stream_state.get("state")
36273633
if isinstance(global_state, dict):
36283634
cursor_datetime = cursor.get_cursor_datetime_from_state(global_state)
3635+
if cursor_datetime is not None:
3636+
break
36293637

36303638
if cursor_datetime is None:
36313639
return True

0 commit comments

Comments
 (0)