Skip to content

Commit df6241f

Browse files
committed
Minor changes
1 parent 25557e4 commit df6241f

10 files changed

Lines changed: 110 additions & 39 deletions

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ Only the entry point needs `@trace`. Everything it calls -- `add()`, imports, cl
3838
Start an isolated worker (temporary venv with `--tmp`, cleaned up on exit):
3939

4040
```bash
41-
pyfuse worker --tmp --backend local://localhost:9748 # Same machine (async TCP)
42-
pyfuse worker --tmp --backend redis://localhost:6379 # Remote using Redis
41+
pyfuse worker --tmp --backend local://localhost:9748 # Same machine (async TCP)
42+
pyfuse worker --tmp --backend redis://localhost:6379 # Remote using Redis
4343
```
4444

4545
Run a script:

docs/QUICK_START.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def hypotenuse(a: float, b: float) -> float:
3636
### 2. Start a worker
3737

3838
```bash
39-
python -m pyfuse worker --backend redis://localhost:6379
39+
pyfuse worker --backend redis://localhost:6379
4040
```
4141

4242
The worker waits for tasks, reconstructs the function source on the fly, and executes it -- no prior knowledge of your code required.
@@ -250,13 +250,13 @@ result = await fetch_and_process.run("https://example.com")
250250

251251
```bash
252252
# 4 concurrent tasks
253-
python -m pyfuse worker --backend redis://localhost:6379 -c 4
253+
pyfuse worker --backend redis://localhost:6379 -c 4
254254

255255
# Disable automatic pip installs
256-
python -m pyfuse worker --backend redis://localhost:6379 --no-auto-install
256+
pyfuse worker --backend redis://localhost:6379 --no-auto-install
257257

258258
# Run in an isolated temporary venv (auto-cleaned on exit)
259-
python -m pyfuse worker --backend redis://localhost:6379 --tmp
259+
pyfuse worker --backend redis://localhost:6379 --tmp
260260
```
261261

262262
Or start a worker programmatically:
@@ -273,7 +273,7 @@ asyncio.run(pyfuse.serve("redis://localhost:6379", concurrency=4))
273273
The `run` command creates an isolated venv, auto-detects third-party dependencies from the script (including `install_package_as` blocks), installs them, and runs the script:
274274

275275
```bash
276-
python -m pyfuse run examples/remote_execution.py
276+
pyfuse run examples/remote_execution.py
277277
```
278278

279279
## Backends
@@ -507,8 +507,8 @@ except TaskCancelled:
507507

508508
```bash
509509
# Remote execution
510-
python -m pyfuse worker --backend local://localhost:9748 --tmp # Terminal 1
511-
python -m pyfuse run examples/remote_execution.py # Terminal 2
510+
pyfuse worker --backend local://localhost:9748 --tmp # Terminal 1
511+
pyfuse run examples/remote_execution.py # Terminal 2
512512
```
513513

514514
The `--tmp` flag creates an isolated temporary venv for the worker, which is auto-cleaned on exit.

docs/TECHNICAL_OVERVIEW.md

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pyfuse enables remote execution of Python functions without deploying code to wo
77
```
88
Client Worker
99
────── ──────
10-
@trace python -m pyfuse worker
10+
@trace pyfuse worker
1111
await func.run(args) ← listen for tasks
1212
│ │
1313
├─ capture source + deps ├─ deserialize graph
@@ -22,7 +22,7 @@ pyfuse enables remote execution of Python functions without deploying code to wo
2222
```
2323
pyfuse/
2424
__init__.py Public API: trace, connect, serve, serialize, reconstruct, ...
25-
__main__.py CLI: python -m pyfuse worker/run/info/serialize/reconstruct
25+
__main__.py CLI: pyfuse worker/run/info/serialize/reconstruct
2626
_venv.py Temporary virtual environment management (async)
2727
core/
2828
models.py FunctionNode, ImportInfo dataclasses (incl. content hashing, class metadata)
@@ -55,7 +55,7 @@ pyfuse/
5555
3. **Submit** -- `await backend.submit(task_json)` sends the task to the transport layer.
5656
4. **Return** -- `.run()` awaits the result and returns the value directly. `.start()` returns a `Result` handle immediately.
5757

58-
### Worker side: `await serve()` / `python -m pyfuse worker`
58+
### Worker side: `await serve()` / `pyfuse worker`
5959

6060
1. **Listen** -- `async for task_json in backend.listen()` yields tasks as they arrive.
6161
2. **Cancellation check** -- If `await backend.is_cancelled(task_id)` returns ``True``, skip execution and log the cancellation.
@@ -513,20 +513,20 @@ When a module contains `from X import *`:
513513

514514
```bash
515515
# Start a worker
516-
python -m pyfuse worker --backend redis://localhost:6379
517-
python -m pyfuse worker --backend redis://localhost:6379 -c 4
518-
python -m pyfuse worker --backend redis://localhost:6379 --no-auto-install
519-
python -m pyfuse worker --backend redis://localhost:6379 --tmp # isolated temp venv
516+
pyfuse worker --backend redis://localhost:6379
517+
pyfuse worker --backend redis://localhost:6379 -c 4
518+
pyfuse worker --backend redis://localhost:6379 --no-auto-install
519+
pyfuse worker --backend redis://localhost:6379 --tmp # isolated temp venv
520520

521521
# Run a script in a temporary venv (auto-detects and installs dependencies)
522-
python -m pyfuse run examples/script.py
522+
pyfuse run examples/script.py
523523

524524
# Show configuration
525-
python -m pyfuse info
525+
pyfuse info
526526

527527
# Serialize a function to JSON
528-
python -m pyfuse serialize mymodule:csv_to_json
528+
pyfuse serialize mymodule:csv_to_json
529529

530530
# Reconstruct source from a graph file
531-
python -m pyfuse reconstruct graph.json csv_to_json
531+
pyfuse reconstruct graph.json csv_to_json
532532
```

examples/cancellation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
pyfuse worker --backend redis://localhost:6379 --tmp
88
99
# Terminal 2 -- run this script
10-
python -m pyfuse run examples/cancellation.py
10+
pyfuse run examples/cancellation.py
1111
"""
1212

1313
import asyncio

examples/large_module.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
99
Usage:
1010
# Terminal 1 -- start a worker
11-
python -m pyfuse worker --backend redis://localhost:6379
11+
pyfuse worker --backend redis://localhost:6379
1212
1313
# Terminal 2 -- run this script
1414
python examples/large_module.py

examples/package_installation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
99
Usage:
1010
# Terminal 1 -- start a worker
11-
python -m pyfuse worker --backend redis://localhost:6379
11+
pyfuse worker --backend redis://localhost:6379
1212
1313
# Terminal 2 -- run this script
1414
python examples/package_installation.py

examples/progress_reporting.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
pyfuse worker --backend redis://localhost:6379 --tmp
99
1010
# Terminal 2 -- run this script
11-
python -m pyfuse run examples/progress_reporting.py
11+
pyfuse run examples/progress_reporting.py
1212
"""
1313

1414
import asyncio

pyfuse/__main__.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
33
Usage::
44
5-
python -m pyfuse worker --backend redis://localhost:6379
6-
python -m pyfuse worker --backend redis://localhost:6379 --tmp
7-
python -m pyfuse run examples/script.py
8-
python -m pyfuse info
9-
python -m pyfuse serialize mymodule:csv_to_json
10-
python -m pyfuse reconstruct graph.json csv_to_json
5+
pyfuse worker --backend redis://localhost:6379
6+
pyfuse worker --backend redis://localhost:6379 --tmp
7+
pyfuse run examples/script.py
8+
pyfuse info
9+
pyfuse serialize mymodule:csv_to_json
10+
pyfuse reconstruct graph.json csv_to_json
1111
"""
1212
from __future__ import annotations
1313

pyfuse/worker/remote.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,27 @@ async def _heartbeat_loop(
189189
backend: Backend,
190190
task_id: str,
191191
cancel_event: asyncio.Event,
192+
exec_task: asyncio.Task[Any] | None = None,
192193
) -> None:
193-
"""Send periodic heartbeats until *cancel_event* is set."""
194+
"""Send periodic heartbeats and check for cancellation.
195+
196+
When *exec_task* is provided and the backend reports the task as
197+
cancelled, the execution task is cancelled via
198+
:meth:`asyncio.Task.cancel`, which raises :class:`CancelledError`
199+
at the next ``await`` in async user functions.
200+
"""
194201
while not cancel_event.is_set():
195202
try:
196203
await backend.send_heartbeat(task_id)
197204
except Exception:
198205
pass # best-effort
206+
if exec_task is not None:
207+
try:
208+
if await backend.is_cancelled(task_id):
209+
exec_task.cancel()
210+
return
211+
except Exception:
212+
pass
199213
try:
200214
await asyncio.wait_for(cancel_event.wait(), timeout=_HEARTBEAT_INTERVAL)
201215
except asyncio.TimeoutError:
@@ -316,18 +330,25 @@ async def _handle_task(
316330
_log_task_result(task, envelope, 0, worker)
317331
return
318332

319-
cancel_event = asyncio.Event()
320-
hb_task = asyncio.create_task(_heartbeat_loop(backend, task.task_id, cancel_event))
321-
322333
# Set up rate-limited progress callback
323334
loop = asyncio.get_running_loop()
324335
progress_cb, flush = _make_progress_callback(backend, task.task_id, loop)
325336
token = _progress_callback.set(progress_cb)
326337

338+
# Run execution as a task so the heartbeat loop can cancel it
339+
exec_task: asyncio.Task[Any] = asyncio.create_task(worker.run_with_policy(task))
340+
341+
cancel_event = asyncio.Event()
342+
hb_task = asyncio.create_task(
343+
_heartbeat_loop(backend, task.task_id, cancel_event, exec_task),
344+
)
345+
327346
t0 = time.monotonic()
328347
try:
329-
result = await worker.run_with_policy(task)
348+
result = await exec_task
330349
envelope = ResultEnvelope.success(task.task_id, result)
350+
except asyncio.CancelledError:
351+
envelope = ResultEnvelope.cancelled(task.task_id)
331352
except Exception as exc:
332353
logger.debug("Task %s failed", task.task_id, exc_info=True)
333354
envelope = ResultEnvelope.failure(task.task_id, exc)

tests/test_cancellation_progress.py

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,22 @@ def _slow_store() -> tuple[Store, str]:
125125
return store, store.to_json()
126126

127127

128+
def _async_slow_store() -> tuple[Store, str]:
129+
store = Store()
130+
node = _node("slow_async", (
131+
"async def slow_async(n):\n"
132+
" import asyncio\n"
133+
" total = 0\n"
134+
" for i in range(n):\n"
135+
" await asyncio.sleep(0.1)\n"
136+
" total += i\n"
137+
" return total"
138+
))
139+
h = store.put(node)
140+
store.set_ref("slow_async", h)
141+
return store, store.to_json()
142+
143+
128144
def _progress_store() -> tuple[Store, str]:
129145
store = Store()
130146
node = _node("process", (
@@ -238,8 +254,42 @@ async def test_handle_task_skips_cancelled(self, backend: InMemoryBackend) -> No
238254
assert task.task_id not in backend.results
239255

240256
@pytest.mark.asyncio
241-
async def test_cancel_during_execution(self, backend: InMemoryBackend) -> None:
242-
"""Worker sends its result normally; cancel semantics are client-side."""
257+
async def test_cancel_during_async_execution(
258+
self, backend: InMemoryBackend, monkeypatch: pytest.MonkeyPatch,
259+
) -> None:
260+
"""Cancelling during async execution interrupts the function."""
261+
monkeypatch.setattr(_remote, "_HEARTBEAT_INTERVAL", 0.1)
262+
263+
_, json_str = _async_slow_store()
264+
# 30 iterations × 0.1 s = 3 s without cancellation
265+
task = Task(graph_json=json_str, function_name="slow_async", args=(30,))
266+
267+
worker = Worker(auto_install=False)
268+
269+
async def cancel_later() -> None:
270+
await asyncio.sleep(0.15)
271+
await backend.cancel_task(task.task_id)
272+
273+
t0 = time.monotonic()
274+
asyncio.create_task(cancel_later())
275+
await _handle_task(worker, backend, task.to_json())
276+
elapsed = time.monotonic() - t0
277+
278+
# Should finish well before 3 s (the function was interrupted)
279+
assert elapsed < 1.5
280+
281+
# Worker sends a cancelled envelope
282+
assert task.task_id in backend.results
283+
env = ResultEnvelope.from_json(backend.results[task.task_id])
284+
assert env.status == "cancelled"
285+
286+
@pytest.mark.asyncio
287+
async def test_cancel_during_sync_execution(
288+
self, backend: InMemoryBackend, monkeypatch: pytest.MonkeyPatch,
289+
) -> None:
290+
"""Cancelling during sync execution marks result as cancelled."""
291+
monkeypatch.setattr(_remote, "_HEARTBEAT_INTERVAL", 0.1)
292+
243293
_, json_str = _slow_store()
244294
task = Task(graph_json=json_str, function_name="slow", args=(5,))
245295

@@ -252,10 +302,10 @@ async def cancel_later() -> None:
252302
asyncio.create_task(cancel_later())
253303
await _handle_task(worker, backend, task.to_json())
254304

255-
# Worker always sends its result; client reads cancel envelope first
305+
# Executor thread can't be interrupted, but the coroutine is cancelled
256306
assert task.task_id in backend.results
257307
env = ResultEnvelope.from_json(backend.results[task.task_id])
258-
assert env.status == "ok"
308+
assert env.status == "cancelled"
259309

260310

261311
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)