Skip to content

Commit ad77074

Browse files
mikasenghaasclaude
andauthored
fix env server deadlock (#921)
asyncio.wait_for wraps recv_multipart in a Task and cancels it on timeout. There is a race in CPython's Task.__step: when the recv completes (consuming data from the ZMQ buffer) but _must_cancel is already set by the timeout, the result is silently discarded via super().cancel() instead of super().set_result(). The message is consumed from the socket but never processed — gone forever. This caused a deadlock when training with rescheduling + validation: the client hangs waiting for responses to requests the server silently dropped. Observed as 2/450 messages lost in production. Replace with zmq.asyncio.Poller which is non-destructive: poll only checks socket readability without consuming any data. recv_multipart is only called when data is guaranteed available, so it completes immediately with no cancellation window. Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 92f0ae6 commit ad77074

1 file changed

Lines changed: 15 additions & 12 deletions

File tree

verifiers/workers/server/zmq_env_server.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,28 @@ def __init__(self, *args, address: str = "tcp://127.0.0.1:5000", **kwargs):
3232
async def run(self, stop_event: asyncio.Event | None = None):
3333
self.logger.info(f"{self.__class__.__name__} started on {self.address}")
3434

35-
# Create a task to wait for stop signal
36-
stop_task = asyncio.create_task(stop_event.wait()) if stop_event else None
35+
# Use a poller to check for incoming data instead of asyncio.wait_for.
36+
# asyncio.wait_for wraps recv_multipart in a Task and cancels it on
37+
# timeout. There is a race in CPython's Task.__step where the recv
38+
# completes (consuming data from the ZMQ buffer) but _must_cancel is
39+
# already set, so the result is silently discarded — the message is
40+
# gone forever. A poller is non-destructive: it only checks socket
41+
# readability without consuming any data.
42+
poller = zmq.asyncio.Poller()
43+
poller.register(self.socket, zmq.POLLIN)
3744

3845
try:
3946
while True:
40-
# exit gracefully on stop signal
4147
if stop_event and stop_event.is_set():
4248
self.logger.info("Stop event received, shutting down gracefully")
4349
break
4450

4551
try:
46-
# receive with timeout to periodically check stop_event
47-
frames = await asyncio.wait_for(
48-
self.socket.recv_multipart(),
49-
timeout=1.0 if stop_event else None,
50-
)
52+
events = dict(await poller.poll(timeout=1000))
53+
if self.socket not in events:
54+
continue
55+
56+
frames = await self.socket.recv_multipart()
5157

5258
if len(frames) != 3:
5359
self.logger.warning(
@@ -64,15 +70,12 @@ async def run(self, stop_event: asyncio.Event | None = None):
6470
self.pending_tasks.add(task)
6571
task.add_done_callback(self.pending_tasks.discard)
6672

67-
except asyncio.TimeoutError:
68-
continue
6973
except asyncio.CancelledError:
7074
break
7175
except Exception as e:
7276
self.logger.error(f"Error in server loop: {e}", exc_info=True)
7377
finally:
74-
if stop_task and not stop_task.done():
75-
stop_task.cancel()
78+
poller.unregister(self.socket)
7679

7780
async def close(self):
7881
# cancel and await all pending tasks

0 commit comments

Comments
 (0)