Skip to content

Commit 1531b39

Browse files
refactor: Use stream cursor for retention period check, remove legacy get_cursor_datetime_from_state
- Rewrite create_state_delegating_stream to create actual stream object and extract cursor
- Add model-level check for IncrementingCountCursor with api_retention_period
- Delegate cursor datetime extraction to cursor's get_cursor_datetime_from_state method
- Remove get_cursor_datetime_from_state from legacy cursors (DeclarativeCursor, DatetimeBasedCursor, PerPartitionCursor)
- Remove factory helper methods (_create_cursor_for_age_check, _get_state_delegating_stream_model)
- Update tests to match new behavior

Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
1 parent 43dc47e commit 1531b39

5 files changed

Lines changed: 54 additions & 156 deletions

File tree

airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py

Lines changed: 0 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -212,26 +212,6 @@ def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[S
212212
# through each slice and does not belong to a specific slice. We just return stream state as it is.
213213
return self.get_stream_state()
214214

215-
def get_cursor_datetime_from_state(
216-
self, stream_state: Mapping[str, Any]
217-
) -> Optional[datetime.datetime]:
218-
"""Extract and parse the cursor datetime from the given stream state.
219-
220-
Returns the cursor datetime if present and parseable, otherwise returns None.
221-
"""
222-
cursor_field_key = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
223-
if cursor_field_key not in stream_state:
224-
return None
225-
226-
cursor_value = stream_state.get(cursor_field_key)
227-
if not cursor_value:
228-
return None
229-
230-
try:
231-
return self.parse_date(str(cursor_value))
232-
except ValueError:
233-
return None
234-
235215
def _calculate_earliest_possible_value(
236216
self, end_datetime: datetime.datetime
237217
) -> datetime.datetime:
Lines changed: 0 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,6 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
22

3-
import datetime
43
from abc import ABC
5-
from typing import Any, Mapping, Optional
64

75
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
86
from airbyte_cdk.sources.streams.checkpoint.cursor import Cursor
@@ -13,24 +11,3 @@ class DeclarativeCursor(Cursor, StreamSlicer, ABC):
1311
DeclarativeCursors are components that allow for checkpointing syncs. In addition to managing the fetching and updating of
1412
state, declarative cursors also manage stream slicing and injecting slice values into outbound requests.
1513
"""
16-
17-
def get_cursor_datetime_from_state(
18-
self, stream_state: Mapping[str, Any]
19-
) -> Optional[datetime.datetime]:
20-
"""Extract and parse the cursor datetime from the given stream state.
21-
22-
This method is used by StateDelegatingStream to validate cursor age against
23-
an API's data retention period. Subclasses should implement this method to
24-
extract the cursor value from their specific state structure and parse it
25-
into a datetime object.
26-
27-
Returns None if the cursor cannot be extracted or parsed, which will cause
28-
StateDelegatingStream to fall back to full refresh (safe default).
29-
30-
Raises NotImplementedError by default - subclasses must implement this method
31-
if they want to support cursor age validation with api_retention_period.
32-
"""
33-
raise NotImplementedError(
34-
f"{self.__class__.__name__} does not implement get_cursor_datetime_from_state. "
35-
f"Cursor age validation with api_retention_period is not supported for this cursor type."
36-
)

airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py

Lines changed: 0 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5-
import datetime
65
import logging
76
from collections import OrderedDict
87
from typing import Any, Callable, Iterable, Mapping, Optional, Union
@@ -212,23 +211,6 @@ def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[S
212211

213212
return self._get_state_for_partition(stream_slice.partition)
214213

215-
def get_cursor_datetime_from_state(
216-
self, stream_state: Mapping[str, Any]
217-
) -> Optional[datetime.datetime]:
218-
"""Extract and parse the cursor datetime from the global cursor in per-partition state.
219-
220-
For per-partition cursors, the global cursor is stored under the "state" key.
221-
This method delegates to the underlying cursor factory to parse the datetime.
222-
223-
Returns None if the global cursor is not present or cannot be parsed.
224-
"""
225-
global_state = stream_state.get("state")
226-
if not global_state or not isinstance(global_state, dict):
227-
return None
228-
229-
cursor = self._cursor_factory.create()
230-
return cursor.get_cursor_datetime_from_state(global_state)
231-
232214
def _create_cursor(self, cursor_state: Any) -> DeclarativeCursor:
233215
cursor = self._cursor_factory.create()
234216
cursor.set_initial_state(cursor_state)

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 38 additions & 72 deletions
Original file line number | Diff line number | Diff line change
@@ -3565,86 +3565,70 @@ def create_state_delegating_stream(
35653565
f"state_delegating_stream, full_refresh_stream name and incremental_stream must have equal names. Instead has {model.name}, {model.full_refresh_stream.name} and {model.incremental_stream.name}."
35663566
)
35673567

3568-
stream_model = self._get_state_delegating_stream_model(
3569-
False if has_parent_state is None else has_parent_state, model, config
3570-
)
3571-
3572-
return self._create_component_from_model(stream_model, config=config, **kwargs) # type: ignore[no-any-return] # DeclarativeStream will be created as stream_model is alwyas DeclarativeStreamModel
3568+
if model.api_retention_period:
3569+
for stream_model in (model.full_refresh_stream, model.incremental_stream):
3570+
if isinstance(stream_model.incremental_sync, IncrementingCountCursorModel):
3571+
raise ValueError(
3572+
f"Stream '{model.name}' uses IncrementingCountCursor which is not supported "
3573+
f"with api_retention_period. IncrementingCountCursor does not use datetime-based "
3574+
f"cursors, so cursor age validation cannot be performed."
3575+
)
35733576

3574-
def _get_state_delegating_stream_model(
3575-
self, has_parent_state: bool, model: StateDelegatingStreamModel, config: Config
3576-
) -> DeclarativeStreamModel:
35773577
stream_state = self._connector_state_manager.get_stream_state(model.name, None)
3578+
has_parent = False if has_parent_state is None else has_parent_state
35783579

3579-
if not stream_state and not has_parent_state:
3580-
return model.full_refresh_stream
3580+
if not stream_state and not has_parent:
3581+
return self._create_component_from_model(model.full_refresh_stream, config=config, **kwargs) # type: ignore[no-any-return]
3582+
3583+
incremental_stream: DefaultStream = self._create_component_from_model(model.incremental_stream, config=config, **kwargs) # type: ignore[assignment]
35813584

35823585
if model.api_retention_period and stream_state:
3583-
incremental_sync_sources = [
3584-
model.full_refresh_stream.incremental_sync,
3585-
model.incremental_stream.incremental_sync,
3586-
]
3587-
incremental_sync_sources = [s for s in incremental_sync_sources if s is not None]
3588-
if incremental_sync_sources and self._is_cursor_older_than_retention_period(
3589-
stream_state,
3590-
incremental_sync_sources,
3591-
model.api_retention_period,
3592-
model.name,
3593-
config,
3586+
cursor = incremental_stream.cursor
3587+
if self._is_cursor_older_than_retention_period(
3588+
stream_state, cursor, model.api_retention_period, model.name
35943589
):
3595-
return model.full_refresh_stream
3590+
return self._create_component_from_model(model.full_refresh_stream, config=config, **kwargs) # type: ignore[no-any-return]
35963591

3597-
return model.incremental_stream
3592+
return incremental_stream
35983593

3594+
@staticmethod
35993595
def _is_cursor_older_than_retention_period(
3600-
self,
36013596
stream_state: Mapping[str, Any],
3602-
incremental_sync_sources: list[Any],
3597+
cursor: Any,
36033598
api_retention_period: str,
36043599
stream_name: str,
3605-
config: Config,
36063600
) -> bool:
36073601
"""Check if the cursor value in the state is older than the API's retention period.
36083602
3609-
Delegates cursor datetime extraction to cursor class instances via
3610-
get_cursor_datetime_from_state, which handles format-specific parsing.
3603+
Delegates cursor datetime extraction to the cursor instance via
3604+
get_cursor_datetime_from_state.
36113605
36123606
Returns True if the cursor is older than the retention period (should use full refresh).
36133607
Returns False if the cursor is within the retention period (safe to use incremental).
3614-
Raises ValueError if the cursor datetime could not be parsed from state.
36153608
"""
3616-
# Skip retention check for concurrent state format (e.g. {"state_type": "date-range", "slices": [...]}).
3617-
# The DatetimeBasedCursor used for the age check only handles sequential state format.
3618-
# Today, is_sequential_state=True is hardcoded for all declarative cursors, so concurrent
3619-
# format state should never appear in practice. If that changes in the future, this guard
3620-
# prevents spurious full-refresh fallbacks until proper concurrent cursor delegation is added.
3621-
if "state_type" in stream_state or "slices" in stream_state:
3622-
return False
3623-
3624-
datetime_cursor_sources = [
3625-
s for s in incremental_sync_sources if isinstance(s, DatetimeBasedCursorModel)
3626-
]
3627-
if not datetime_cursor_sources:
3628-
return False
3609+
if not hasattr(cursor, "get_cursor_datetime_from_state"):
3610+
raise SystemError(
3611+
f"Stream '{stream_name}' cursor type '{type(cursor).__name__}' does not have "
3612+
f"get_cursor_datetime_from_state method. Cursor age validation with "
3613+
f"api_retention_period is not supported for this cursor type."
3614+
)
36293615

3630-
cursor_datetime: datetime.datetime | None = None
3631-
for incremental_sync in datetime_cursor_sources:
3632-
cursor = self._create_cursor_for_age_check(incremental_sync, config)
3616+
try:
36333617
cursor_datetime = cursor.get_cursor_datetime_from_state(stream_state)
3634-
if cursor_datetime is not None:
3635-
break
3618+
except NotImplementedError:
3619+
raise SystemError(
3620+
f"Stream '{stream_name}' cursor type '{type(cursor).__name__}' does not implement "
3621+
f"get_cursor_datetime_from_state. Cursor age validation with "
3622+
f"api_retention_period is not supported for this cursor type."
3623+
)
3624+
3625+
if cursor_datetime is None:
36363626
global_state = stream_state.get("state")
36373627
if isinstance(global_state, dict):
36383628
cursor_datetime = cursor.get_cursor_datetime_from_state(global_state)
3639-
if cursor_datetime is not None:
3640-
break
36413629

36423630
if cursor_datetime is None:
3643-
raise ValueError(
3644-
f"Stream '{stream_name}' has api_retention_period set to '{api_retention_period}' "
3645-
f"but the cursor datetime could not be parsed from state. Check that cursor_field "
3646-
f"and datetime_format match the state format."
3647-
)
3631+
return True
36483632

36493633
retention_duration = parse_duration(api_retention_period)
36503634
retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration
@@ -3660,24 +3644,6 @@ def _is_cursor_older_than_retention_period(
36603644

36613645
return False
36623646

3663-
@staticmethod
3664-
def _create_cursor_for_age_check(
3665-
model: DatetimeBasedCursorModel, config: Config
3666-
) -> "DatetimeBasedCursor":
3667-
"""Create a lightweight DatetimeBasedCursor for cursor age validation."""
3668-
from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import (
3669-
DatetimeBasedCursor as _DatetimeBasedCursor,
3670-
)
3671-
3672-
return _DatetimeBasedCursor(
3673-
start_datetime="2000-01-01T00:00:00Z",
3674-
cursor_field=model.cursor_field,
3675-
datetime_format=model.datetime_format,
3676-
config=config,
3677-
parameters=model.parameters or {},
3678-
cursor_datetime_formats=model.cursor_datetime_formats or [],
3679-
)
3680-
36813647
def _create_async_job_status_mapping(
36823648
self, model: AsyncJobStatusMapModel, config: Config, **kwargs: Any
36833649
) -> Mapping[str, AsyncJobStatus]:

unit_tests/sources/declarative/test_state_delegating_stream.py

Lines changed: 16 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -527,33 +527,26 @@ def _create_manifest_with_incrementing_count_cursor(api_retention_period: str) -
527527
return manifest
528528

529529

530-
def test_cursor_age_validation_skips_incrementing_count_cursor():
531-
"""Test that IncrementingCountCursor with api_retention_period is silently skipped (no error, uses incremental)."""
530+
def test_cursor_age_validation_raises_error_for_incrementing_count_cursor():
531+
"""Test that IncrementingCountCursor with api_retention_period raises ValueError."""
532532
manifest = _create_manifest_with_incrementing_count_cursor("P7D")
533533

534-
with HttpMocker() as http_mocker:
535-
http_mocker.get(
536-
HttpRequest(url="https://api.test.com/items_with_filtration"),
537-
HttpResponse(body=json.dumps([{"id": 101, "updated_at": "2024-07-14"}])),
534+
state = [
535+
AirbyteStateMessage(
536+
type=AirbyteStateType.STREAM,
537+
stream=AirbyteStreamState(
538+
stream_descriptor=StreamDescriptor(name="TestStream", namespace=None),
539+
stream_state=AirbyteStateBlob(id=100),
540+
),
538541
)
542+
]
539543

540-
state = [
541-
AirbyteStateMessage(
542-
type=AirbyteStateType.STREAM,
543-
stream=AirbyteStreamState(
544-
stream_descriptor=StreamDescriptor(name="TestStream", namespace=None),
545-
stream_state=AirbyteStateBlob(id=100),
546-
),
547-
)
548-
]
549-
550-
source = ConcurrentDeclarativeSource(
551-
source_config=manifest, config=_CONFIG, catalog=None, state=state
552-
)
553-
configured_catalog = create_configured_catalog(source, _CONFIG)
544+
source = ConcurrentDeclarativeSource(
545+
source_config=manifest, config=_CONFIG, catalog=None, state=state
546+
)
554547

555-
records = get_records(source, _CONFIG, configured_catalog, state)
556-
assert len(records) >= 0
548+
with pytest.raises(ValueError, match="IncrementingCountCursor"):
549+
source.discover(logger=MagicMock(), config=_CONFIG)
557550

558551

559552
def test_cursor_age_validation_raises_error_for_unparseable_cursor():
@@ -574,5 +567,5 @@ def test_cursor_age_validation_raises_error_for_unparseable_cursor():
574567
source_config=manifest, config=_CONFIG, catalog=None, state=state
575568
)
576569

577-
with pytest.raises(ValueError, match="could not be parsed from state"):
570+
with pytest.raises(ValueError, match="not-a-date"):
578571
source.discover(logger=MagicMock(), config=_CONFIG)

0 commit comments

Comments
 (0)