Skip to content

Commit 87db0cc

Browse files
fix: consume ensure_at_least_one_state_emitted generator in perpartition cursor tests
Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
1 parent b9ac43f commit 87db0cc

1 file changed

Lines changed: 8 additions & 5 deletions

File tree

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3739,7 +3739,7 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update():
37393739
record_counter=RecordCounter(),
37403740
)
37413741
)
3742-
cursor.ensure_at_least_one_state_emitted()
3742+
list(cursor.ensure_at_least_one_state_emitted())
37433743

37443744
state = cursor.state
37453745
assert state == {
@@ -3835,7 +3835,7 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update
38353835
record_counter=RecordCounter(),
38363836
)
38373837
)
3838-
cursor.ensure_at_least_one_state_emitted()
3838+
list(cursor.ensure_at_least_one_state_emitted())
38393839

38403840
state = cursor.state
38413841
assert state == {
@@ -3927,15 +3927,18 @@ def test_given_all_partitions_finished_when_close_partition_then_final_state_emi
39273927
)
39283928
)
39293929

3930-
cursor.ensure_at_least_one_state_emitted()
3930+
state_messages = list(cursor.ensure_at_least_one_state_emitted())
39313931

39323932
final_state = cursor.state
39333933
assert final_state["use_global_cursor"] is False
39343934
assert len(final_state["states"]) == 2
39353935
assert final_state["state"]["updated_at"] == "2024-01-02T00:00:00Z"
39363936
assert final_state["parent_state"] == {"posts": {"updated_at": "2024-01-06T00:00:00Z"}}
39373937
assert final_state["lookback_window"] == 86400
3938-
assert cursor._message_repository.emit_message.call_count == 2
3938+
# close_partition() emits 1 state via message_repository (second is throttled)
3939+
# ensure_at_least_one_state_emitted() returns 1 state directly (no longer uses message_repository)
3940+
assert cursor._message_repository.emit_message.call_count == 1
3941+
assert len(state_messages) == 1
39393942
assert mock_cursor.stream_slices.call_count == 2 # Called once for each partition
39403943

39413944
# Checks that all internal variables are cleaned up
@@ -4001,7 +4004,7 @@ def test_given_partition_limit_exceeded_when_close_partition_then_switch_to_glob
40014004
record_counter=RecordCounter(),
40024005
)
40034006
)
4004-
cursor.ensure_at_least_one_state_emitted()
4007+
list(cursor.ensure_at_least_one_state_emitted())
40054008

40064009
final_state = cursor.state
40074010
assert len(slices) == 3

0 commit comments

Comments
 (0)