File tree Expand file tree Collapse file tree 2 files changed +33
-0
lines changed
src/ghoshell_moss/core/helpers Expand file tree Collapse file tree 2 files changed +33
-0
lines changed Original file line number Diff line number Diff line change @@ -68,6 +68,8 @@ def __iter__(self):
6868 def __next__ (self ) -> ItemT :
6969 if len (self ._queue ) > 0 :
7070 item = self ._queue .popleft ()
71+ if len (self ._queue ) == 0 :
72+ self ._added .clear ()
7173 if isinstance (item , Exception ):
7274 raise item
7375 elif item is None :
@@ -104,6 +106,8 @@ def __aiter__(self):
104106 async def __anext__ (self ) -> ItemT :
105107 if len (self ._queue ) > 0 :
106108 item = self ._queue .popleft ()
109+ if len (self ._queue ) == 0 :
110+ self ._added .clear ()
107111 if isinstance (item , Exception ):
108112 raise item
109113 elif item is None :
Original file line number Diff line number Diff line change 11import asyncio
22import threading
3+ import time
34
45from ghoshell_moss .core .helpers .stream import (
56 create_thread_safe_stream ,
@@ -62,3 +63,31 @@ def sync_receiving():
6263 t1 .join ()
6364 t2 .join ()
6465 assert content == done [0 ]
66+
67+
68+ def test_receiver_waits_after_queue_empty_until_new_item_sync ():
69+ sender , receiver = create_thread_safe_stream (timeout = 1.0 )
70+ consumed : list [str ] = []
71+
72+ def producer ():
73+ with sender :
74+ sender .append ("A" ) # queue has one item; not completed yet
75+ time .sleep (0.1 ) # ensure consumer attempts the next() on empty queue
76+ sender .append ("B" )
77+ sender .commit ()
78+
79+ def consumer ():
80+ with receiver :
81+ a = next (receiver )
82+ consumed .append (a )
83+ b = next (receiver )
84+ consumed .append (b )
85+
86+ t1 = threading .Thread (target = producer )
87+ t2 = threading .Thread (target = consumer )
88+ t1 .start ()
89+ t2 .start ()
90+ t1 .join ()
91+ t2 .join ()
92+
93+ assert consumed == ["A" , "B" ]
You can’t perform that action at this time.
0 commit comments