@@ -83,6 +83,7 @@ def __init__(
8383 self ._tasks .add_error_handler (self ._on_task_error )
8484 self ._queue = queue or InMemoryMessageQueue ()
8585 self ._closed = False
86+ self ._disconnected = False
8687 self ._sender = (sender_factory or self ._default_sender_factory )(self ._writer , self ._tasks )
8788 if listening :
8889 self ._recv_task = self ._tasks .create (
@@ -132,6 +133,7 @@ def add_observer(self, observer: StreamObserver) -> None:
132133 self ._observers .append (observer )
133134
134135 async def send_request (self , method : str , params : JsonValue | None = None ) -> Any :
136+ self ._raise_if_unavailable ()
135137 request_id = self ._next_request_id
136138 self ._next_request_id += 1
137139 future = self ._state .register_outgoing (request_id , method )
@@ -141,6 +143,7 @@ async def send_request(self, method: str, params: JsonValue | None = None) -> An
141143 return await future
142144
143145 async def send_notification (self , method : str , params : JsonValue | None = None ) -> None :
146+ self ._raise_if_unavailable ()
144147 payload = {"jsonrpc" : "2.0" , "method" : method , "params" : params }
145148 await self ._sender .send (payload )
146149 self ._notify_observers (StreamDirection .OUTGOING , payload )
@@ -160,14 +163,7 @@ async def _receive_loop(self) -> None:
160163 await self ._process_message (message )
161164 except asyncio .CancelledError :
162165 return
163- # EOF: the remote end closed the connection. Reject any in-flight
164- # outgoing requests so their callers receive an error instead of
165- # hanging forever. Without this, a subprocess crash during
166- # initialize() or new_session() silently converts into an infinite
167- # hang because _on_receive_error is only invoked on exceptions.
168- self ._state .reject_all_outgoing (
169- ConnectionError ("Connection closed: remote end sent EOF" )
170- )
166+ self ._disconnect ()
171167
172168 async def _process_message (self , message : dict [str , Any ]) -> None :
173169 method = message .get ("method" )
@@ -270,7 +266,7 @@ async def _handle_response(self, message: dict[str, Any]) -> None:
270266
271267 def _on_receive_error (self , task : asyncio .Task [Any ], exc : BaseException ) -> None :
272268 logging .exception ("Receive loop failed" , exc_info = exc )
273- self ._state . reject_all_outgoing ( exc )
269+ self ._disconnect ( )
274270
275271 def _on_task_error (self , task : asyncio .Task [Any ], exc : BaseException ) -> None :
276272 logging .exception ("Background task failed" , exc_info = exc )
@@ -293,3 +289,13 @@ def _default_dispatcher_factory(
293289
294290 def _default_sender_factory (self , writer : asyncio .StreamWriter , supervisor : TaskSupervisor ) -> MessageSender :
295291 return MessageSender (writer , supervisor )
292+
293+ def _disconnect (self ) -> None :
294+ if self ._disconnected :
295+ return
296+ self ._disconnected = True
297+ self ._state .reject_all_outgoing (ConnectionError ("Connection closed" ))
298+
299+ def _raise_if_unavailable (self ) -> None :
300+ if self ._disconnected or self ._closed :
301+ raise ConnectionError ("Connection closed" )
0 commit comments