diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py
index 303d8485206..f1e14a539e5 100644
--- a/salt/transport/zeromq.py
+++ b/salt/transport/zeromq.py
@@ -1162,9 +1162,6 @@ async def _send_recv(
                     send_recv_running = False
                     break
 
-            if future.done():
-                continue
-
             try:
                 # Wait for socket to be ready for sending
                 if not await socket.poll(300, zmq.POLLOUT):
@@ -1859,9 +1856,6 @@ async def _send_recv(
                     send_recv_running = False
                     break
 
-            if future.done():
-                continue
-
             try:
                 # Wait for socket to be ready for sending
                 if not await socket.poll(300, zmq.POLLOUT):
diff --git a/tests/pytests/unit/transport/test_zeromq.py b/tests/pytests/unit/transport/test_zeromq.py
index 3073777579e..c68f5ad69e4 100644
--- a/tests/pytests/unit/transport/test_zeromq.py
+++ b/tests/pytests/unit/transport/test_zeromq.py
@@ -12,6 +12,7 @@
 import pytest
 import tornado.ioloop
 import zmq.eventloop.future
+from pytestshellutils.utils import ports
 
 import salt.config
 import salt.transport.base
@@ -1741,6 +1742,13 @@ async def test_req_chan_bad_payload_to_decode(pki_dir, io_loop, caplog):
 
 
 async def test_client_timeout_msg(minion_opts, io_loop):
+    # Point at a port that nothing is listening on so the send is forced to
+    # actually wait out the client-side timeout. The conftest's default
+    # master_uri uses port 4506, which collides with a real salt-master if
+    # one happens to be running on the test host.
+    minion_opts["master_uri"] = "tcp://127.0.0.1:{}".format(
+        ports.get_unused_localhost_port()
+    )
     client = salt.transport.zeromq.RequestClient(minion_opts, io_loop)
     await client.connect()
     try:
diff --git a/tests/pytests/unit/transport/test_zeromq_concurrency.py b/tests/pytests/unit/transport/test_zeromq_concurrency.py
index ee07f3d2ef4..3a37d5264b5 100644
--- a/tests/pytests/unit/transport/test_zeromq_concurrency.py
+++ b/tests/pytests/unit/transport/test_zeromq_concurrency.py
@@ -1,8 +1,11 @@
 import asyncio
 
+import tornado.concurrent
 import zmq
 
+import salt.payload
 import salt.transport.zeromq
+from salt.exceptions import SaltReqTimeoutError
 from tests.support.mock import AsyncMock
 
 
@@ -62,6 +65,97 @@ async def mocked_recv(**kwargs):
     client.close()
 
 
+async def test_request_client_sends_stale_future_messages(minion_opts, io_loop):
+    """
+    Regression test: queued messages whose futures have already been marked
+    done (e.g. the caller-side timeout fired while the message was waiting
+    in the queue behind a slow master) must still be sent on the wire.
+
+    The master persists side effects from the payload itself (``_return``
+    writes to the job cache, fires ``event_return``, calls into the
+    external job cache / RaaS). Whether the originating worker thread is
+    still waiting for an ACK is irrelevant — dropping the send means the
+    master never sees the return data and the result store is silently
+    incomplete.
+
+    A pre-send ``if future.done(): continue`` short-circuit in
+    ``_send_recv`` (added during the worker-pool routing rewrite) made
+    every stale entry get skipped instead of sent. This test fails if
+    that short-circuit is present and passes once it is removed.
+    """
+    n_messages = 20
+    client = salt.transport.zeromq.RequestClient(minion_opts, io_loop)
+
+    wire_log = []
+
+    async def mocked_send(msg, **kwargs):
+        wire_log.append(msg)
+
+    async def mocked_recv(**kwargs):
+        return salt.payload.dumps({"ret": "ok"})
+
+    async def mocked_poll(timeout, flag=None, **kwargs):
+        return True
+
+    mock_socket = AsyncMock()
+    mock_socket.send = mocked_send
+    mock_socket.recv = mocked_recv
+    mock_socket.poll = mocked_poll
+
+    # In production, the post-send ``if future.done():`` branch reconnects
+    # the socket and spawns a fresh ``_send_recv`` task to drain the next
+    # queued entry. Mirror that respawn chain here without touching real
+    # ZMQ machinery, so the loop can process all N pre-staged entries.
+    async def fake_reconnect():
+        client.send_recv_task_id += 1
+        client.socket = mock_socket
+        client.send_recv_task = asyncio.create_task(
+            client._send_recv(
+                mock_socket, client._queue, task_id=client.send_recv_task_id
+            )
+        )
+
+    client._reconnect = fake_reconnect
+
+    # Pre-stage N (future, message) pairs whose futures have already been
+    # marked done with SaltReqTimeoutError. Models the production case:
+    # _timeout_message fired while the entry was waiting in the queue.
+    for i in range(n_messages):
+        future = tornado.concurrent.Future()
+        future.set_exception(SaltReqTimeoutError("Message timed out"))
+        # Retrieve the exception so the gc doesn't complain that nobody
+        # awaited it.
+        future.exception()
+        message = salt.payload.dumps({"cmd": "_return", "seq": i})
+        client._queue.put_nowait((future, message))
+
+    client.socket = mock_socket
+    client.send_recv_task = asyncio.create_task(
+        client._send_recv(mock_socket, client._queue, task_id=client.send_recv_task_id)
+    )
+
+    loop = asyncio.get_running_loop()
+    deadline = loop.time() + 5
+    while len(wire_log) < n_messages and loop.time() < deadline:
+        await asyncio.sleep(0.05)
+
+    client._closing = True
+    if client.send_recv_task and not client.send_recv_task.done():
+        client.send_recv_task.cancel()
+        try:
+            await client.send_recv_task
+        except (asyncio.CancelledError, Exception):  # pylint: disable=broad-except
+            # Best-effort teardown: the cancellation (or a late task error) is
+            # expected here; KeyboardInterrupt/SystemExit still propagate.
+            pass
+
+    assert len(wire_log) == n_messages, (
+        f"Expected {n_messages} frames on the wire, observed {len(wire_log)}. "
+        "Messages with already-timed-out futures are being silently dropped "
+        "before they reach the master."
+    )
+
+
 async def test_request_client_reconnect_task_safety(minion_opts, io_loop):
     """
     Regression test for task leaks and state corruption during reconnections.