From 65b1e80031c97b64b6f514ec6b786cbda4b07ff8 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Wed, 1 Apr 2026 15:24:14 -0300 Subject: [PATCH 1/2] fix: reject pending requests on EOF to prevent infinite hang When the remote end closes the connection (e.g., subprocess crashes), _receive_loop exits cleanly on EOF without raising an exception. This means _on_receive_error is never called and pending outgoing request futures hang forever. Add reject_all_outgoing() after the receive loop breaks on EOF so callers get a ConnectionError instead of an infinite hang. Fixes #85 Co-Authored-By: Claude Opus 4.6 --- src/acp/connection.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/acp/connection.py b/src/acp/connection.py index aca1c19..11688e6 100644 --- a/src/acp/connection.py +++ b/src/acp/connection.py @@ -160,6 +160,14 @@ async def _receive_loop(self) -> None: await self._process_message(message) except asyncio.CancelledError: return + # EOF: the remote end closed the connection. Reject any in-flight + # outgoing requests so their callers receive an error instead of + # hanging forever. Without this, a subprocess crash during + # initialize() or new_session() silently converts into an infinite + # hang because _on_receive_error is only invoked on exceptions. + self._state.reject_all_outgoing( + ConnectionError("Connection closed: remote end sent EOF") + ) async def _process_message(self, message: dict[str, Any]) -> None: method = message.get("method") From 56f31da3a833d8592e19529d29e9e2c44d8cc98a Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Wed, 15 Apr 2026 02:44:56 +0800 Subject: [PATCH 2/2] fix: fail fast after connection EOF --- src/acp/connection.py | 24 +++++++++++++++--------- tests/test_rpc.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/src/acp/connection.py b/src/acp/connection.py index 11688e6..ccf2fc3 100644 --- a/src/acp/connection.py +++ b/src/acp/connection.py @@ -83,6 +83,7 @@ def __init__( self._tasks.add_error_handler(self._on_task_error) self._queue = queue or InMemoryMessageQueue() self._closed = False + self._disconnected = False self._sender = (sender_factory or self._default_sender_factory)(self._writer, self._tasks) if listening: self._recv_task = self._tasks.create( @@ -132,6 +133,7 @@ def add_observer(self, observer: StreamObserver) -> None: self._observers.append(observer) async def send_request(self, method: str, params: JsonValue | None = None) -> Any: + self._raise_if_unavailable() request_id = self._next_request_id self._next_request_id += 1 future = self._state.register_outgoing(request_id, method) @@ -141,6 +143,7 @@ async def send_request(self, method: str, params: JsonValue | None = None) -> An return await future async def send_notification(self, method: str, params: JsonValue | None = None) -> None: + self._raise_if_unavailable() payload = {"jsonrpc": "2.0", "method": method, "params": params} await self._sender.send(payload) self._notify_observers(StreamDirection.OUTGOING, payload) @@ -160,14 +163,7 @@ async def _receive_loop(self) -> None: await self._process_message(message) except asyncio.CancelledError: return - # EOF: the remote end closed the connection. Reject any in-flight - # outgoing requests so their callers receive an error instead of - # hanging forever. Without this, a subprocess crash during - # initialize() or new_session() silently converts into an infinite - # hang because _on_receive_error is only invoked on exceptions. - self._state.reject_all_outgoing( - ConnectionError("Connection closed: remote end sent EOF") - ) + self._disconnect() async def _process_message(self, message: dict[str, Any]) -> None: method = message.get("method") @@ -270,7 +266,7 @@ async def _handle_response(self, message: dict[str, Any]) -> None: def _on_receive_error(self, task: asyncio.Task[Any], exc: BaseException) -> None: logging.exception("Receive loop failed", exc_info=exc) - self._state.reject_all_outgoing(exc) + self._disconnect() def _on_task_error(self, task: asyncio.Task[Any], exc: BaseException) -> None: logging.exception("Background task failed", exc_info=exc) @@ -293,3 +289,13 @@ def _default_dispatcher_factory( def _default_sender_factory(self, writer: asyncio.StreamWriter, supervisor: TaskSupervisor) -> MessageSender: return MessageSender(writer, supervisor) + + def _disconnect(self) -> None: + if self._disconnected: + return + self._disconnected = True + self._state.reject_all_outgoing(ConnectionError("Connection closed")) + + def _raise_if_unavailable(self) -> None: + if self._disconnected or self._closed: + raise ConnectionError("Connection closed") diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 0d3bb75..b30e358 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -26,6 +26,7 @@ update_agent_message_text, update_tool_call, ) +from acp.connection import Connection from acp.core import AgentSideConnection, ClientSideConnection from acp.schema import ( AgentMessageChunk, @@ -199,6 +200,35 @@ async def read_one(i: int): assert res.content == f"Content {i}" +@pytest.mark.asyncio +async def test_pending_request_fails_when_remote_sends_eof(server): + conn = Connection(lambda method, params, is_notification: None, server.client_writer, server.client_reader) + request = asyncio.create_task(conn.send_request("ping", {"value": 1})) + + await asyncio.sleep(0.05) + server.server_writer.close() + await server.server_writer.wait_closed() + + with pytest.raises(ConnectionError, match="Connection closed"): + await asyncio.wait_for(request, timeout=1.0) + + await conn.close() + + +@pytest.mark.asyncio +async def test_new_requests_fail_fast_after_remote_eof(server): + conn = Connection(lambda method, params, is_notification: None, server.client_writer, server.client_reader) + + server.server_writer.close() + await server.server_writer.wait_closed() + await asyncio.sleep(0.05) + + with pytest.raises(ConnectionError, match="Connection closed"): + await asyncio.wait_for(conn.send_request("ping", {"value": 1}), timeout=1.0) + + await conn.close() + + @pytest.mark.asyncio async def test_invalid_params_results_in_error_response(connect, server): # Only start agent-side (server) so we can inject raw request from client socket