Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions salt/transport/zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,9 +1162,6 @@ async def _send_recv(
send_recv_running = False
break

if future.done():
continue

try:
# Wait for socket to be ready for sending
if not await socket.poll(300, zmq.POLLOUT):
Expand Down Expand Up @@ -1859,9 +1856,6 @@ async def _send_recv(
send_recv_running = False
break

if future.done():
continue

try:
# Wait for socket to be ready for sending
if not await socket.poll(300, zmq.POLLOUT):
Expand Down
8 changes: 8 additions & 0 deletions tests/pytests/unit/transport/test_zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import pytest
import tornado.ioloop
import zmq.eventloop.future
from pytestshellutils.utils import ports

import salt.config
import salt.transport.base
Expand Down Expand Up @@ -1741,6 +1742,13 @@ async def test_req_chan_bad_payload_to_decode(pki_dir, io_loop, caplog):


async def test_client_timeout_msg(minion_opts, io_loop):
# Point at a port that nothing is listening on so the send is forced to
# actually wait out the client-side timeout. The conftest's default
# master_uri uses port 4506, which collides with a real salt-master if
# one happens to be running on the test host.
minion_opts["master_uri"] = "tcp://127.0.0.1:{}".format(
ports.get_unused_localhost_port()
)
client = salt.transport.zeromq.RequestClient(minion_opts, io_loop)
await client.connect()
try:
Expand Down
92 changes: 92 additions & 0 deletions tests/pytests/unit/transport/test_zeromq_concurrency.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio

import tornado.concurrent
import zmq

import salt.payload
import salt.transport.zeromq
from salt.exceptions import SaltReqTimeoutError
from tests.support.mock import AsyncMock


Expand Down Expand Up @@ -62,6 +65,95 @@ async def mocked_recv(**kwargs):
client.close()


async def test_request_client_sends_stale_future_messages(minion_opts, io_loop):
    """
    Regression test: queued messages whose futures have already been marked
    done (e.g. the caller-side timeout fired while the message was waiting
    in the queue behind a slow master) must still be sent on the wire.

    The master persists side effects from the payload itself (``_return``
    writes to the job cache, fires ``event_return``, calls into the
    external job cache / RaaS). Whether the originating worker thread is
    still waiting for an ACK is irrelevant — dropping the send means the
    master never sees the return data and the result store is silently
    incomplete.

    A pre-send ``if future.done(): continue`` short-circuit in
    ``_send_recv`` (added during the worker-pool routing rewrite) made
    every stale entry get skipped instead of sent. This test fails if
    that short-circuit is present and passes once it is removed.
    """
    n_messages = 20
    client = salt.transport.zeromq.RequestClient(minion_opts, io_loop)

    # Every frame handed to the (mocked) socket is recorded here; its final
    # length is what the test asserts on.
    wire_log = []

    async def mocked_send(msg, **kwargs):
        wire_log.append(msg)

    async def mocked_recv(**kwargs):
        return salt.payload.dumps({"ret": "ok"})

    async def mocked_poll(timeout, flag=None, **kwargs):
        # Always report the socket as ready so the loop never waits on poll.
        return True

    mock_socket = AsyncMock()
    mock_socket.send = mocked_send
    mock_socket.recv = mocked_recv
    mock_socket.poll = mocked_poll

    # In production, the post-send ``if future.done():`` branch reconnects
    # the socket and spawns a fresh ``_send_recv`` task to drain the next
    # queued entry. Mirror that respawn chain here without touching real
    # ZMQ machinery, so the loop can process all N pre-staged entries.
    async def fake_reconnect():
        client.send_recv_task_id += 1
        client.socket = mock_socket
        client.send_recv_task = asyncio.create_task(
            client._send_recv(
                mock_socket, client._queue, task_id=client.send_recv_task_id
            )
        )

    client._reconnect = fake_reconnect

    # Pre-stage N (future, message) pairs whose futures have already been
    # marked done with SaltReqTimeoutError. Models the production case:
    # _timeout_message fired while the entry was waiting in the queue.
    for i in range(n_messages):
        future = tornado.concurrent.Future()
        future.set_exception(SaltReqTimeoutError("Message timed out"))
        # Retrieve the exception so the gc doesn't complain that nobody
        # awaited it.
        future.exception()
        message = salt.payload.dumps({"cmd": "_return", "seq": i})
        client._queue.put_nowait((future, message))

    client.socket = mock_socket
    client.send_recv_task = asyncio.create_task(
        client._send_recv(mock_socket, client._queue, task_id=client.send_recv_task_id)
    )

    # We are inside a coroutine, so ask for the loop that is actually running
    # this test rather than whatever the policy would hand back.
    loop = asyncio.get_running_loop()
    # Bounded wait: stop as soon as every frame was observed, or after 5s so a
    # regression fails the assertion below instead of hanging the test run.
    deadline = loop.time() + 5
    while len(wire_log) < n_messages and loop.time() < deadline:
        await asyncio.sleep(0.05)

    # Tear down the most recently spawned send/recv task (fake_reconnect may
    # have replaced the original one several times by now).
    client._closing = True
    if client.send_recv_task and not client.send_recv_task.done():
        client.send_recv_task.cancel()
        try:
            await client.send_recv_task
        # CancelledError is the expected outcome of cancel(); any ordinary
        # error raised by the task during teardown is irrelevant to what this
        # test checks. KeyboardInterrupt / SystemExit are deliberately NOT
        # swallowed so the test run can still be interrupted.
        except (asyncio.CancelledError, Exception):  # pylint: disable=broad-except
            pass

    assert len(wire_log) == n_messages, (
        f"Expected {n_messages} frames on the wire, observed {len(wire_log)}. "
        "Messages with already-timed-out futures are being silently dropped "
        "before they reach the master."
    )


async def test_request_client_reconnect_task_safety(minion_opts, io_loop):
"""
Regression test for task leaks and state corruption during reconnections.
Expand Down
Loading