@@ -1045,7 +1045,7 @@ def test_blocked_stream_added_to_end_of_queue(self):
10451045 assert handler ._stream_instances_to_start_partition_generation [1 ] == stream1
10461046
10471047 def test_no_defer_when_flag_false (self ):
1048- """Test that blocking doesn't occur when block_simultaneous_read=""" ""
1048+ """Test that blocking doesn't occur when block_simultaneous_read="" " ""
10491049 stream = self ._create_mock_stream ("stream1" , block_simultaneous_read = "" )
10501050
10511051 handler = ConcurrentReadProcessor (
@@ -1196,14 +1196,55 @@ def test_multiple_blocked_streams_retry_in_order(self):
11961196 result = handler .start_next_partition_generator ()
11971197 assert result is not None
11981198 assert "parent" in handler ._active_stream_names
1199+ assert "api_group" in handler ._active_groups
1200+ assert "parent" in handler ._active_groups ["api_group" ]
11991201
1200- # Try to start child1 ( should be deferred)
1202+ # Try to start next stream (child1) - should be deferred because parent is active
12011203 result = handler .start_next_partition_generator ()
1202- # child1 is deferred, but child2 might start if it's not blocked
1203- # Let me check the queue state
1204+ assert result is None # child1 was deferred
12041205
1205- # Both children should be deferred (parent is active)
1206- assert len (handler ._stream_instances_to_start_partition_generation ) >= 1
1206+ # After first deferral, we should still have 2 streams in queue (child1 moved to end)
1207+ assert len (handler ._stream_instances_to_start_partition_generation ) == 2
1208+ # child1 was moved to the back, so the queue has the other child first
1209+ queue_streams = handler ._stream_instances_to_start_partition_generation
1210+ assert child1 in queue_streams
1211+ assert child2 in queue_streams
1212+
1213+ # Try to start next stream (child2) - should also be deferred
1214+ result = handler .start_next_partition_generator ()
1215+ assert result is None # child2 was deferred
1216+
1217+ # Both streams still in queue, but order may have changed
1218+ assert len (handler ._stream_instances_to_start_partition_generation ) == 2
1219+
1220+ # Verify neither child is active yet (both blocked by parent)
1221+ assert "child1" not in handler ._active_stream_names
1222+ assert "child2" not in handler ._active_stream_names
1223+
1224+ # Verify deferral was logged for both children
1225+ logger_calls = [str (call ) for call in self ._logger .info .call_args_list ]
1226+ assert any ("Deferring stream 'child1'" in call for call in logger_calls )
1227+ assert any ("Deferring stream 'child2'" in call for call in logger_calls )
1228+
1229+ # Simulate parent completing partition generation (parent has no partitions, so it's done)
1230+ handler ._streams_currently_generating_partitions .append ("parent" )
1231+ handler ._streams_to_running_partitions ["parent" ] = set ()
1232+ sentinel = PartitionGenerationCompletedSentinel (parent )
1233+ list (handler .on_partition_generation_completed (sentinel ))
1234+
1235+ # After parent completes, one of the children should start (whichever was first in queue)
1236+ # We know at least one child started because the queue shrunk
1237+ assert len (handler ._stream_instances_to_start_partition_generation ) == 1
1238+
1239+ # Verify that exactly one child is now active
1240+ children_active = [
1241+ name for name in ["child1" , "child2" ]
1242+ if name in handler ._active_stream_names
1243+ ]
1244+ assert len (children_active ) == 1 , f"Expected exactly one child active, got: { children_active } "
1245+
1246+ # Parent should be re-activated because the active child needs to read from it
1247+ assert "parent" in handler ._active_stream_names
12071248
12081249 def test_child_without_flag_blocked_by_parent_with_flag (self ):
12091250 """Test that a child WITHOUT block_simultaneous_read is blocked by parent WITH the flag"""
0 commit comments