diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 788681d41..8f7f9fc55 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -618,6 +618,7 @@ NoopMessageRepository, ) from airbyte_cdk.sources.message.repository import StateFilteringMessageRepository +from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY from airbyte_cdk.sources.streams.call_rate import ( APIBudget, FixedWindowCallRatePolicy, @@ -3578,7 +3579,14 @@ def create_state_delegating_stream( has_parent = False if has_parent_state is None else has_parent_state if not stream_state and not has_parent: - return self._create_component_from_model(model.full_refresh_stream, config=config, **kwargs) # type: ignore[no-any-return] + return self._create_component_from_model( + model.full_refresh_stream, config=config, **kwargs + ) # type: ignore[no-any-return] + + if stream_state and stream_state.get(NO_CURSOR_STATE_KEY): + return self._create_component_from_model( + model.incremental_stream, config=config, **kwargs + ) # type: ignore[no-any-return] incremental_stream: DefaultStream = self._create_component_from_model( model.incremental_stream, config=config, **kwargs @@ -3615,20 +3623,12 @@ def _is_cursor_older_than_retention_period( for cursor in cursors: if not hasattr(cursor, "get_cursor_datetime_from_state"): - raise SystemError( - f"Stream '{stream_name}' cursor type '{type(cursor).__name__}' does not have " - f"get_cursor_datetime_from_state method. Cursor age validation with " - f"api_retention_period is not supported for this cursor type." - ) + continue try: cursor_datetime = cursor.get_cursor_datetime_from_state(stream_state) except NotImplementedError: - raise SystemError( - f"Stream '{stream_name}' cursor type '{type(cursor).__name__}' does not implement " - f"get_cursor_datetime_from_state. Cursor age validation with " - f"api_retention_period is not supported for this cursor type." - ) + continue if cursor_datetime is not None: break diff --git a/unit_tests/sources/declarative/test_state_delegating_stream.py b/unit_tests/sources/declarative/test_state_delegating_stream.py index 1a7df9c07..e8a5573b5 100644 --- a/unit_tests/sources/declarative/test_state_delegating_stream.py +++ b/unit_tests/sources/declarative/test_state_delegating_stream.py @@ -566,3 +566,40 @@ def test_cursor_age_validation_raises_error_for_unparseable_cursor(): with pytest.raises(ValueError, match="not-a-date"): source.discover(logger=MagicMock(), config=_CONFIG) + + +@freezegun.freeze_time("2024-07-15") +def test_final_state_cursor_skips_retention_check_and_uses_incremental(): + """When state is a final state from FinalStateCursor, skip retention check and use incremental.""" + manifest = _create_manifest_with_retention_period("P7D") + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest( + url="https://api.test.com/items_with_filtration?start=2024-07-01&end=2024-07-15" + ), + HttpResponse( + body=json.dumps( + [ + {"id": 1, "name": "item_1", "updated_at": "2024-07-14"}, + ] + ) + ), + ) + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob(__ab_no_cursor_state_message=True), + ), + ) + ] + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + + records = get_records(source, _CONFIG, configured_catalog, state) + assert len(records) == 1