2929
3030
3131class _StreamError :
32- """Wraps a retryable error with the stream generation that produced it."""
32+ """Wraps an error with the stream generation that produced it."""
3333
3434 def __init__ (self , exception : Exception , generation : int ):
3535 self .exception = exception
@@ -77,7 +77,7 @@ def register(self, read_ids: Set[int]) -> asyncio.Queue:
7777 return queue
7878
7979 def unregister (self , read_ids : Set [int ]) -> None :
80- """Remove read_ids from routing. Stops recv loop if no tasks remain. """
80+ """Remove read_ids from routing."""
8181 for read_id in read_ids :
8282 self ._queues .pop (read_id , None )
8383
@@ -151,9 +151,7 @@ async def reopen_stream(
151151 await self ._recv_task
152152 except (asyncio .CancelledError , Exception ):
153153 pass
154- error = _StreamError (
155- Exception ("Stream reopening" ), self ._stream_generation
156- )
154+ error = _StreamError (Exception ("Stream reopening" ), self ._stream_generation )
157155 for queue in self ._get_unique_queues ():
158156 self ._put_error_nowait (queue , error )
159157 try :
@@ -171,8 +169,6 @@ async def close(self) -> None:
171169 await self ._recv_task
172170 except (asyncio .CancelledError , Exception ):
173171 pass
174- error = _StreamError (
175- Exception ("Multiplexer closed" ), self ._stream_generation
176- )
172+ error = _StreamError (Exception ("Multiplexer closed" ), self ._stream_generation )
177173 for queue in self ._get_unique_queues ():
178174 self ._put_error_nowait (queue , error )
0 commit comments