From 3bb3382e0a15766969cf6beb8fc9fb62ab0b0200 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Wed, 20 May 2026 23:38:40 -0700 Subject: [PATCH] Fix silent drop of return payloads under master backpressure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When RequestClient._send_recv pulled a (future, message) pair off the queue whose future had already been marked done — i.e. the per-message timeout fired while the entry waited behind a slow master — the pre-send `if future.done(): continue` short-circuit discarded the message without ever sending it. The minion's caller saw the same SaltReqTimeoutError it sees in 3007.x, but the master never received the frame, so _return / event_return / external job cache side effects never fired and result stores were left silently incomplete. The reproducer in TRANSPORT_BUG.md shows ~4x delivery loss in 3008.0rc3 vs 3007.14 under the same workload. The short-circuit was added during the worker-pool routing rewrite (#68532) as a load-shedding optimization. That intent is sound for genuine request/response traffic, but wrong for fire-and-await-ACK payloads like _return: the master persists the side effects from the message itself, regardless of whether the originating future is still alive. Remove the pre-send check from both _send_recv implementations. The remaining post-send `future.done()` handling and the recv-loop's `elif future.done(): break` preserve the back-pressure guarantees (POLLOUT/POLLIN poll budgets, reconnect on stale futures) — same control flow as 3007.x. Add a regression test that pre-stages 20 stale-future entries on the queue and asserts every one reaches the mocked wire. Fails on the unpatched source with 0/20 sends recorded. Also fix test_client_timeout_msg to use a guaranteed-unused local port instead of the default 4506 — the test assumes nothing is listening there, which collides with any host running a real salt-master. --- salt/transport/zeromq.py | 6 -- tests/pytests/unit/transport/test_zeromq.py | 8 ++ .../unit/transport/test_zeromq_concurrency.py | 92 +++++++++++++++++++ 3 files changed, 100 insertions(+), 6 deletions(-) diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 303d84852065..f1e14a539e54 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -1162,9 +1162,6 @@ async def _send_recv( send_recv_running = False break - if future.done(): - continue - try: # Wait for socket to be ready for sending if not await socket.poll(300, zmq.POLLOUT): @@ -1859,9 +1856,6 @@ async def _send_recv( send_recv_running = False break - if future.done(): - continue - try: # Wait for socket to be ready for sending if not await socket.poll(300, zmq.POLLOUT): diff --git a/tests/pytests/unit/transport/test_zeromq.py b/tests/pytests/unit/transport/test_zeromq.py index 3073777579ef..c68f5ad69e46 100644 --- a/tests/pytests/unit/transport/test_zeromq.py +++ b/tests/pytests/unit/transport/test_zeromq.py @@ -12,6 +12,7 @@ import pytest import tornado.ioloop import zmq.eventloop.future +from pytestshellutils.utils import ports import salt.config import salt.transport.base @@ -1741,6 +1742,13 @@ async def test_req_chan_bad_payload_to_decode(pki_dir, io_loop, caplog): async def test_client_timeout_msg(minion_opts, io_loop): + # Point at a port that nothing is listening on so the send is forced to + # actually wait out the client-side timeout. The conftest's default + # master_uri uses port 4506, which collides with a real salt-master if + # one happens to be running on the test host. + minion_opts["master_uri"] = "tcp://127.0.0.1:{}".format( + ports.get_unused_localhost_port() + ) client = salt.transport.zeromq.RequestClient(minion_opts, io_loop) await client.connect() try: diff --git a/tests/pytests/unit/transport/test_zeromq_concurrency.py b/tests/pytests/unit/transport/test_zeromq_concurrency.py index ee07f3d2ef4d..3a37d5264b57 100644 --- a/tests/pytests/unit/transport/test_zeromq_concurrency.py +++ b/tests/pytests/unit/transport/test_zeromq_concurrency.py @@ -1,8 +1,11 @@ import asyncio +import tornado.concurrent import zmq +import salt.payload import salt.transport.zeromq +from salt.exceptions import SaltReqTimeoutError from tests.support.mock import AsyncMock @@ -62,6 +65,95 @@ async def mocked_recv(**kwargs): client.close() +async def test_request_client_sends_stale_future_messages(minion_opts, io_loop): + """ + Regression test: queued messages whose futures have already been marked + done (e.g. the caller-side timeout fired while the message was waiting + in the queue behind a slow master) must still be sent on the wire. + + The master persists side effects from the payload itself (``_return`` + writes to the job cache, fires ``event_return``, calls into the + external job cache / RaaS). Whether the originating worker thread is + still waiting for an ACK is irrelevant — dropping the send means the + master never sees the return data and the result store is silently + incomplete. + + A pre-send ``if future.done(): continue`` short-circuit in + ``_send_recv`` (added during the worker-pool routing rewrite) made + every stale entry get skipped instead of sent. This test fails if + that short-circuit is present and passes once it is removed. + """ + n_messages = 20 + client = salt.transport.zeromq.RequestClient(minion_opts, io_loop) + + wire_log = [] + + async def mocked_send(msg, **kwargs): + wire_log.append(msg) + + async def mocked_recv(**kwargs): + return salt.payload.dumps({"ret": "ok"}) + + async def mocked_poll(timeout, flag=None, **kwargs): + return True + + mock_socket = AsyncMock() + mock_socket.send = mocked_send + mock_socket.recv = mocked_recv + mock_socket.poll = mocked_poll + + # In production, the post-send ``if future.done():`` branch reconnects + # the socket and spawns a fresh ``_send_recv`` task to drain the next + # queued entry. Mirror that respawn chain here without touching real + # ZMQ machinery, so the loop can process all N pre-staged entries. + async def fake_reconnect(): + client.send_recv_task_id += 1 + client.socket = mock_socket + client.send_recv_task = asyncio.create_task( + client._send_recv( + mock_socket, client._queue, task_id=client.send_recv_task_id + ) + ) + + client._reconnect = fake_reconnect + + # Pre-stage N (future, message) pairs whose futures have already been + # marked done with SaltReqTimeoutError. Models the production case: + # _timeout_message fired while the entry was waiting in the queue. + for i in range(n_messages): + future = tornado.concurrent.Future() + future.set_exception(SaltReqTimeoutError("Message timed out")) + # Retrieve the exception so the gc doesn't complain that nobody + # awaited it. + future.exception() + message = salt.payload.dumps({"cmd": "_return", "seq": i}) + client._queue.put_nowait((future, message)) + + client.socket = mock_socket + client.send_recv_task = asyncio.create_task( + client._send_recv(mock_socket, client._queue, task_id=client.send_recv_task_id) + ) + + loop = asyncio.get_event_loop() + deadline = loop.time() + 5 + while len(wire_log) < n_messages and loop.time() < deadline: + await asyncio.sleep(0.05) + + client._closing = True + if client.send_recv_task and not client.send_recv_task.done(): + client.send_recv_task.cancel() + try: + await client.send_recv_task + except (asyncio.CancelledError, BaseException): # pylint: disable=broad-except + pass + + assert len(wire_log) == n_messages, ( + f"Expected {n_messages} frames on the wire, observed {len(wire_log)}. " + "Messages with already-timed-out futures are being silently dropped " + "before they reach the master." + ) + + async def test_request_client_reconnect_task_safety(minion_opts, io_loop): """ Regression test for task leaks and state corruption during reconnections.