diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 14d9b7c6b..fbdf6a296 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -242,6 +242,10 @@ def _emit_state_message(self, throttle: bool = True) -> None: if current_time is None: return self._last_emission_time = current_time + # Skip state emit for global cursor if parent state is empty + if self._use_global_cursor and not self._parent_state: + return + self._connector_state_manager.update_state_for_stream( self._stream_name, self._stream_namespace, diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 6883f6e70..7ceb19f14 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1483,7 +1483,6 @@ def create_concurrent_cursor_from_perpartition_cursor( ) ) stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state) - # Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state use_global_cursor = isinstance( partition_router, GroupingPartitionRouter diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 5638d9580..bca9d78b2 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -309,12 +309,33 @@ "partition_router" ]["parent_stream_configs"][0]["incremental_dependency"] = False +SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY = 
deepcopy(SUBSTREAM_MANIFEST) +# Disable incremental_dependency +SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["post_comments_stream"][ + "retriever" +]["partition_router"]["parent_stream_configs"][0]["incremental_dependency"] = False +SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["post_comment_votes_stream"][ + "retriever" +]["partition_router"]["parent_stream_configs"][0]["incremental_dependency"] = False +# Enable global_cursor +SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["cursor_incremental_sync"][ + "global_substream_cursor" +] = True + + import orjson import requests_mock def run_mocked_test( - mock_requests, manifest, config, stream_name, initial_state, expected_records, expected_state + mock_requests, + manifest, + config, + stream_name, + initial_state, + expected_records, + expected_state, + state_count=None, ): """ Helper function to mock requests, run the test, and verify the results. @@ -354,6 +375,8 @@ def run_mocked_test( expected_records, key=lambda x: x["id"] ) + assert state_count is None or len(output.state_messages) == state_count + # Verify state final_state = output.state_messages[-1].state.stream.stream_state assert final_state.__dict__ == expected_state @@ -431,7 +454,7 @@ def _run_read( @pytest.mark.parametrize( - "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state, state_count", [ ( "test_incremental_parent_state", @@ -739,11 +762,266 @@ def _run_read( "parent_state": {}, "state": {"created_at": VOTE_100_CREATED_AT}, }, + # State count + 2, + ), + ( + "test_incremental_parent_state_with", + SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY, + [ + # Fetch the first page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}", + { + "posts": [ + {"id": 1, "updated_at": POST_1_UPDATED_AT}, + {"id": 2,
"updated_at": POST_2_UPDATED_AT}, + ], + "next_page": f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}&page=2", + }, + ), + # Fetch the second page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}&page=2", + {"posts": [{"id": 3, "updated_at": POST_3_UPDATED_AT}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + { + "id": 9, + "post_id": 1, + "updated_at": COMMENT_9_OLDEST, # No requests for comment 9, filtered out due to the date + }, + { + "id": 10, + "post_id": 1, + "updated_at": COMMENT_10_UPDATED_AT, + }, + { + "id": 11, + "post_id": 1, + "updated_at": COMMENT_11_UPDATED_AT, + }, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + { + "comments": [ + { + "id": 12, + "post_id": 1, + "updated_at": COMMENT_12_UPDATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + { + "votes": [ + { + "id": 100, + "comment_id": 10, + "created_at": VOTE_100_CREATED_AT, + } + ], + "next_page": f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + { + "votes": [ + { + "id": 101, + "comment_id": 10, + "created_at": VOTE_101_CREATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + 
f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + { + "votes": [ + { + "id": 111, + "comment_id": 11, + "created_at": VOTE_111_CREATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={LOOKBACK_DATE}", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [ + { + "id": 20, + "post_id": 2, + "updated_at": COMMENT_20_UPDATED_AT, + } + ], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + { + "comments": [ + { + "id": 21, + "post_id": 2, + "updated_at": COMMENT_21_UPDATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time={LOOKBACK_DATE}", + { + "votes": [ + { + "id": 200, + "comment_id": 20, + "created_at": VOTE_200_CREATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time={LOOKBACK_DATE}", + { + "votes": [ + { + "id": 210, + "comment_id": 21, + "created_at": VOTE_210_CREATED_AT, + } + ] + }, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + { + "comments": [ + { + "id": 30, + "post_id": 3, + "updated_at": COMMENT_30_UPDATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}", + { + "votes": [ + { + "id": 300, + "comment_id": 30, + 
"created_at": VOTE_300_CREATED_AT_TIMESTAMP, + } + ] + }, + ), + ], + # Expected records + [ + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_100_CREATED_AT, + "id": 100, + }, + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_101_CREATED_AT, + "id": 101, + }, + { + "comment_id": 11, + "comment_updated_at": COMMENT_11_UPDATED_AT, + "created_at": VOTE_111_CREATED_AT, + "id": 111, + }, + { + "comment_id": 20, + "comment_updated_at": COMMENT_20_UPDATED_AT, + "created_at": VOTE_200_CREATED_AT, + "id": 200, + }, + { + "comment_id": 21, + "comment_updated_at": COMMENT_21_UPDATED_AT, + "created_at": VOTE_210_CREATED_AT, + "id": 210, + }, + { + "comment_id": 30, + "comment_updated_at": COMMENT_30_UPDATED_AT, + "created_at": str(VOTE_300_CREATED_AT_TIMESTAMP), + "id": 300, + }, + ], + # Initial state + { + "parent_state": {}, + "use_global_cursor": True, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP}, + "lookback_window": 86400, + }, + # Expected state + { + "use_global_cursor": True, + "lookback_window": 1, + "parent_state": {}, + "state": {"created_at": VOTE_100_CREATED_AT}, + }, + 1, ), ], ) def test_incremental_parent_state_no_incremental_dependency( - test_name, manifest, mock_requests, expected_records, initial_state, expected_state + test_name, manifest, mock_requests, expected_records, initial_state, expected_state, state_count ): """ This is a pretty complicated test that syncs a low-code connector stream with three levels of substreams @@ -765,6 +1043,7 @@ def test_incremental_parent_state_no_incremental_dependency( 
initial_state, expected_records, expected_state, + state_count=state_count, )