Skip to content

Commit 8afe8e1

Browse files
fix: Clear state when falling back to full refresh due to stale cursor
When the cursor is older than the API retention period and we fall back to full refresh, clear the stream state and emit an empty state message to the platform. This ensures the platform does not retain stale state that would cause missed records on subsequent syncs. Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
1 parent acd7156 commit 8afe8e1

2 files changed

Lines changed: 53 additions & 0 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3592,6 +3592,13 @@ def create_state_delegating_stream(
35923592
model.api_retention_period,
35933593
model.name,
35943594
):
3595+
self._connector_state_manager.update_state_for_stream(
3596+
model.name, None, {}
3597+
)
3598+
state_message = self._connector_state_manager.create_state_message(
3599+
model.name, None
3600+
)
3601+
self._message_repository.emit_message(state_message)
35953602
return self._create_component_from_model( # type: ignore[no-any-return]
35963603
model.full_refresh_stream, config=config, **kwargs
35973604
)

unit_tests/sources/declarative/test_state_delegating_stream.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,52 @@ def test_cursor_age_validation_falls_back_to_full_refresh_when_cursor_too_old():
305305
assert expected == records
306306

307307

308+
@freezegun.freeze_time("2024-07-15")
309+
def test_cursor_age_validation_clears_state_when_falling_back_to_full_refresh():
310+
"""Test that state is cleared when cursor is older than retention period."""
311+
manifest = _create_manifest_with_retention_period("P7D")
312+
313+
with HttpMocker() as http_mocker:
314+
http_mocker.get(
315+
HttpRequest(url="https://api.test.com/items"),
316+
HttpResponse(
317+
body=json.dumps(
318+
[
319+
{"id": 1, "name": "item_1", "updated_at": "2024-07-13"},
320+
{"id": 2, "name": "item_2", "updated_at": "2024-07-14"},
321+
]
322+
)
323+
),
324+
)
325+
326+
state = [
327+
AirbyteStateMessage(
328+
type=AirbyteStateType.STREAM,
329+
stream=AirbyteStreamState(
330+
stream_descriptor=StreamDescriptor(name="TestStream", namespace=None),
331+
stream_state=AirbyteStateBlob(updated_at="2024-07-01"),
332+
),
333+
)
334+
]
335+
source = ConcurrentDeclarativeSource(
336+
source_config=manifest, config=_CONFIG, catalog=None, state=state
337+
)
338+
configured_catalog = create_configured_catalog(source, _CONFIG)
339+
340+
all_messages = list(
341+
source.read(
342+
logger=MagicMock(), config=_CONFIG, catalog=configured_catalog, state=state
343+
)
344+
)
345+
346+
state_messages = [msg for msg in all_messages if msg.type == Type.STATE]
347+
assert len(state_messages) > 0, "Expected at least one state message"
348+
first_state = state_messages[0].state.stream.stream_state
349+
assert first_state == AirbyteStateBlob(), (
350+
f"Expected first state message to be empty (clearing stale state), got: {first_state}"
351+
)
352+
353+
308354
@freezegun.freeze_time("2024-07-15")
309355
def test_cursor_age_validation_uses_incremental_when_cursor_within_retention():
310356
"""Test that when cursor is within retention period, incremental sync is used."""

0 commit comments

Comments
 (0)