Skip to content

Commit 10ce693

Browse files
committed
core: streaming: cancel inner task when client disconnects
1 parent e138d85 commit 10ce693

1 file changed

Lines changed: 16 additions & 6 deletions

File tree

core/libs/commonwealth/src/commonwealth/utils/streaming.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,24 @@ async def generator_wrapper(gen: AsyncGenerator[str | bytes, None], queue: async
7373
finally:
7474
if heartbeat_task:
7575
heartbeat_task.cancel()
76+
await gen.aclose()
7677
await queue.put(None)
7778

78-
asyncio.create_task(generator_wrapper(gen, queue))
79+
wrapper_task = asyncio.create_task(generator_wrapper(gen, queue))
7980

80-
while True:
81-
item = await queue.get()
82-
if item is None:
83-
break
84-
yield item
81+
try:
82+
while True:
83+
item = await queue.get()
84+
if item is None:
85+
break
86+
yield item
87+
finally:
88+
if not wrapper_task.done():
89+
wrapper_task.cancel()
90+
try:
91+
await wrapper_task
92+
except asyncio.CancelledError:
93+
pass
8594

8695

8796
async def _fetch_stream(
@@ -93,6 +102,7 @@ async def _fetch_stream(
93102
except Exception as e:
94103
await queue.put((None, e))
95104
finally:
105+
await gen.aclose()
96106
await queue.put((None, None))
97107

98108

0 commit comments

Comments
 (0)