Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ def _emit_state_message(self, throttle: bool = True) -> None:
if current_time is None:
return
self._last_emission_time = current_time
# Skip state emit for global cursor if parent state is empty
if self._use_global_cursor and not self._parent_state:
return

self._connector_state_manager.update_state_for_stream(
self._stream_name,
self._stream_namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,6 @@ def create_concurrent_cursor_from_perpartition_cursor(
)
)
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)

# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
use_global_cursor = isinstance(
partition_router, GroupingPartitionRouter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,33 @@
"partition_router"
]["parent_stream_configs"][0]["incremental_dependency"] = False

SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY = deepcopy(SUBSTREAM_MANIFEST)
# Disable incremental_dependency
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["post_comments_stream"][
"retriever"
]["partition_router"]["parent_stream_configs"][0]["incremental_dependency"] = False
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["post_comment_votes_stream"][
"retriever"
]["partition_router"]["parent_stream_configs"][0]["incremental_dependency"] = False
# Enable global_cursor
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["cursor_incremental_sync"][
"global_substream_cursor"
] = True


import orjson
import requests_mock


def run_mocked_test(
mock_requests, manifest, config, stream_name, initial_state, expected_records, expected_state
mock_requests,
manifest,
config,
stream_name,
initial_state,
expected_records,
expected_state,
state_count=None,
):
"""
Helper function to mock requests, run the test, and verify the results.
Expand Down Expand Up @@ -354,6 +375,8 @@ def run_mocked_test(
expected_records, key=lambda x: x["id"]
)

assert len(output.state_messages) == state_count if state_count else True

# Verify state
final_state = output.state_messages[-1].state.stream.stream_state
assert final_state.__dict__ == expected_state
Expand Down Expand Up @@ -431,7 +454,7 @@ def _run_read(


@pytest.mark.parametrize(
"test_name, manifest, mock_requests, expected_records, initial_state, expected_state",
"test_name, manifest, mock_requests, expected_records, initial_state, expected_state, state_count",
[
(
"test_incremental_parent_state",
Expand Down Expand Up @@ -739,11 +762,266 @@ def _run_read(
"parent_state": {},
"state": {"created_at": VOTE_100_CREATED_AT},
},
# State count
2,
),
(
"test_incremental_parent_state_with",
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY,
[
# Fetch the first page of posts
(
f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}",
{
"posts": [
{"id": 1, "updated_at": POST_1_UPDATED_AT},
{"id": 2, "updated_at": POST_2_UPDATED_AT},
],
"next_page": f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}&page=2",
},
),
# Fetch the second page of posts
(
f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}&page=2",
{"posts": [{"id": 3, "updated_at": POST_3_UPDATED_AT}]},
),
# Fetch the first page of comments for post 1
(
"https://api.example.com/community/posts/1/comments?per_page=100",
{
"comments": [
{
"id": 9,
"post_id": 1,
"updated_at": COMMENT_9_OLDEST, # No requests for comment 9, filtered out due to the date
},
{
"id": 10,
"post_id": 1,
"updated_at": COMMENT_10_UPDATED_AT,
},
{
"id": 11,
"post_id": 1,
"updated_at": COMMENT_11_UPDATED_AT,
},
],
"next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2",
},
),
# Fetch the second page of comments for post 1
(
"https://api.example.com/community/posts/1/comments?per_page=100&page=2",
{
"comments": [
{
"id": 12,
"post_id": 1,
"updated_at": COMMENT_12_UPDATED_AT,
}
]
},
),
# Fetch the first page of votes for comment 10 of post 1
(
f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
{
"votes": [
{
"id": 100,
"comment_id": 10,
"created_at": VOTE_100_CREATED_AT,
}
],
"next_page": f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
},
),
# Fetch the second page of votes for comment 10 of post 1
(
f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
{
"votes": [
{
"id": 101,
"comment_id": 10,
"created_at": VOTE_101_CREATED_AT,
}
]
},
),
# Fetch the first page of votes for comment 11 of post 1
(
f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}",
{
"votes": [
{
"id": 111,
"comment_id": 11,
"created_at": VOTE_111_CREATED_AT,
}
]
},
),
# Fetch the first page of votes for comment 12 of post 1
(
f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={LOOKBACK_DATE}",
{"votes": []},
),
# Fetch the first page of comments for post 2
(
"https://api.example.com/community/posts/2/comments?per_page=100",
{
"comments": [
{
"id": 20,
"post_id": 2,
"updated_at": COMMENT_20_UPDATED_AT,
}
],
"next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2",
},
),
# Fetch the second page of comments for post 2
(
"https://api.example.com/community/posts/2/comments?per_page=100&page=2",
{
"comments": [
{
"id": 21,
"post_id": 2,
"updated_at": COMMENT_21_UPDATED_AT,
}
]
},
),
# Fetch the first page of votes for comment 20 of post 2
(
f"https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time={LOOKBACK_DATE}",
{
"votes": [
{
"id": 200,
"comment_id": 20,
"created_at": VOTE_200_CREATED_AT,
}
]
},
),
# Fetch the first page of votes for comment 21 of post 2
(
f"https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time={LOOKBACK_DATE}",
{
"votes": [
{
"id": 210,
"comment_id": 21,
"created_at": VOTE_210_CREATED_AT,
}
]
},
),
# Fetch the first page of comments for post 3
(
"https://api.example.com/community/posts/3/comments?per_page=100",
{
"comments": [
{
"id": 30,
"post_id": 3,
"updated_at": COMMENT_30_UPDATED_AT,
}
]
},
),
# Fetch the first page of votes for comment 30 of post 3
(
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}",
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
],
# Expected records
[
{
"comment_id": 10,
"comment_updated_at": COMMENT_10_UPDATED_AT,
"created_at": VOTE_100_CREATED_AT,
"id": 100,
},
{
"comment_id": 10,
"comment_updated_at": COMMENT_10_UPDATED_AT,
"created_at": VOTE_101_CREATED_AT,
"id": 101,
},
{
"comment_id": 11,
"comment_updated_at": COMMENT_11_UPDATED_AT,
"created_at": VOTE_111_CREATED_AT,
"id": 111,
},
{
"comment_id": 20,
"comment_updated_at": COMMENT_20_UPDATED_AT,
"created_at": VOTE_200_CREATED_AT,
"id": 200,
},
{
"comment_id": 21,
"comment_updated_at": COMMENT_21_UPDATED_AT,
"created_at": VOTE_210_CREATED_AT,
"id": 210,
},
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],
# Initial state
{
"parent_state": {},
"use_global_cursor": True,
"states": [
{
"partition": {
"id": 10,
"parent_slice": {"id": 1, "parent_slice": {}},
},
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP},
},
{
"partition": {
"id": 11,
"parent_slice": {"id": 1, "parent_slice": {}},
},
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
},
],
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP},
"lookback_window": 86400,
},
# Expected state
{
"use_global_cursor": True,
"lookback_window": 1,
"parent_state": {},
"state": {"created_at": VOTE_100_CREATED_AT},
},
1,
),
],
)
def test_incremental_parent_state_no_incremental_dependency(
test_name, manifest, mock_requests, expected_records, initial_state, expected_state
test_name, manifest, mock_requests, expected_records, initial_state, expected_state, state_count
):
"""
This is a pretty complicated test that syncs a low-code connector stream with three levels of substreams
Expand All @@ -765,6 +1043,7 @@ def test_incremental_parent_state_no_incremental_dependency(
initial_state,
expected_records,
expected_state,
state_count=state_count,
)


Expand Down
Loading