Skip to content

Commit 9021315

Browse files
committed
Revert "fix: Clear state BEFORE constructing full_refresh_stream in stale cursor fallback"
This reverts commit 39a2aee.
1 parent 39a2aee commit 9021315

2 files changed

Lines changed: 21 additions & 25 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,6 @@
618618
NoopMessageRepository,
619619
)
620620
from airbyte_cdk.sources.message.repository import StateFilteringMessageRepository
621-
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
622621
from airbyte_cdk.sources.streams.call_rate import (
623622
APIBudget,
624623
FixedWindowCallRatePolicy,
@@ -3595,50 +3594,53 @@ def create_state_delegating_stream(
35953594
or model.name in self._stream_name_to_configured_stream
35963595
)
35973596
if model.api_retention_period and stream_is_in_catalog:
3597+
full_refresh_stream: DefaultStream = self._create_component_from_model(
3598+
model.full_refresh_stream, config=config, **kwargs
3599+
) # type: ignore[assignment]
35983600
if self._is_cursor_older_than_retention_period(
35993601
stream_state,
3602+
full_refresh_stream.cursor,
36003603
incremental_stream.cursor,
36013604
model.api_retention_period,
36023605
model.name,
36033606
):
3604-
# Clear state BEFORE constructing the full_refresh_stream so that
3605-
# its cursor starts from start_date instead of the stale cursor.
36063607
self._connector_state_manager.update_state_for_stream(model.name, None, {})
36073608
state_message = self._connector_state_manager.create_state_message(model.name, None)
36083609
self._message_repository.emit_message(state_message)
3609-
return self._create_component_from_model( # type: ignore[no-any-return]
3610-
model.full_refresh_stream, config=config, **kwargs
3611-
)
3610+
return full_refresh_stream
36123611

36133612
return incremental_stream
36143613

36153614
@staticmethod
36163615
def _is_cursor_older_than_retention_period(
36173616
stream_state: Mapping[str, Any],
3617+
full_refresh_cursor: Cursor,
36183618
incremental_cursor: Cursor,
36193619
api_retention_period: str,
36203620
stream_name: str,
36213621
) -> bool:
36223622
"""Check if the cursor value in the state is older than the API's retention period.
36233623
3624-
If the state contains NO_CURSOR_STATE_KEY, the previous sync was a completed
3625-
full refresh and the cursor is considered current — returns False.
3626-
3627-
Otherwise, uses the incremental cursor to parse the datetime from state.
3624+
Checks cursors in sequence: full refresh cursor first, then incremental cursor.
3625+
FinalStateCursor returns now() for completed full refresh state (NO_CURSOR_STATE_KEY),
3626+
which is always within retention, so we use incremental. For other states, it returns
3627+
None and we fall back to checking the incremental cursor.
36283628
36293629
Returns True if the cursor is older than the retention period (should use full refresh).
36303630
Returns False if the cursor is within the retention period (safe to use incremental).
36313631
"""
3632-
if stream_state.get(NO_CURSOR_STATE_KEY):
3633-
return False
3634-
36353632
retention_duration = parse_duration(api_retention_period)
36363633
retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration
36373634

3638-
cursor_datetime = incremental_cursor.get_cursor_datetime_from_state(stream_state)
3635+
# Check full refresh cursor first
3636+
cursor_datetime = full_refresh_cursor.get_cursor_datetime_from_state(stream_state)
3637+
3638+
# If full refresh cursor returns None, check incremental cursor
3639+
if cursor_datetime is None:
3640+
cursor_datetime = incremental_cursor.get_cursor_datetime_from_state(stream_state)
36393641

36403642
if cursor_datetime is None:
3641-
# Cursor could not parse the state - fall back to full refresh to be safe
3643+
# Neither cursor could parse the state - fall back to full refresh to be safe
36423644
return True
36433645

36443646
if cursor_datetime < retention_cutoff:

unit_tests/sources/declarative/test_state_delegating_stream.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -613,20 +613,14 @@ def test_cursor_age_validation_raises_error_for_unparseable_cursor():
613613

614614

615615
@freezegun.freeze_time("2024-07-15")
616-
def test_no_cursor_state_key_uses_incremental_not_full_refresh():
617-
"""When state has NO_CURSOR_STATE_KEY, the previous sync was a completed full refresh.
618-
619-
The cursor is considered current so the stream should use incremental sync,
620-
not fall back to full refresh again.
621-
"""
616+
def test_final_state_cursor_falls_back_to_full_refresh_when_state_unparseable():
617+
"""When state is a final state (NO_CURSOR_STATE_KEY), ConcurrentCursor cannot parse it,
618+
so both cursors return None and the implementation falls back to full refresh as the safe default."""
622619
manifest = _create_manifest_with_retention_period("P7D")
623620

624621
with HttpMocker() as http_mocker:
625-
# Mock the incremental endpoint with query params (start_date=2024-07-01, step=P15D, frozen at 2024-07-15)
626622
http_mocker.get(
627-
HttpRequest(
628-
url="https://api.test.com/items_with_filtration?start=2024-07-01&end=2024-07-15"
629-
),
623+
HttpRequest(url="https://api.test.com/items"),
630624
HttpResponse(
631625
body=json.dumps(
632626
[

0 commit comments

Comments
 (0)