Skip to content

Commit b8065b1

Browse files
committed
fix: pop an empty deque issue
1 parent 71e511c commit b8065b1

File tree

2 files changed

+33
-0
lines changed

2 files changed

+33
-0
lines changed

src/ghoshell_moss/core/helpers/stream.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ def __iter__(self):
6868
def __next__(self) -> ItemT:
6969
if len(self._queue) > 0:
7070
item = self._queue.popleft()
71+
if len(self._queue) == 0:
72+
self._added.clear()
7173
if isinstance(item, Exception):
7274
raise item
7375
elif item is None:
@@ -104,6 +106,8 @@ def __aiter__(self):
104106
async def __anext__(self) -> ItemT:
105107
if len(self._queue) > 0:
106108
item = self._queue.popleft()
109+
if len(self._queue) == 0:
110+
self._added.clear()
107111
if isinstance(item, Exception):
108112
raise item
109113
elif item is None:

tests/helpers/test_stream.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import threading
3+
import time
34

45
from ghoshell_moss.core.helpers.stream import (
56
create_thread_safe_stream,
@@ -62,3 +63,31 @@ def sync_receiving():
6263
t1.join()
6364
t2.join()
6465
assert content == done[0]
66+
67+
68+
def test_receiver_waits_after_queue_empty_until_new_item_sync():
69+
sender, receiver = create_thread_safe_stream(timeout=1.0)
70+
consumed: list[str] = []
71+
72+
def producer():
73+
with sender:
74+
sender.append("A") # queue has one item; not completed yet
75+
time.sleep(0.1) # ensure consumer attempts the next() on empty queue
76+
sender.append("B")
77+
sender.commit()
78+
79+
def consumer():
80+
with receiver:
81+
a = next(receiver)
82+
consumed.append(a)
83+
b = next(receiver)
84+
consumed.append(b)
85+
86+
t1 = threading.Thread(target=producer)
87+
t2 = threading.Thread(target=consumer)
88+
t1.start()
89+
t2.start()
90+
t1.join()
91+
t2.join()
92+
93+
assert consumed == ["A", "B"]

0 commit comments

Comments
 (0)