From bd61268789ed136ef688e06efb30ffbb55f8d543 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 21 Apr 2026 19:41:02 +0000 Subject: [PATCH 1/4] feat: add GitHub Actions integration test workflow and cross-process e2e tests Agent-Logs-Url: https://github.com/codeSamuraii/pyfuse/sessions/925ac194-398a-43f8-aba9-4fc0abfedaad Co-authored-by: codeSamuraii <17270548+codeSamuraii@users.noreply.github.com> --- .github/workflows/integration.yml | 159 +++++++++++++++++ tests/integration/__init__.py | 0 tests/integration/conftest.py | 202 +++++++++++++++++++++ tests/integration/test_e2e.py | 286 ++++++++++++++++++++++++++++++ 4 files changed, 647 insertions(+) create mode 100644 .github/workflows/integration.yml create mode 100644 tests/integration/__init__.py create mode 100644 tests/integration/conftest.py create mode 100644 tests/integration/test_e2e.py diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml new file mode 100644 index 0000000..3f12b20 --- /dev/null +++ b/.github/workflows/integration.yml @@ -0,0 +1,159 @@ +name: Integration Tests + +on: + push: + branches: ["main", "master"] + pull_request: + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +env: + PYTHON_VERSION: "3.12" + +jobs: + # ────────────────────────────────────────────────────────────────────────── + # Job 1 – existing unit and backend test suite + # Runs with real Redis and RabbitMQ so that the backend-specific test + # files (test_rabbitmq_backend.py, test_local_backend.py, …) are + # exercised against real services. + # ────────────────────────────────────────────────────────────────────────── + unit-tests: + name: "Unit & backend tests" + runs-on: ubuntu-latest + + services: + redis: + image: redis:7-alpine + ports: ["6379:6379"] + options: >- + --health-cmd "redis-cli ping" + --health-interval 5s + --health-timeout 5s + --health-retries 5 + + rabbitmq: + image: rabbitmq:3-management-alpine + ports: ["5672:5672"] + options: >- + --health-cmd "rabbitmq-diagnostics -q ping" + --health-interval 10s + --health-timeout 10s + --health-retries 5 + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: ${{ env.PYTHON_VERSION }} + + - name: Install pyfuse with all optional extras and dev dependencies + run: | + pip install --upgrade pip + pip install -e ".[redis,rabbitmq]" + pip install pytest pytest-asyncio + + - name: Run unit and backend test suite + env: + PYFUSE_TEST_RABBITMQ_URL: amqp://localhost + run: pytest tests/ -x -v --ignore=tests/integration/ + + # ────────────────────────────────────────────────────────────────────────── + # Job 2 – cross-process integration tests + # Each matrix variant runs a real worker subprocess in a separate process + # and a client in the test-runner process, connected to a real backend. + # + # Matrix dimensions: + # backend – local (TCP broker), redis, rabbitmq + # signing – with or without HMAC-signed tasks + # sandbox – with or without Docker sandbox isolation + # ────────────────────────────────────────────────────────────────────────── + integration: + name: "E2E (${{ matrix.backend }}, sign=${{ matrix.signing }}, sandbox=${{ matrix.sandbox }})" + runs-on: ubuntu-latest + needs: unit-tests + + # All matrix variants share the same job definition. Including both + # services in every variant is simpler than splitting into separate jobs + # and the idle service overhead is negligible. + services: + redis: + image: redis:7-alpine + ports: ["6379:6379"] + options: >- + --health-cmd "redis-cli ping" + --health-interval 5s + --health-timeout 5s + --health-retries 5 + + rabbitmq: + image: rabbitmq:3-alpine + ports: ["5672:5672"] + options: >- + --health-cmd "rabbitmq-diagnostics -q ping" + --health-interval 10s + --health-timeout 10s + --health-retries 5 + + strategy: + fail-fast: false + matrix: + backend: [local, redis, rabbitmq] + signing: ["true", "false"] + sandbox: ["true", "false"] + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: ${{ env.PYTHON_VERSION }} + + - name: Install pyfuse with all optional extras + run: | + pip install --upgrade pip + pip install -e ".[redis,rabbitmq]" + pip install pytest pytest-asyncio + + # Build the Docker sandbox image only for sandbox=true variants. + # The image is built from the bundled Dockerfile in pyfuse/worker/sandbox/. + - name: Build sandbox Docker image + if: matrix.sandbox == 'true' + run: bash scripts/setup_sandbox_docker.sh + + # Generate a fresh 32-byte random signing token for this run. + # The token is stored as a step output and injected as PYFUSE_SIGNING_TOKEN + # so that both the worker subprocess (which inherits it from the test + # environment) and the client code use the same key material. + - name: Generate signing token + id: gen-token + if: matrix.signing == 'true' + run: | + TOKEN=$(python -c "import os; print(os.urandom(32).hex())") + echo "token=$TOKEN" >> "$GITHUB_OUTPUT" + + # ── Signing enabled ───────────────────────────────────────────────── + # PYFUSE_SIGNING_TOKEN is set for both the test runner (client) and + # the worker subprocess (which inherits the environment). + - name: Run integration tests (signing enabled) + if: matrix.signing == 'true' + env: + PYFUSE_TEST_BACKEND: ${{ matrix.backend }} + PYFUSE_TEST_SIGNING: "true" + PYFUSE_TEST_SANDBOX: ${{ matrix.sandbox }} + PYFUSE_SIGNING_TOKEN: ${{ steps.gen-token.outputs.token }} + run: pytest tests/integration/ -v -s + + # ── Signing disabled ───────────────────────────────────────────────── + # No PYFUSE_SIGNING_TOKEN in the environment; tasks are unsigned and + # the worker accepts them without verification. + - name: Run integration tests (signing disabled) + if: matrix.signing == 'false' + env: + PYFUSE_TEST_BACKEND: ${{ matrix.backend }} + PYFUSE_TEST_SIGNING: "false" + PYFUSE_TEST_SANDBOX: ${{ matrix.sandbox }} + run: pytest tests/integration/ -v -s diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..fe9b1e0 --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,202 @@ +"""Fixtures for cross-process integration tests. + +The worker subprocess and backend URL are shared across the whole test session +(session scope). Each test function gets its own client connection +(function scope) so that global state does not leak between tests. + +Configuration (set by the CI workflow via environment variables): + PYFUSE_TEST_BACKEND – "local", "redis", or "rabbitmq" + PYFUSE_TEST_SIGNING – "true" or "false" + PYFUSE_TEST_SANDBOX – "true" or "false" + PYFUSE_SIGNING_TOKEN – 64-char hex token (present when signing=true) +""" + +import os +import socket +import subprocess +import sys +import time +from collections.abc import AsyncIterator, Generator +from urllib.parse import urlparse + +import pytest + +import pyfuse.worker.remote as _remote + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _free_port() -> int: + """Return a currently-unused TCP port on 127.0.0.1.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +def _wait_for_tcp(host: str, port: int, timeout: float = 20.0) -> bool: + """Poll until a TCP service accepts connections, or *timeout* seconds pass.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + with socket.create_connection((host, port), timeout=0.5): + return True + except (ConnectionRefusedError, OSError): + time.sleep(0.25) + return False + + +# --------------------------------------------------------------------------- +# Session-scoped fixtures (computed once for the whole test run) +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="session") +def backend_url() -> str: + """Return the backend URL for this test session. + + For the local backend a random free port is chosen so that parallel + CI jobs running on the same host cannot conflict. + """ + backend = os.environ.get("PYFUSE_TEST_BACKEND", "local").lower() + if backend == "redis": + return "redis://localhost:6379" + if backend in ("rabbitmq", "amqp"): + return "amqp://localhost" + # local: pick a free port + port = _free_port() + return f"local://127.0.0.1:{port}" + + +@pytest.fixture(scope="session") +def signing_enabled() -> bool: + """True when PYFUSE_TEST_SIGNING=true.""" + return os.environ.get("PYFUSE_TEST_SIGNING", "false").lower() == "true" + + +@pytest.fixture(scope="session") +def sandbox_enabled() -> bool: + """True when PYFUSE_TEST_SANDBOX=true.""" + return os.environ.get("PYFUSE_TEST_SANDBOX", "false").lower() == "true" + + +@pytest.fixture(scope="session") +def worker_process( + backend_url: str, + signing_enabled: bool, + sandbox_enabled: bool, +) -> Generator[subprocess.Popen[bytes], None, None]: + """Spawn a pyfuse worker subprocess for the entire test session. + + The subprocess inherits the test runner's environment, so + ``PYFUSE_SIGNING_TOKEN`` (when present) flows automatically to the + worker without any extra plumbing. + + Teardown: SIGTERM → wait 10 s → SIGKILL. + """ + cmd = [ + sys.executable, "-m", "pyfuse", "worker", + "--backend", backend_url, + "--no-auto-install", + "--log-level", "DEBUG", + ] + if signing_enabled: + cmd.append("--require-signing") + if sandbox_enabled: + cmd.append("--sandbox") + + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + # Wait until the broker/worker is reachable before yielding. + if backend_url.startswith("local://"): + parsed = urlparse(backend_url) + host = parsed.hostname or "127.0.0.1" + port = parsed.port or 9748 + if not _wait_for_tcp(host, port, timeout=20.0): + proc.terminate() + _out, err = proc.communicate(timeout=5) + raise RuntimeError( + f"Worker broker did not start on {host}:{port}.\n" + f"stderr:\n{err.decode()}" + ) + else: + # Redis / RabbitMQ workers connect asynchronously; give them time + # to connect and begin listening before the first task is submitted. + time.sleep(4) + + if proc.poll() is not None: + _out, err = proc.communicate() + raise RuntimeError( + f"Worker process exited prematurely (rc={proc.returncode}).\n" + f"stderr:\n{err.decode()}" + ) + + yield proc + + proc.terminate() + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + + +# --------------------------------------------------------------------------- +# Function-scoped fixtures (one per test) +# --------------------------------------------------------------------------- + + +@pytest.fixture +async def client( + backend_url: str, + worker_process: subprocess.Popen[bytes], +) -> AsyncIterator[None]: + """Connect to the backend as a client for the duration of one test. + + The *worker_process* dependency ensures the worker is running before + any connection attempt is made. The global backend state is reset + during teardown so tests are fully isolated from each other. + """ + import pyfuse + + pyfuse.connect(backend_url) + yield + try: + await pyfuse.disconnect() + except Exception: + pass + finally: + _remote._active_backend = None + _remote._atexit_registered = False + + +@pytest.fixture +async def client_no_signing( + backend_url: str, + worker_process: subprocess.Popen[bytes], + monkeypatch: pytest.MonkeyPatch, +) -> AsyncIterator[None]: + """Connect as a client WITHOUT a signing token. + + Even when ``PYFUSE_SIGNING_TOKEN`` is set in the environment (signing=true + scenario) this fixture removes it so the client submits unsigned tasks. + Used to verify that a worker with signing enabled rejects them. + """ + import pyfuse + + monkeypatch.delenv("PYFUSE_SIGNING_TOKEN", raising=False) + pyfuse.connect(backend_url) + yield + try: + await pyfuse.disconnect() + except Exception: + pass + finally: + _remote._active_backend = None + _remote._atexit_registered = False diff --git a/tests/integration/test_e2e.py b/tests/integration/test_e2e.py new file mode 100644 index 0000000..e639a35 --- /dev/null +++ b/tests/integration/test_e2e.py @@ -0,0 +1,286 @@ +"""Cross-process integration tests for pyfuse. + +Each test spawns (via the ``worker_process`` fixture) a real pyfuse worker +as a separate subprocess and connects to the same backend as a client from +within the test runner process. This validates the full stack: + + client process → backend (local/redis/rabbitmq) → worker process + (optional sandbox) + +The combination of backend, signing, and sandbox modes is controlled by +environment variables set by the CI workflow: + + PYFUSE_TEST_BACKEND – "local", "redis", or "rabbitmq" + PYFUSE_TEST_SIGNING – "true" or "false" + PYFUSE_TEST_SANDBOX – "true" or "false" + PYFUSE_SIGNING_TOKEN – 64-char hex key (present when signing=true) +""" + +import os + +import pytest + +from pyfuse import trace +from pyfuse.core.errors import RemoteError, SignatureError + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_RESULT_TIMEOUT = 60.0 # seconds to wait for a single task result +_SANDBOX_BOOT = 35.0 # extra seconds for the Docker container to boot + + +async def _get(future, *, timeout: float = _RESULT_TIMEOUT) -> object: + """Await a Result with an explicit timeout and stall detection disabled. + + Using stall_timeout=None avoids flakiness in slow CI environments where + heartbeat intervals may exceed the default 10-second stall threshold. + """ + return await future.result(timeout=timeout, stall_timeout=None) + + +# --------------------------------------------------------------------------- +# Basic remote execution +# --------------------------------------------------------------------------- + + +class TestBasicExecution: + """Verify fundamental task-submission → result retrieval on every backend.""" + + @pytest.mark.asyncio + async def test_arithmetic(self, client: None) -> None: + """Integer arithmetic is computed correctly by the remote worker.""" + + @trace + def add(a: int, b: int) -> int: + return a + b + + future = await add.start(3, 4) + result = await _get(future) + assert result == 7 + + @pytest.mark.asyncio + async def test_string_processing(self, client: None) -> None: + """String operations round-trip through the serialization layer.""" + + @trace + def greet(name: str) -> str: + return f"hello {name}" + + future = await greet.start("world") + result = await _get(future) + assert result == "hello world" + + @pytest.mark.asyncio + async def test_list_result(self, client: None) -> None: + """Composite return values are serialized and deserialized correctly.""" + + @trace + def squares(n: int) -> list[int]: + return [i * i for i in range(n)] + + future = await squares.start(5) + result = await _get(future) + assert result == [0, 1, 4, 9, 16] + + @pytest.mark.asyncio + async def test_kwargs(self, client: None) -> None: + """Keyword arguments are forwarded to the worker unchanged.""" + + @trace + def power(base: int, *, exp: int = 2) -> int: + return base ** exp + + future = await power.start(3, exp=4) + result = await _get(future) + assert result == 81 + + @pytest.mark.asyncio + async def test_sequential_tasks(self, client: None) -> None: + """Multiple tasks submitted in sequence all return correct results.""" + + @trace + def double(x: int) -> int: + return x * 2 + + results = [] + for i in range(3): + future = await double.start(i) + results.append(await _get(future)) + + assert results == [0, 2, 4] + + +# --------------------------------------------------------------------------- +# Error propagation +# --------------------------------------------------------------------------- + + +class TestErrorPropagation: + """Exceptions raised in the worker are surfaced as RemoteError on the client.""" + + @pytest.mark.asyncio + async def test_value_error_propagates(self, client: None) -> None: + """ValueError raised in the worker becomes a RemoteError.""" + + @trace + def explode(msg: str) -> None: + raise ValueError(msg) + + future = await explode.start("intentional") + with pytest.raises(RemoteError, match="ValueError.*intentional"): + await _get(future) + + @pytest.mark.asyncio + async def test_runtime_error_propagates(self, client: None) -> None: + """RuntimeError is preserved through the result envelope.""" + + @trace + def fail() -> None: + raise RuntimeError("something went wrong") + + future = await fail.start() + with pytest.raises(RemoteError, match="RuntimeError"): + await _get(future) + + @pytest.mark.asyncio + async def test_error_does_not_crash_worker(self, client: None) -> None: + """A failing task must not crash the worker; it must stay alive for the next task.""" + + @trace + def bad() -> None: + raise RuntimeError("intentional failure") + + @trace + def ok() -> str: + return "still alive" + + # First task fails (RuntimeError is caught by the worker and returned + # as an error envelope; the worker process stays alive). + future_bad = await bad.start() + with pytest.raises(RemoteError): + await _get(future_bad) + + # Worker should still be responsive for subsequent tasks. + future_ok = await ok.start() + result = await _get(future_ok) + assert result == "still alive" + + +# --------------------------------------------------------------------------- +# Signing enforcement +# --------------------------------------------------------------------------- + + +class TestSigning: + """Verify HMAC signing behaviour: acceptance and rejection.""" + + @pytest.mark.asyncio + async def test_signed_task_accepted( + self, client: None, signing_enabled: bool + ) -> None: + """A properly signed task is accepted and executed by the worker.""" + if not signing_enabled: + pytest.skip("Signing is not enabled for this scenario") + + @trace + def compute(x: int) -> int: + return x * 2 + + future = await compute.start(21) + result = await _get(future) + assert result == 42 + + @pytest.mark.asyncio + async def test_unsigned_task_rejected( + self, client_no_signing: None, signing_enabled: bool + ) -> None: + """When signing is required, unsigned tasks are rejected with an error result.""" + if not signing_enabled: + pytest.skip("Signing is not enabled for this scenario") + + @trace + def noop() -> str: + return "should not reach here" + + # The client connected without a signing token, so the task is unsigned. + # The worker (started with --require-signing) must reject it. + future = await noop.start() + with pytest.raises((RemoteError, SignatureError)): + await _get(future, timeout=15.0) + + @pytest.mark.asyncio + async def test_signing_does_not_break_normal_execution( + self, client: None, signing_enabled: bool + ) -> None: + """Signing is transparent to the caller when token is present on both sides.""" + if not signing_enabled: + pytest.skip("Signing is not enabled for this scenario") + + @trace + def multiply(a: int, b: int) -> int: + return a * b + + future = await multiply.start(6, 7) + result = await _get(future) + assert result == 42 + + +# --------------------------------------------------------------------------- +# Sandbox isolation +# --------------------------------------------------------------------------- + + +class TestSandbox: + """Verify that tasks run correctly inside the Docker sandbox.""" + + @pytest.mark.asyncio + async def test_sandboxed_arithmetic( + self, client: None, sandbox_enabled: bool + ) -> None: + """Tasks execute and return correct results when the sandbox is active.""" + if not sandbox_enabled: + pytest.skip("Sandbox is not enabled for this scenario") + + @trace + def add(a: int, b: int) -> int: + return a + b + + future = await add.start(10, 32) + # Allow extra time for the container to boot on first use. + result = await _get(future, timeout=_RESULT_TIMEOUT + _SANDBOX_BOOT) + assert result == 42 + + @pytest.mark.asyncio + async def test_sandboxed_error_propagation( + self, client: None, sandbox_enabled: bool + ) -> None: + """Exceptions raised inside the sandbox are forwarded as RemoteError.""" + if not sandbox_enabled: + pytest.skip("Sandbox is not enabled for this scenario") + + @trace + def boom() -> None: + raise ValueError("sandboxed failure") + + future = await boom.start() + with pytest.raises(RemoteError, match="ValueError.*sandboxed failure"): + await _get(future, timeout=_RESULT_TIMEOUT + _SANDBOX_BOOT) + + @pytest.mark.asyncio + async def test_sandbox_with_signing( + self, client: None, sandbox_enabled: bool, signing_enabled: bool + ) -> None: + """Signed tasks execute correctly inside the sandbox.""" + if not sandbox_enabled or not signing_enabled: + pytest.skip("Both sandbox and signing must be enabled for this test") + + @trace + def triple(x: int) -> int: + return x * 3 + + future = await triple.start(14) + result = await _get(future, timeout=_RESULT_TIMEOUT + _SANDBOX_BOOT) + assert result == 42 From 086fffd43550129b327f49917bd5e27b2a42eb7f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 21 Apr 2026 19:43:01 +0000 Subject: [PATCH 2/4] fix: rename unused _out to _, add explicit workflow permissions Agent-Logs-Url: https://github.com/codeSamuraii/pyfuse/sessions/925ac194-398a-43f8-aba9-4fc0abfedaad Co-authored-by: codeSamuraii <17270548+codeSamuraii@users.noreply.github.com> --- .github/workflows/integration.yml | 4 ++++ tests/integration/conftest.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 3f12b20..cb05aa7 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -10,6 +10,10 @@ concurrency: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true +# Restrict the workflow token to the minimum required permissions. +permissions: + contents: read + env: PYTHON_VERSION: "3.12" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index fe9b1e0..02b26b4 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -120,7 +120,7 @@ def worker_process( port = parsed.port or 9748 if not _wait_for_tcp(host, port, timeout=20.0): proc.terminate() - _out, err = proc.communicate(timeout=5) + _, err = proc.communicate(timeout=5) raise RuntimeError( f"Worker broker did not start on {host}:{port}.\n" f"stderr:\n{err.decode()}" @@ -131,7 +131,7 @@ def worker_process( time.sleep(4) if proc.poll() is not None: - _out, err = proc.communicate() + _, err = proc.communicate() raise RuntimeError( f"Worker process exited prematurely (rc={proc.returncode}).\n" f"stderr:\n{err.decode()}" From 7b9f32d129f6051c4909b2d61565dfc4c587a355 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 21 Apr 2026 21:20:39 +0000 Subject: [PATCH 3/4] fix(ci): fix integration test reliability for sandbox and RabbitMQ backends - conftest.py: add `-u` flag for unbuffered Python output in worker subprocess - conftest.py: replace subprocess.PIPE pair with stdout=DEVNULL + stderr-watching background thread; this prevents pipe-buffer deadlocks and detects readiness from the "Listening for tasks" log line (works for all backends incl. sandbox) - conftest.py: wait up to 120 s for worker readiness (instead of 4 s sleep); sandbox container boot can take ~60 s on a cold CI runner - conftest.py: amqp URL now includes ?heartbeat=600 to prevent AMQP idle-disconnect; without this the default 60 s heartbeat causes the listen() generator to exit and stops the worker consuming any tasks - integration.yml: use rabbitmq:3-management-alpine (matches unit-tests job) Agent-Logs-Url: https://github.com/codeSamuraii/pyfuse/sessions/3fc206b9-a7f6-45e5-9770-7a59c2f59f3c Co-authored-by: codeSamuraii <17270548+codeSamuraii@users.noreply.github.com> --- .github/workflows/integration.yml | 2 +- tests/integration/conftest.py | 76 +++++++++++++++++++++++++------ 2 files changed, 63 insertions(+), 15 deletions(-) diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index cb05aa7..347d301 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -94,7 +94,7 @@ jobs: --health-retries 5 rabbitmq: - image: rabbitmq:3-alpine + image: rabbitmq:3-management-alpine ports: ["5672:5672"] options: >- --health-cmd "rabbitmq-diagnostics -q ping" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 02b26b4..a7442e4 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -15,6 +15,7 @@ import socket import subprocess import sys +import threading import time from collections.abc import AsyncIterator, Generator from urllib.parse import urlparse @@ -64,7 +65,12 @@ def backend_url() -> str: if backend == "redis": return "redis://localhost:6379" if backend in ("rabbitmq", "amqp"): - return "amqp://localhost" + # heartbeat=600 keeps the AMQP connection alive across the whole test + # session. Without this the default 60-second heartbeat timeout + # causes the connection to be closed after 60 s of idle time, which + # makes the listen() generator exit and stops the worker from + # processing further tasks. + return "amqp://localhost?heartbeat=600" # local: pick a free port port = _free_port() return f"local://127.0.0.1:{port}" @@ -94,10 +100,19 @@ def worker_process( ``PYFUSE_SIGNING_TOKEN`` (when present) flows automatically to the worker without any extra plumbing. + Readiness is detected by watching the worker's stderr for the + "Listening for tasks" log line. A background daemon thread drains + the stderr pipe so it never fills up (which would freeze the worker), + and forwards every line to the test-runner's stderr for CI visibility. + + For the local backend an additional TCP probe confirms the broker is + accepting connections before the readiness wait begins. + Teardown: SIGTERM → wait 10 s → SIGKILL. """ + # -u forces unbuffered output so log lines appear immediately in the pipe. cmd = [ - sys.executable, "-m", "pyfuse", "worker", + sys.executable, "-u", "-m", "pyfuse", "worker", "--backend", backend_url, "--no-auto-install", "--log-level", "DEBUG", @@ -109,32 +124,64 @@ def worker_process( proc = subprocess.Popen( cmd, - stdout=subprocess.PIPE, + stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, ) - # Wait until the broker/worker is reachable before yielding. + # Background thread: drain stderr and signal when the worker is ready. + # Draining is essential — if we never read from the pipe and the worker + # writes more than 64 KB of debug output, it will block on the write() + # syscall, freezing its asyncio event loop. + ready = threading.Event() + + def _watch_stderr() -> None: + assert proc.stderr is not None + for raw_line in proc.stderr: + try: + line = raw_line.decode("utf-8", errors="replace") + except Exception: + line = repr(raw_line) + "\n" + sys.stderr.write(f"[worker] {line}") + sys.stderr.flush() + if "Listening for tasks" in line: + ready.set() + # Process exited or pipe closed — unblock any waiter. + ready.set() + + watcher = threading.Thread(target=_watch_stderr, daemon=True) + watcher.start() + + # For the local backend, also wait for the broker TCP port to be open. + # This provides a fast early-failure signal (e.g. port already in use) + # without waiting the full readiness timeout. if backend_url.startswith("local://"): parsed = urlparse(backend_url) host = parsed.hostname or "127.0.0.1" port = parsed.port or 9748 if not _wait_for_tcp(host, port, timeout=20.0): proc.terminate() - _, err = proc.communicate(timeout=5) + proc.wait(timeout=5) + watcher.join(timeout=5) raise RuntimeError( - f"Worker broker did not start on {host}:{port}.\n" - f"stderr:\n{err.decode()}" + f"Worker broker did not start on {host}:{port}." ) - else: - # Redis / RabbitMQ workers connect asynchronously; give them time - # to connect and begin listening before the first task is submitted. - time.sleep(4) + + # Wait until the worker reports it is listening for tasks. + # Sandbox container boot can take up to ~60 seconds on a cold CI runner, + # so we allow up to 120 seconds here. + ready_timeout = 120.0 + if not ready.wait(timeout=ready_timeout): + proc.terminate() + proc.wait(timeout=5) + watcher.join(timeout=5) + raise RuntimeError( + f"Worker did not become ready within {ready_timeout:.0f} seconds." + ) if proc.poll() is not None: - _, err = proc.communicate() + watcher.join(timeout=5) raise RuntimeError( - f"Worker process exited prematurely (rc={proc.returncode}).\n" - f"stderr:\n{err.decode()}" + f"Worker process exited prematurely (rc={proc.returncode})." ) yield proc @@ -145,6 +192,7 @@ def worker_process( except subprocess.TimeoutExpired: proc.kill() proc.wait() + watcher.join(timeout=5) # --------------------------------------------------------------------------- From b158e0048e72bceb7de70e5fd62683a156110c7c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 21 Apr 2026 21:22:28 +0000 Subject: [PATCH 4/4] refactor(ci): simplify stderr decode in watcher thread per code review Agent-Logs-Url: https://github.com/codeSamuraii/pyfuse/sessions/3fc206b9-a7f6-45e5-9770-7a59c2f59f3c Co-authored-by: codeSamuraii <17270548+codeSamuraii@users.noreply.github.com> --- tests/integration/conftest.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index a7442e4..fc5a8db 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -137,10 +137,7 @@ def worker_process( def _watch_stderr() -> None: assert proc.stderr is not None for raw_line in proc.stderr: - try: - line = raw_line.decode("utf-8", errors="replace") - except Exception: - line = repr(raw_line) + "\n" + line = raw_line.decode("utf-8", errors="replace") sys.stderr.write(f"[worker] {line}") sys.stderr.flush() if "Listening for tasks" in line: