Skip to content

Commit 46d5220

Browse files
committed
Async
1 parent d213d9c commit 46d5220

28 files changed

Lines changed: 1167 additions & 1719 deletions

README.md

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
`pyfuse` captures a function's source code, dependencies, and imports via a single `@trace` decorator.<br/>Workers reconstruct and execute the function from scratch – no deployment, no shared filesystem. Packages are installed automatically.
1010

1111
```python
12+
import asyncio
1213
import math
1314

1415
import pyfuse
@@ -23,8 +24,11 @@ def add(a: int, b: int) -> int:
2324
def hypotenuse(a: float, b: float) -> float:
2425
return math.sqrt(add(a**2, b**2))
2526

26-
future = hypotenuse.run(3.0, 4.0)
27-
print(future.result()) # 5.0
27+
async def main():
28+
result = await hypotenuse.run(3.0, 4.0)
29+
print(result) # 5.0
30+
31+
asyncio.run(main())
2832
```
2933

3034
Only the entry point needs `@trace`. Everything it calls -- `add()`, imports, class methods -- is captured automatically.
@@ -75,12 +79,11 @@ Common mappings (`cv2` -> `opencv-python`, `PIL` -> `Pillow`, etc.) are built in
7579

7680
- **Automatic dependency detection** -- AST-based, recursive. Untraced helpers, class methods, module-level constants, class-level attributes, and class decorators are all captured.
7781
- **Third-party package auto-install** -- Workers install missing packages via pip before execution.
78-
- **Async support** -- `async def` functions execute transparently. `await result`, `.arun()`, `.amap()`, and `asyncio.gather` all work out of the box.
79-
- **Notification-based result delivery** -- Push notifications fan out to many waiters via a single backend listener. No polling.
82+
- **Async-native** -- The entire I/O layer is built on `asyncio`. `.run()`, `.start()`, `.map()`, `await result`, and `asyncio.gather` all work out of the box.
8083
- **Heartbeat & stall detection** -- Workers send periodic heartbeats. Clients raise `TaskStalled` when a worker stops responding.
8184
- **Class methods** -- `self.method()` and `cls.method()` dependencies are detected. Entire class hierarchies (including `super()`), class-level attributes, decorators (`@dataclass`, etc.), and metaclass keywords are reconstructed.
8285
- **Retry and timeout** -- `@trace(timeout=30, retries=3)` with exponential backoff.
83-
- **Batch submission** -- `func.map([(a1, b1), (a2, b2)])` submits multiple tasks at once.
86+
- **Batch submission** -- `await func.map([(a1, b1), (a2, b2)])` submits and awaits multiple tasks.
8487
- **Pluggable backends** -- Redis (`redis://`) for multi-machine, shared memory (`shm://`) for same-machine IPC.
8588
- **Content-hash caching** -- Workers cache compiled functions by content hash. Same code from different clients = cache hit.
8689

@@ -93,7 +96,7 @@ pyfuse run examples/remote_execution.py
9396
```
9497

9598
- **[`examples/remote_execution.py`](examples/remote_execution.py)** -- Remote execution with auto-discovered dependencies
96-
- **[`examples/async_execution.py`](examples/async_execution.py)** -- Async: `await`, `.arun()`, `.amap()`, `asyncio.gather`
99+
- **[`examples/async_execution.py`](examples/async_execution.py)** -- Async: `.run()`, `.start()`, `.map()`, `asyncio.gather`
97100
- **[`examples/package_installation.py`](examples/package_installation.py)** -- Auto-installing third-party packages on workers
98101
- **[`examples/large_module.py`](examples/large_module.py)** -- Stress test: 47 functions across 7 files, one `@trace`
99102

docs/CONTEXT.md

Lines changed: 45 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,27 @@ pyfuse is a Python library for distributed function execution via automatic sour
1212
pyfuse/
1313
├── __init__.py # Public API surface (trace, connect, serve, serialize, etc.)
1414
├── __main__.py # CLI: worker, run, info, serialize, reconstruct
15-
├── _venv.py # Temporary virtual environment management (run/worker --tmp)
15+
├── _venv.py # Temporary virtual environment management (async)
1616
├── py.typed # PEP 561 typed package marker
1717
├── core/
1818
│ ├── task.py # Task dataclass: serializable envelope (graph + args + options)
19-
│ ├── models.py # FunctionNode and ImportInfo dataclasses, content hashing (incl. class_keywords, class_attrs, class_decorators)
19+
│ ├── models.py # FunctionNode and ImportInfo dataclasses, content hashing
2020
│ ├── version.py # _VERSION = "0.4.0"
2121
│ └── errors.py # Error, WorkerError, RemoteError, DependencyError, TaskStalled
2222
├── graph/
23-
│ ├── decorator.py # @trace: marks functions, adds .run()/.map()/.arun()/.amap()
23+
│ ├── decorator.py # @trace: marks functions, adds .run()/.start()/.map()
2424
│ ├── graph.py # Graph class: registration, auto-discovery, serialization
2525
│ ├── store.py # Content-addressable store: serialize/reconstruct/merge
26-
│ ├── analyzer.py # AST-based source capture, import extraction, dependency detection, class attrs/decorators
26+
│ ├── analyzer.py # AST-based source capture, import extraction, dependency detection
2727
│ └── tracing.py # Runtime call-stack tracing via contextvars (TracingMixin)
2828
└── worker/
29-
├── worker.py # Worker: reconstruct, cache, execute with retry/timeout
30-
├── remote.py # connect(), disconnect(), serve(), submit_remote()
31-
├── result.py # Result (future), ResultEnvelope, ResultWaiter (notification fan-out)
32-
├── deps.py # Third-party dependency extraction and pip installation
29+
├── worker.py # Worker: reconstruct, cache, execute with retry/timeout (async)
30+
├── remote.py # connect(), disconnect(), serve(), submit_remote() (async)
31+
├── result.py # Result (awaitable future), ResultEnvelope
32+
├── deps.py # Third-party dependency extraction and pip installation (async)
3333
└── backends/
34-
├── base.py # Backend ABC: submit, listen, send_result, get_result, heartbeat, notifications
35-
├── redis.py # RedisBackend: RPUSH/BLPOP pattern
34+
├── base.py # Backend ABC: async transport interface
35+
├── redis.py # RedisBackend: redis.asyncio with RPUSH/BLPOP pattern
3636
└── shm.py # SharedMemoryBackend: multiprocessing shared memory IPC
3737
```
3838

@@ -52,7 +52,13 @@ Data models and error types. `FunctionNode` represents a function in the graph,
5252

5353
### 3. Worker layer (`worker/`)
5454

55-
Handles remote execution. The `Worker` class reconstructs functions from serialized stores, caches compiled namespaces by subgraph content hash, and executes with retry/timeout policies (including `async def` functions via `asyncio.run()`). `remote.py` orchestrates the connection lifecycle, worker event loop, and heartbeat threads. `Result` is an awaitable future returned by `.run()`. The `ResultWaiter` singleton per backend uses push notifications to fan out results to many waiters without polling.
55+
Handles remote execution. Built entirely on `asyncio`:
56+
57+
- **`worker.py`**: `Worker` class reconstructs functions from serialized stores, caches compiled namespaces by subgraph content hash, and executes with retry/timeout policies. Async user functions are awaited directly; sync user functions run in `loop.run_in_executor()`. Timeouts use `asyncio.wait_for()`.
58+
- **`remote.py`**: Orchestrates the connection lifecycle, worker event loop (`asyncio.TaskGroup` + `asyncio.Semaphore` for bounded concurrency), and heartbeat tasks (`asyncio.create_task`).
59+
- **`result.py`**: `Result` is an awaitable future returned by `.start()`. Simple async polling loop for stall detection.
60+
- **`deps.py`**: Package installation via `asyncio.create_subprocess_exec`.
61+
- **`backends/`**: All backend methods are `async def`. `listen()` and `subscribe_results()` are async generators.
5662

5763
## Data flow
5864

@@ -63,19 +69,21 @@ Handles remote execution. The `Worker` class reconstructs functions from seriali
6369
-> graph.register(func) # add FunctionNode to graph
6470
-> graph._auto_register(func) # recursively discover deps
6571
66-
func.run(*args)
72+
await func.run(*args) # submit + await result (returns value)
73+
await func.start(*args) # submit only (returns Result)
6774
-> graph.serialize(func) # build Store, export JSON
6875
-> Task(graph_json, function_name, args, ...) # package into Task
69-
-> backend.submit(task.to_json()) # send via transport
70-
-> return Result(task_id, backend) # future handle
76+
-> await backend.submit(task.to_json()) # send via transport
77+
-> return Result(task_id, backend) # future handle (.start())
78+
-> return await result # awaited value (.run())
7179
72-
Worker.run(task)
80+
await Worker.run(task)
7381
-> store = Store.from_json(task.graph_json) # deserialize
74-
-> deps.ensure_dependencies(store, func_name) # pip install missing
82+
-> await deps.ensure_dependencies(...) # pip install missing
7583
-> store.reconstruct(func_name) # emit Python source
7684
-> compile() + exec() into namespace # build callable
77-
-> namespace[func_name](*args, **kwargs) # execute
78-
-> backend.send_result(task_id, envelope) # return result
85+
-> await namespace[func_name](*args) # execute (or run_in_executor for sync)
86+
-> await backend.send_result(task_id, envelope) # return result
7987
```
8088

8189
## Key patterns and conventions
@@ -87,13 +95,12 @@ Worker.run(task)
8795
- **Cross-module inlining**: Imports like `from utils import helper` where `helper` is a user function get converted from import statements to inline dependency edges, making reconstructed code self-contained.
8896
- **Module-level variables**: Constants and assignments (`MAX_RETRIES = 5`, `CONFIG = {...}`) referenced by traced functions are captured and emitted in reconstructed source.
8997
- **Class-level attributes**: Class body statements (assignments, annotated assignments, docstrings) are extracted from AST and emitted in reconstructed class blocks. Class decorators (`@dataclass`, etc.) and metaclass keywords (`metaclass=ABCMeta`) are captured and emitted.
90-
- **Closure handling**: Multi-tier capture: `repr()` validation, then lambda source extraction, then auto-discovery for non-traced user functions, then constructor expressions for common stdlib types (`defaultdict`, `Counter`, `deque`), then pickle fallback for picklable objects. Traced function references become dependency edges.
98+
- **Closure handling**: Multi-tier capture: repr validation → traced functions → lambdas (source extraction) → non-traced user functions (auto-registration) → constructor expressions (defaultdict/Counter/deque) → pickle fallback → warning. Traced function references become dependency edges.
9199
- **Decorator stripping**: `@trace` lines are removed from captured source so reconstructed code doesn't depend on pyfuse.
92100
- **Backend auto-detection**: `connect()` picks Redis or shared memory based on URL scheme. Falls back to `PYFUSE_BACKEND` env var.
93101
- **Worker caching**: Keyed by SHA-256 of all reachable content hashes (sorted + joined). Same code from different clients = cache hit.
94-
- **Async transparency**: Workers detect `async def` functions and run them via `asyncio.run()`. Results are awaitable via `asyncio.Future` fan-out.
95-
- **Notification fan-out**: `ResultWaiter` singleton per backend runs one listener thread and one heartbeat thread, serving all pending `Result` objects. No per-task polling.
96-
- **Heartbeat monitoring**: Workers send heartbeats every 1s. Client-side stall detection tracks when heartbeat *values* last changed using local monotonic clock (no cross-machine timestamp comparison).
102+
- **Async-native I/O**: All backend methods, worker execution, result handling, pip installation, and subprocess management use `asyncio`. Sync user functions run in `loop.run_in_executor()` to avoid blocking the event loop.
103+
- **Heartbeat**: Workers send heartbeats via `asyncio.create_task`. Client-side stall detection tracks when heartbeat *values* last changed using local monotonic clock (no cross-machine timestamp comparison).
97104

98105
## Serialization format (v0.4.0)
99106

@@ -128,24 +135,24 @@ Worker.run(task)
128135
@trace # capture function
129136
@trace(timeout=30, retries=3) # with execution options
130137

131-
# Remote execution
132-
pyfuse.connect("redis://localhost:6379") # configure backend
133-
pyfuse.serve("redis://...", concurrency=4) # start worker loop
134-
func.run(*args) # -> Result (awaitable future)
135-
func.map([(a1, b1), (a2, b2)]) # -> list[Result]
136-
await func.arun(*args) # async submit + await
137-
await func.amap([(a1, b1), ...]) # async batch submit + await all
138+
# Remote execution (all async)
139+
pyfuse.connect("redis://localhost:6379") # configure backend (sync)
140+
await pyfuse.serve("redis://...", concurrency=4) # start worker loop
141+
await func.run(*args) # submit + await result (returns value)
142+
future = await func.start(*args) # submit (returns Result handle)
143+
results = await func.map([(a1, b1), ...]) # batch submit + await all (returns values)
138144

139145
# Result handling
140-
result = future.result(timeout=10) # sync blocking
141-
result = await future # async await
142-
result = await future.aresult(timeout=10, stall_timeout=10.0) # async with options
146+
result = await future # shorthand for await future.result()
147+
result = await future.result(timeout=10, stall_timeout=10.0) # with options
148+
await future.done() # non-blocking check
149+
await future.status() # "pending", "success", or "error"
143150

144-
# Serialization
151+
# Serialization (sync -- pure CPU)
145152
pyfuse.serialize(func) # -> JSON string
146153
pyfuse.reconstruct(json_str, "name") # -> Python source string
147154
pyfuse.pack(func, *args) # -> Task
148-
pyfuse.execute(task) # -> return value
155+
await pyfuse.execute(task) # -> return value
149156

150157
# Inspection
151158
pyfuse.get_graph() # -> Graph
@@ -159,7 +166,9 @@ pytest # run all tests
159166
pytest tests/test_api.py # specific module
160167
```
161168

162-
15 test modules covering: API surface, AST analysis, async features (aresult, await, arun, amap, gather, heartbeat, stall detection, notification-based result delivery), auto-discovery (including metaclass keywords, class attributes, class decorators, `__init_subclass__`), dependency management, graph operations, integration scenarios, remote execution, runtime tracing (including closure capture of non-traced functions, lambdas, constructor expressions, pickle fallback), shared memory backend, store operations, stress tests (47 functions across 7 files), task serialization, temp venv management, and worker caching/execution.
169+
15 test modules covering: API surface, AST analysis, async features (Result.result, await, .run(), .start(), .map(), gather, heartbeat, stall detection), auto-discovery (including metaclass keywords, class attributes, class decorators, `__init_subclass__`), dependency management, graph operations, integration scenarios, remote execution, runtime tracing (including closure capture of non-traced functions, lambdas, constructor expressions, pickle fallback), shared memory backend, store operations, stress tests (47 functions across 7 files), task serialization, temp venv management, and worker caching/execution.
170+
171+
All async tests use `pytest-asyncio` with `asyncio_mode = "auto"`.
163172

164173
## Development
165174

@@ -176,8 +185,7 @@ pytest # test suite
176185
- `analyzer.py` is the core of static analysis (~365 lines). Changes here affect what gets captured.
177186
- `tracing.py` uses `contextvars.ContextVar` for thread/async safety. The `_runtime_deps` dict is guarded by `threading.Lock`.
178187
- The `Task` wire format keeps `graph` as a JSON string (not nested object) to keep the envelope flat.
179-
- Backend implementations must satisfy the `Backend` ABC in `backends/base.py`. New methods (`notify_result`, `subscribe_results`, `get_heartbeats`) are non-abstract with safe defaults -- custom backends don't break.
180-
- `ResultWaiter` in `result.py` is a per-backend singleton with two daemon threads (listener + heartbeat). It uses `loop.call_soon_threadsafe()` for async fan-out and `threading.Event` for sync fan-out.
188+
- Backend implementations must satisfy the `Backend` ABC in `backends/base.py`. All methods are `async def`. New methods (`notify_result`, `subscribe_results`, `get_heartbeats`) are non-abstract with safe defaults -- custom backends don't break.
181189
- `install_package_as()` is a no-op at runtime; the AST analyzer in `decorator.py`/`analyzer.py` detects the `with` block pattern and tags `ImportInfo` objects with the package name.
182190
- `_capture_closure()` in `graph.py` uses a multi-tier strategy: repr validation → traced functions → lambdas (source extraction) → non-traced user functions (auto-registration) → constructor expressions (defaultdict/Counter/deque) → pickle fallback → warning. Returns function objects for auto-registration.
183191
- `_set_class_metadata()` in `graph.py` captures class-level attributes and decorators from the class source AST. Called from both `_auto_register_class` and `_discover_self_call_deps` to handle both constructor-discovered and directly-traced method classes.

0 commit comments

Comments
 (0)