diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index e212b0f2a..5972a2186 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -19,7 +19,10 @@ from airbyte_cdk.sources.declarative.extractors.record_filter import ( ClientSideIncrementalRecordFilterDecorator, ) -from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor +from airbyte_cdk.sources.declarative.incremental import ( + ConcurrentPerPartitionCursor, + GlobalSubstreamCursor, +) from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import ( PerPartitionWithGlobalCursor, @@ -361,7 +364,8 @@ def _group_streams( == DatetimeBasedCursorModel.__name__ and hasattr(declarative_stream.retriever, "stream_slicer") and isinstance( - declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor + declarative_stream.retriever.stream_slicer, + (GlobalSubstreamCursor, PerPartitionWithGlobalCursor), ) ): stream_state = self._connector_state_manager.get_stream_state( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 09b20b028..064fb6c91 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1439,7 +1439,9 @@ def create_concurrent_cursor_from_perpartition_cursor( stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state) # Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state - use_global_cursor = isinstance(partition_router, GroupingPartitionRouter) + use_global_cursor = isinstance( + 
partition_router, GroupingPartitionRouter + ) or component_definition.get("global_substream_cursor", False) # Return the concurrent cursor and state converter return ConcurrentPerPartitionCursor( diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index e23d03a4a..5638d9580 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -3449,3 +3449,48 @@ def test_semaphore_cleanup(): assert '{"id":"2"}' not in cursor._semaphore_per_partition assert len(cursor._partition_parent_state_map) == 0 # All parent states should be popped assert cursor._parent_state == {"parent": {"state": "state2"}} # Last parent state + + +def test_given_global_state_when_read_then_state_is_not_per_partition() -> None: + manifest = deepcopy(SUBSTREAM_MANIFEST) + manifest["definitions"]["post_comments_stream"]["incremental_sync"][ + "global_substream_cursor" + ] = True + manifest["streams"].remove({"$ref": "#/definitions/post_comment_votes_stream"}) + record = { + "id": 9, + "post_id": 1, + "updated_at": COMMENT_10_UPDATED_AT, + } + mock_requests = [ + ( + f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}", + { + "posts": [ + {"id": 1, "updated_at": POST_1_UPDATED_AT}, + ], + }, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [record], + }, + ), + ] + + run_mocked_test( + mock_requests, + manifest, + CONFIG, + "post_comments", + {}, + [record], + { + "lookback_window": 1, + "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}}, + "state": {"updated_at": "2024-01-25T00:00:00Z"}, + "use_global_cursor": True, # ensures that it is running the Concurrent CDK version as this is not populated in the declarative 
implementation + }, # this state does not have per-partition state, which would be under `states` + )