Skip to content

Commit 80d8b2b

Browse files
committed
Fix unit tests
1 parent 314ded1 commit 80d8b2b

1 file changed

Lines changed: 1 addition & 57 deletions

File tree

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -945,7 +945,7 @@ def test_defer_stream_when_grandparent_active(self):
945945
# Child should be back in the queue
946946
assert len(handler._stream_instances_to_start_partition_generation) == 1
947947

948-
def test_retry_blocked_stream_after_blocker_done(self):
948+
def test_different_groups_do_not_block_each_other(self):
949949
"""Test that independent streams with different groups don't block each other"""
950950
stream1 = self._create_mock_stream("stream1", block_simultaneous_read="group1")
951951
stream2 = self._create_mock_stream("stream2", block_simultaneous_read="group2")
@@ -1402,59 +1402,3 @@ def test_child_starts_after_parent_completes_via_partition_complete_sentinel(sel
14021402
]
14031403
assert len(started_messages) == 1
14041404
assert started_messages[0].trace.stream_status.stream_descriptor.name == "child"
1405-
1406-
def test_child_starts_after_parent_completes_via_partition_complete_sentinel(self):
1407-
"""Test that child stream starts after parent completes via on_partition_complete_sentinel"""
1408-
parent = self._create_mock_stream("parent", block_simultaneous_read="api_group")
1409-
child = self._create_mock_stream_with_parent(
1410-
"child", parent, block_simultaneous_read="api_group"
1411-
)
1412-
1413-
handler = ConcurrentReadProcessor(
1414-
[parent, child],
1415-
self._partition_enqueuer,
1416-
self._thread_pool_manager,
1417-
self._logger,
1418-
self._slice_logger,
1419-
self._message_repository,
1420-
self._partition_reader,
1421-
)
1422-
1423-
# Start parent
1424-
handler.start_next_partition_generator()
1425-
assert "parent" in handler._active_stream_names
1426-
1427-
# Try to start child (should be deferred)
1428-
result = handler.start_next_partition_generator()
1429-
assert result is None
1430-
assert "child" not in handler._active_stream_names
1431-
assert len(handler._stream_instances_to_start_partition_generation) == 1
1432-
1433-
# Create a partition for parent and add it to running partitions
1434-
# (parent is already in _streams_currently_generating_partitions from start_next_partition_generator)
1435-
mock_partition = Mock(spec=Partition)
1436-
mock_partition.stream_name.return_value = "parent"
1437-
handler._streams_to_running_partitions["parent"].add(mock_partition)
1438-
1439-
# Complete partition generation for parent
1440-
sentinel_gen = PartitionGenerationCompletedSentinel(parent)
1441-
list(handler.on_partition_generation_completed(sentinel_gen))
1442-
1443-
# Now complete the partition (this triggers stream done)
1444-
sentinel_complete = PartitionCompleteSentinel(mock_partition)
1445-
messages = list(handler.on_partition_complete_sentinel(sentinel_complete))
1446-
1447-
# Child should have been started automatically
1448-
assert "child" in handler._active_stream_names
1449-
assert len(handler._stream_instances_to_start_partition_generation) == 0
1450-
1451-
# Verify a STARTED message was emitted for child
1452-
started_messages = [
1453-
msg
1454-
for msg in messages
1455-
if msg.type == MessageType.TRACE
1456-
and msg.trace.stream_status
1457-
and msg.trace.stream_status.status == AirbyteStreamStatus.STARTED
1458-
]
1459-
assert len(started_messages) == 1
1460-
assert started_messages[0].trace.stream_status.stream_descriptor.name == "child"

0 commit comments

Comments
 (0)