Skip to content

Commit 7b9f32d

Browse files
fix(ci): fix integration test reliability for sandbox and RabbitMQ backends
- conftest.py: add `-u` flag for unbuffered Python output in worker subprocess
- conftest.py: replace subprocess.PIPE pair with stdout=DEVNULL + stderr-watching background thread; this prevents pipe-buffer deadlocks and detects readiness from the "Listening for tasks" log line (works for all backends incl. sandbox)
- conftest.py: wait up to 120 s for worker readiness (instead of 4 s sleep); sandbox container boot can take ~60 s on a cold CI runner
- conftest.py: amqp URL now includes ?heartbeat=600 to prevent AMQP idle-disconnect; without this the default 60 s heartbeat causes the listen() generator to exit and stops the worker from consuming any tasks
- integration.yml: use rabbitmq:3-management-alpine (matches unit-tests job)

Agent-Logs-Url: https://github.com/codeSamuraii/pyfuse/sessions/3fc206b9-a7f6-45e5-9770-7a59c2f59f3c
Co-authored-by: codeSamuraii <17270548+codeSamuraii@users.noreply.github.com>
1 parent 086fffd commit 7b9f32d

2 files changed

Lines changed: 63 additions & 15 deletions

File tree

.github/workflows/integration.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ jobs:
9494
--health-retries 5
9595
9696
rabbitmq:
97-
image: rabbitmq:3-alpine
97+
image: rabbitmq:3-management-alpine
9898
ports: ["5672:5672"]
9999
options: >-
100100
--health-cmd "rabbitmq-diagnostics -q ping"

tests/integration/conftest.py

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import socket
1616
import subprocess
1717
import sys
18+
import threading
1819
import time
1920
from collections.abc import AsyncIterator, Generator
2021
from urllib.parse import urlparse
@@ -64,7 +65,12 @@ def backend_url() -> str:
6465
if backend == "redis":
6566
return "redis://localhost:6379"
6667
if backend in ("rabbitmq", "amqp"):
67-
return "amqp://localhost"
68+
# heartbeat=600 keeps the AMQP connection alive across the whole test
69+
# session. Without this the default 60-second heartbeat timeout
70+
# causes the connection to be closed after 60 s of idle time, which
71+
# makes the listen() generator exit and stops the worker from
72+
# processing further tasks.
73+
return "amqp://localhost?heartbeat=600"
6874
# local: pick a free port
6975
port = _free_port()
7076
return f"local://127.0.0.1:{port}"
@@ -94,10 +100,19 @@ def worker_process(
94100
``PYFUSE_SIGNING_TOKEN`` (when present) flows automatically to the
95101
worker without any extra plumbing.
96102
103+
Readiness is detected by watching the worker's stderr for the
104+
"Listening for tasks" log line. A background daemon thread drains
105+
the stderr pipe so it never fills up (which would freeze the worker),
106+
and forwards every line to the test-runner's stderr for CI visibility.
107+
108+
For the local backend an additional TCP probe confirms the broker is
109+
accepting connections before the readiness wait begins.
110+
97111
Teardown: SIGTERM → wait 10 s → SIGKILL.
98112
"""
113+
# -u forces unbuffered output so log lines appear immediately in the pipe.
99114
cmd = [
100-
sys.executable, "-m", "pyfuse", "worker",
115+
sys.executable, "-u", "-m", "pyfuse", "worker",
101116
"--backend", backend_url,
102117
"--no-auto-install",
103118
"--log-level", "DEBUG",
@@ -109,32 +124,64 @@ def worker_process(
109124

110125
proc = subprocess.Popen(
111126
cmd,
112-
stdout=subprocess.PIPE,
127+
stdout=subprocess.DEVNULL,
113128
stderr=subprocess.PIPE,
114129
)
115130

116-
# Wait until the broker/worker is reachable before yielding.
131+
# Background thread: drain stderr and signal when the worker is ready.
132+
# Draining is essential — if we never read from the pipe and the worker
133+
# writes more than 64 KB of debug output, it will block on the write()
134+
# syscall, freezing its asyncio event loop.
135+
ready = threading.Event()
136+
137+
def _watch_stderr() -> None:
138+
assert proc.stderr is not None
139+
for raw_line in proc.stderr:
140+
try:
141+
line = raw_line.decode("utf-8", errors="replace")
142+
except Exception:
143+
line = repr(raw_line) + "\n"
144+
sys.stderr.write(f"[worker] {line}")
145+
sys.stderr.flush()
146+
if "Listening for tasks" in line:
147+
ready.set()
148+
# Process exited or pipe closed — unblock any waiter.
149+
ready.set()
150+
151+
watcher = threading.Thread(target=_watch_stderr, daemon=True)
152+
watcher.start()
153+
154+
# For the local backend, also wait for the broker TCP port to be open.
155+
# This provides a fast early-failure signal (e.g. port already in use)
156+
# without waiting the full readiness timeout.
117157
if backend_url.startswith("local://"):
118158
parsed = urlparse(backend_url)
119159
host = parsed.hostname or "127.0.0.1"
120160
port = parsed.port or 9748
121161
if not _wait_for_tcp(host, port, timeout=20.0):
122162
proc.terminate()
123-
_, err = proc.communicate(timeout=5)
163+
proc.wait(timeout=5)
164+
watcher.join(timeout=5)
124165
raise RuntimeError(
125-
f"Worker broker did not start on {host}:{port}.\n"
126-
f"stderr:\n{err.decode()}"
166+
f"Worker broker did not start on {host}:{port}."
127167
)
128-
else:
129-
# Redis / RabbitMQ workers connect asynchronously; give them time
130-
# to connect and begin listening before the first task is submitted.
131-
time.sleep(4)
168+
169+
# Wait until the worker reports it is listening for tasks.
170+
# Sandbox container boot can take up to ~60 seconds on a cold CI runner,
171+
# so we allow up to 120 seconds here.
172+
ready_timeout = 120.0
173+
if not ready.wait(timeout=ready_timeout):
174+
proc.terminate()
175+
proc.wait(timeout=5)
176+
watcher.join(timeout=5)
177+
raise RuntimeError(
178+
f"Worker did not become ready within {ready_timeout:.0f} seconds."
179+
)
132180

133181
if proc.poll() is not None:
134-
_, err = proc.communicate()
182+
watcher.join(timeout=5)
135183
raise RuntimeError(
136-
f"Worker process exited prematurely (rc={proc.returncode}).\n"
137-
f"stderr:\n{err.decode()}"
184+
f"Worker process exited prematurely (rc={proc.returncode})."
138185
)
139186

140187
yield proc
@@ -145,6 +192,7 @@ def worker_process(
145192
except subprocess.TimeoutExpired:
146193
proc.kill()
147194
proc.wait()
195+
watcher.join(timeout=5)
148196

149197

150198
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)