1919import signal
2020import subprocess
2121import sys
22+ import threading
2223import time
2324from datetime import timedelta
2425from typing import Any
4647# Worker subprocess management
4748# ---------------------------------------------------------------------------
4849
50+ _WORKER_READY_TIMEOUT = 60 # seconds to wait for "Listening" log line
51+
52+
4953def _start_worker (
5054 backend : str ,
5155 * ,
5256 signing_token : str | None = None ,
5357 sandbox : bool = False ,
5458) -> subprocess .Popen [bytes ]:
55- """Launch ``python -m pyfuse worker`` in a subprocess."""
59+ """Launch ``python -m pyfuse worker`` in a subprocess.
60+
61+ Waits for the worker to print its "Listening for tasks" log line
62+ before returning, so the caller knows it is actually ready.
63+ """
5664 cmd = [sys .executable , "-m" , "pyfuse" , "worker" , "--backend" , backend ]
5765 if signing_token :
5866 cmd .append ("--require-signing" )
@@ -69,12 +77,30 @@ def _start_worker(
6977 stdout = subprocess .PIPE ,
7078 stderr = subprocess .PIPE ,
7179 )
72- # Give the worker time to connect and start listening
73- time .sleep (3 )
74- assert proc .poll () is None , (
75- f"Worker exited early with code { proc .returncode } :\n "
76- + (proc .stderr .read ().decode () if proc .stderr else "" )
77- )
80+
81+ # Read stderr in a background thread and wait for the ready signal.
82+ ready = threading .Event ()
83+ stderr_lines : list [str ] = []
84+
85+ def _drain_stderr () -> None :
86+ assert proc .stderr is not None
87+ for raw in proc .stderr :
88+ line = raw .decode (errors = "replace" )
89+ stderr_lines .append (line )
90+ if "Listening" in line :
91+ ready .set ()
92+
93+ reader = threading .Thread (target = _drain_stderr , daemon = True )
94+ reader .start ()
95+
96+ if not ready .wait (timeout = _WORKER_READY_TIMEOUT ):
97+ proc .kill ()
98+ proc .wait (timeout = 5 )
99+ raise RuntimeError (
100+ f"Worker not ready after { _WORKER_READY_TIMEOUT } s.\n "
101+ f"stderr:\n { '' .join (stderr_lines )} "
102+ )
103+
78104 return proc
79105
80106
@@ -297,6 +323,14 @@ def throttled_fn() -> str:
297323 await throttled_fn .run ()
298324
299325
326+ def _step_a (x : int ) -> int :
327+ return x + 1
328+
329+
330+ def _step_b (x : int ) -> int :
331+ return _step_a (x ) * 2
332+
333+
300334class TestErrorHandling :
301335 async def test_remote_error_propagation (self , worker : subprocess .Popen [bytes ]) -> None :
302336 @trace
@@ -309,18 +343,12 @@ def bad_func() -> None:
309343
310344 async def test_function_with_dependencies (self , worker : subprocess .Popen [bytes ]) -> None :
311345 """Multi-level dependency chain works end-to-end."""
312- def step_a (x : int ) -> int :
313- return x + 1
314-
315- def step_b (x : int ) -> int :
316- return step_a (x ) * 2
317-
318346 @trace
319347 def pipeline (x : int ) -> int :
320- return step_b (x ) + 10
348+ return _step_b (x ) + 10
321349
322350 result = await pipeline .run (5 )
323- assert result == 22 # step_a (5)=6, step_b (5)=12, pipeline(5)=22
351+ assert result == 22 # _step_a (5)=6, _step_b (5)=12, pipeline(5)=22
324352
325353
326354class TestClassMethods :
0 commit comments