Skip to content

Commit 567ca7a

Browse files
fix: Parse cursor from both full_refresh_stream and incremental_stream
Address tolik0's review comment: During the first sync, the state will be produced by full_refresh_stream, and during subsequent syncs, by incremental_stream. We need to correctly parse the state for both cases. Changes: - Extract incremental_sync from both full_refresh_stream and incremental_stream - Update _is_cursor_older_than_retention_period to accept list of sources - Update _parse_cursor_datetime to collect and try formats from all sources Co-Authored-By: unknown <>
1 parent 86d5ea6 commit 567ca7a

1 file changed

Lines changed: 35 additions & 14 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3575,9 +3575,13 @@ def _get_state_delegating_stream_model(
35753575
return model.full_refresh_stream
35763576

35773577
if model.api_retention_period and stream_state:
3578-
incremental_sync = model.incremental_stream.incremental_sync
3579-
if incremental_sync and self._is_cursor_older_than_retention_period(
3580-
stream_state, incremental_sync, model.api_retention_period, model.name
3578+
incremental_sync_sources = [
3579+
model.full_refresh_stream.incremental_sync,
3580+
model.incremental_stream.incremental_sync,
3581+
]
3582+
incremental_sync_sources = [s for s in incremental_sync_sources if s is not None]
3583+
if incremental_sync_sources and self._is_cursor_older_than_retention_period(
3584+
stream_state, incremental_sync_sources, model.api_retention_period, model.name
35813585
):
35823586
return model.full_refresh_stream
35833587

@@ -3586,7 +3590,7 @@ def _get_state_delegating_stream_model(
35863590
def _is_cursor_older_than_retention_period(
35873591
self,
35883592
stream_state: Mapping[str, Any],
3589-
incremental_sync: Any,
3593+
incremental_sync_sources: list[Any],
35903594
api_retention_period: str,
35913595
stream_name: str,
35923596
) -> bool:
@@ -3595,11 +3599,19 @@ def _is_cursor_older_than_retention_period(
35953599
If the cursor is too old, the incremental API may not have data going back that far,
35963600
so we should fall back to a full refresh to avoid data loss.
35973601
3602+
The state could have been produced by either full_refresh_stream (first sync) or
3603+
incremental_stream (subsequent syncs), so we try parsing with formats from both.
3604+
35983605
Returns True if the cursor is older than the retention period or if the cursor is
35993606
invalid/unparseable (should use full refresh).
36003607
Returns False if the cursor is within the retention period (safe to use incremental).
36013608
"""
3602-
cursor_field = getattr(incremental_sync, "cursor_field", None)
3609+
cursor_field = None
3610+
for incremental_sync in incremental_sync_sources:
3611+
cursor_field = getattr(incremental_sync, "cursor_field", None)
3612+
if cursor_field:
3613+
break
3614+
36033615
if not cursor_field:
36043616
return True
36053617

@@ -3616,7 +3628,7 @@ def _is_cursor_older_than_retention_period(
36163628
retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration
36173629

36183630
cursor_datetime = self._parse_cursor_datetime(
3619-
cursor_value_str, incremental_sync, stream_name
3631+
cursor_value_str, incremental_sync_sources, stream_name
36203632
)
36213633
if cursor_datetime is None:
36223634
return True
@@ -3632,16 +3644,25 @@ def _is_cursor_older_than_retention_period(
36323644
def _parse_cursor_datetime(
36333645
self,
36343646
cursor_value: str,
3635-
incremental_sync: Any,
3647+
incremental_sync_sources: list[Any],
36363648
stream_name: str,
3637-
) -> Optional[datetime.datetime]:
3638-
"""Parse the cursor value into a datetime object using the cursor's datetime formats."""
3639-
parser = DatetimeParser()
3649+
) -> datetime.datetime | None:
3650+
"""Parse the cursor value into a datetime object using datetime formats from all sources.
36403651
3641-
datetime_format = getattr(incremental_sync, "datetime_format", None)
3642-
cursor_datetime_formats = getattr(incremental_sync, "cursor_datetime_formats", None) or []
3652+
The state could have been produced by either full_refresh_stream (first sync) or
3653+
incremental_stream (subsequent syncs), so we try parsing with formats from both.
3654+
"""
3655+
parser = DatetimeParser()
36433656

3644-
formats_to_try = cursor_datetime_formats + ([datetime_format] if datetime_format else [])
3657+
formats_to_try: list[str] = []
3658+
for incremental_sync in incremental_sync_sources:
3659+
datetime_format = getattr(incremental_sync, "datetime_format", None)
3660+
cursor_datetime_formats = (
3661+
getattr(incremental_sync, "cursor_datetime_formats", None) or []
3662+
)
3663+
formats_to_try.extend(cursor_datetime_formats)
3664+
if datetime_format:
3665+
formats_to_try.append(datetime_format)
36453666

36463667
for fmt in formats_to_try:
36473668
try:
@@ -3651,7 +3672,7 @@ def _parse_cursor_datetime(
36513672

36523673
logging.warning(
36533674
f"Could not parse cursor value '{cursor_value}' for stream '{stream_name}' "
3654-
f"using formats {formats_to_try}. Skipping cursor age validation."
3675+
f"using formats {formats_to_try}. Falling back to full refresh."
36553676
)
36563677
return None
36573678

0 commit comments

Comments
 (0)