Skip to content

Commit a2d4b56

Browse files
refactor: Wire factory to use cursor class get_cursor_datetime_from_state
Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
1 parent fbda39f commit a2d4b56

1 file changed

Lines changed: 42 additions & 102 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 42 additions & 102 deletions
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,6 @@
7878
DynamicStreamCheckConfig,
7979
)
8080
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
81-
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
8281
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
8382
from airbyte_cdk.sources.declarative.decoders import (
8483
Decoder,
@@ -3561,13 +3560,13 @@ def create_state_delegating_stream(
35613560
)
35623561

35633562
stream_model = self._get_state_delegating_stream_model(
3564-
False if has_parent_state is None else has_parent_state, model
3563+
False if has_parent_state is None else has_parent_state, model, config
35653564
)
35663565

35673566
return self._create_component_from_model(stream_model, config=config, **kwargs) # type: ignore[no-any-return] # DeclarativeStream will be created as stream_model is always DeclarativeStreamModel
35683567

35693568
def _get_state_delegating_stream_model(
3570-
self, has_parent_state: bool, model: StateDelegatingStreamModel
3569+
self, has_parent_state: bool, model: StateDelegatingStreamModel, config: Config
35713570
) -> DeclarativeStreamModel:
35723571
stream_state = self._connector_state_manager.get_stream_state(model.name, None)
35733572

@@ -3581,7 +3580,7 @@ def _get_state_delegating_stream_model(
35813580
]
35823581
incremental_sync_sources = [s for s in incremental_sync_sources if s is not None]
35833582
if incremental_sync_sources and self._is_cursor_older_than_retention_period(
3584-
stream_state, incremental_sync_sources, model.api_retention_period, model.name
3583+
stream_state, incremental_sync_sources, model.api_retention_period, model.name, config
35853584
):
35863585
return model.full_refresh_stream
35873586

@@ -3593,20 +3592,21 @@ def _is_cursor_older_than_retention_period(
35933592
incremental_sync_sources: list[Any],
35943593
api_retention_period: str,
35953594
stream_name: str,
3595+
config: Config,
35963596
) -> bool:
35973597
"""Check if the cursor value in the state is older than the API's retention period.
35983598
3599-
If the cursor is too old, the incremental API may not have data going back that far,
3600-
so we should fall back to a full refresh to avoid data loss.
3601-
3602-
This method handles different state structures:
3603-
- Simple cursor state: {"cursor_field": "value"}
3604-
- Per-partition state: {"state": {"cursor_field": "value"}, "states": [...]}
3599+
Delegates cursor datetime extraction to cursor class instances via
3600+
get_cursor_datetime_from_state, which handles format-specific parsing.
36053601
36063602
Returns True if the cursor is older than the retention period or if the cursor is
36073603
invalid/unparseable (should use full refresh).
36083604
Returns False if the cursor is within the retention period (safe to use incremental).
36093605
"""
3606+
from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import (
3607+
DatetimeBasedCursor,
3608+
)
3609+
36103610
for incremental_sync in incremental_sync_sources:
36113611
if isinstance(incremental_sync, IncrementingCountCursorModel):
36123612
raise ValueError(
@@ -3615,114 +3615,54 @@ def _is_cursor_older_than_retention_period(
36153615
f"cursors, so cursor age validation cannot be performed."
36163616
)
36173617

3618-
cursor_field = None
3618+
cursor_datetime: datetime.datetime | None = None
36193619
for incremental_sync in incremental_sync_sources:
3620-
cursor_field = getattr(incremental_sync, "cursor_field", None)
3621-
if cursor_field:
3620+
if not isinstance(incremental_sync, DatetimeBasedCursorModel):
3621+
continue
3622+
cursor = self._create_cursor_for_age_check(incremental_sync, config)
3623+
cursor_datetime = cursor.get_cursor_datetime_from_state(stream_state)
3624+
if cursor_datetime is not None:
36223625
break
3626+
global_state = stream_state.get("state")
3627+
if isinstance(global_state, dict):
3628+
cursor_datetime = cursor.get_cursor_datetime_from_state(global_state)
3629+
if cursor_datetime is not None:
3630+
break
36233631

3624-
if not cursor_field:
3625-
return True
3626-
3627-
cursor_value = self._extract_cursor_value_from_state(stream_state, cursor_field)
3628-
if not cursor_value:
3629-
return True
3630-
3631-
if not isinstance(cursor_value, (str, int)):
3632+
if cursor_datetime is None:
36323633
return True
36333634

3634-
cursor_value_str = str(cursor_value)
3635-
36363635
retention_duration = parse_duration(api_retention_period)
36373636
retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration
36383637

3639-
cursor_datetime = self._parse_cursor_datetime(
3640-
cursor_value_str, incremental_sync_sources, stream_name
3641-
)
3642-
if cursor_datetime is None:
3643-
return True
3644-
36453638
if cursor_datetime < retention_cutoff:
3646-
self._emit_warning_for_stale_cursor(
3647-
stream_name, cursor_value_str, api_retention_period, retention_cutoff
3639+
logging.warning(
3640+
f"Stream '{stream_name}' has a cursor value older than "
3641+
f"the API's retention period of {api_retention_period} "
3642+
f"(cutoff: {retention_cutoff.isoformat()}). "
3643+
f"Falling back to full refresh to avoid data loss."
36483644
)
36493645
return True
36503646

36513647
return False
36523648

3653-
def _extract_cursor_value_from_state(
3654-
self,
3655-
stream_state: Mapping[str, Any],
3656-
cursor_field: str,
3657-
) -> Any:
3658-
"""Extract cursor value from state, handling different state structures.
3659-
3660-
Supports:
3661-
- Simple cursor state: {"cursor_field": "value"} -> returns "value"
3662-
- Per-partition state: {"state": {"cursor_field": "value"}, ...} -> returns "value"
3663-
(uses global cursor from "state" key)
3664-
3665-
Returns None if cursor value cannot be extracted.
3666-
"""
3667-
if cursor_field in stream_state:
3668-
return stream_state.get(cursor_field)
3669-
3670-
global_state = stream_state.get("state")
3671-
if isinstance(global_state, dict) and cursor_field in global_state:
3672-
return global_state.get(cursor_field)
3673-
3674-
return None
3675-
3676-
def _parse_cursor_datetime(
3677-
self,
3678-
cursor_value: str,
3679-
incremental_sync_sources: list[Any],
3680-
stream_name: str,
3681-
) -> datetime.datetime | None:
3682-
"""Parse the cursor value into a datetime object using datetime formats from all sources.
3683-
3684-
The state could have been produced by either full_refresh_stream (first sync) or
3685-
incremental_stream (subsequent syncs), so we try parsing with formats from both.
3686-
"""
3687-
parser = DatetimeParser()
3688-
3689-
formats_to_try: list[str] = []
3690-
for incremental_sync in incremental_sync_sources:
3691-
datetime_format = getattr(incremental_sync, "datetime_format", None)
3692-
cursor_datetime_formats = (
3693-
getattr(incremental_sync, "cursor_datetime_formats", None) or []
3694-
)
3695-
formats_to_try.extend(cursor_datetime_formats)
3696-
if datetime_format:
3697-
formats_to_try.append(datetime_format)
3698-
3699-
for fmt in formats_to_try:
3700-
try:
3701-
return parser.parse(cursor_value, fmt)
3702-
except (ValueError, TypeError):
3703-
continue
3704-
3705-
logging.warning(
3706-
f"Could not parse cursor value '{cursor_value}' for stream '{stream_name}' "
3707-
f"using formats {formats_to_try}. Falling back to full refresh."
3649+
@staticmethod
3650+
def _create_cursor_for_age_check(
3651+
model: DatetimeBasedCursorModel, config: Config
3652+
) -> "DatetimeBasedCursor":
3653+
"""Create a lightweight DatetimeBasedCursor for cursor age validation."""
3654+
from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import (
3655+
DatetimeBasedCursor,
37083656
)
3709-
return None
37103657

3711-
def _emit_warning_for_stale_cursor(
3712-
self,
3713-
stream_name: str,
3714-
cursor_value: str,
3715-
api_retention_period: str,
3716-
retention_cutoff: datetime.datetime,
3717-
) -> None:
3718-
"""Emit a warning message when the cursor is older than the API's retention period."""
3719-
warning_message = (
3720-
f"Stream '{stream_name}' has a cursor value '{cursor_value}' that is older than "
3721-
f"the API's retention period of {api_retention_period} (cutoff: {retention_cutoff.isoformat()}). "
3722-
f"Falling back to full refresh to avoid data loss. "
3723-
f"This may happen if a previous sync failed mid-way and the state was checkpointed."
3724-
)
3725-
logging.warning(warning_message)
3658+
return DatetimeBasedCursor(
3659+
start_datetime="2000-01-01T00:00:00Z",
3660+
cursor_field=model.cursor_field,
3661+
datetime_format=model.datetime_format,
3662+
config=config,
3663+
parameters=model.parameters or {},
3664+
cursor_datetime_formats=model.cursor_datetime_formats or [],
3665+
)
37263666

37273667
def _create_async_job_status_mapping(
37283668
self, model: AsyncJobStatusMapModel, config: Config, **kwargs: Any

0 commit comments

Comments
 (0)