Skip to content

Commit 0a34b3b

Browse files
committed
core: streaming: cancel inner task when client disconnects
1 parent c024745 commit 0a34b3b

1 file changed

Lines changed: 22 additions & 6 deletions

File tree

core/libs/commonwealth/src/commonwealth/utils/streaming.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,27 @@ async def generator_wrapper(gen: AsyncGenerator[str | bytes, None], queue: async
7373
finally:
7474
if heartbeat_task:
7575
heartbeat_task.cancel()
76+
try:
77+
await gen.aclose()
78+
except Exception:
79+
pass
7680
await queue.put(None)
7781

78-
asyncio.create_task(generator_wrapper(gen, queue))
82+
wrapper_task = asyncio.create_task(generator_wrapper(gen, queue))
7983

80-
while True:
81-
item = await queue.get()
82-
if item is None:
83-
break
84-
yield item
84+
try:
85+
while True:
86+
item = await queue.get()
87+
if item is None:
88+
break
89+
yield item
90+
finally:
91+
if not wrapper_task.done():
92+
wrapper_task.cancel()
93+
try:
94+
await wrapper_task
95+
except asyncio.CancelledError:
96+
pass
8597

8698

8799
async def _fetch_stream(
@@ -93,6 +105,10 @@ async def _fetch_stream(
93105
except Exception as e:
94106
await queue.put((None, e))
95107
finally:
108+
try:
109+
await gen.aclose()
110+
except Exception:
111+
pass
96112
await queue.put((None, None))
97113

98114

0 commit comments

Comments
 (0)