Commit b15a9ee

codeSamuraii and Copilot committed
Worker-only imports
Co-authored-by: Copilot <copilot@github.com>
1 parent e6919dd commit b15a9ee

11 files changed

Lines changed: 580 additions & 18 deletions

docs/QUICK_START.md

Lines changed: 18 additions & 0 deletions
@@ -107,6 +107,24 @@ with install_package_as("PyYAML"):
 
 Common mappings (`cv2` → `opencv-python`, `PIL` → `Pillow`, etc.) are built in.
 
+### Worker-only imports
+
+Skip installing packages locally; the worker installs them on demand:
+
+```python
+from pyfuse import worker_only_import
+
+with worker_only_import():
+    import requests
+
+with worker_only_import("opencv-python-headless"):
+    import cv2
+```
+
+The local `requests` and `cv2` resolve to lightweight stubs. They're fine to reference inside a `@trace` function (the worker re-imports them for real), but raise `WorkerOnlyError` if used directly on the client.
+
+Only the names imported literally inside the `with` block are stubbed; real installed packages and their transitive imports are unaffected.
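The stub behaviour described in the added section can be pictured with a small, self-contained sketch. This is an illustration only, not pyfuse's implementation: `WorkerOnlyError` and `WorkerOnlyStub` below are hypothetical stand-ins, and the sketch assumes a stub can be held and referenced freely but raises on any attribute access.

```python
import types


class WorkerOnlyError(Exception):
    """Hypothetical stand-in for pyfuse's WorkerOnlyError."""


class WorkerOnlyStub(types.ModuleType):
    """Placeholder module: safe to hold a reference to, unsafe to use."""

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails, so attributes that
        # ModuleType sets itself (such as __name__) keep working.
        raise WorkerOnlyError(
            f"{self.__name__}.{name} is only available on the worker"
        )


requests = WorkerOnlyStub("requests")


def fetch(url):
    # Mentioning the stub in a function body is harmless; the stub is only
    # touched when the function actually runs.
    return requests.get(url).status_code


try:
    fetch("https://example.com")
    outcome = "no error"
except WorkerOnlyError as exc:
    outcome = str(exc)

print(outcome)
```

Defining `fetch` succeeds on the client; only calling it locally trips the error, which matches the "fine to reference, not to use" wording in the added paragraph.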
+
 ## Progress, cancellation, and results
 
 ```python

docs/TECHNICAL_OVERVIEW.md

Lines changed: 20 additions & 1 deletion
@@ -442,7 +442,8 @@ A single import binding:
 |-------|---------|
 | `statement` | `"import csv"`, `"from os.path import join"` |
 | `bound_name` | `"csv"`, `"join"` |
-| `package` | `"opencv-python"` (from `install_package_as`, or `None`) |
+| `package` | `"opencv-python"` (from `install_package_as` / `worker_only_import`, or `None`) |
+| `worker_only` | `True` if the import was inside a `worker_only_import` block (omitted when `False`) |
 
 Multi-name imports are split into individual objects for per-function tracking.
 
@@ -489,6 +490,24 @@ with install_package_as("opencv-python"):
 
 The worker sees the `package` field on the import and knows to `pip install opencv-python` instead of `pip install cv2`.
 
+### `worker_only_import` context manager
+
+Lets clients reference a package without installing it locally. On the client, imports inside the block resolve to lightweight stub modules (`_WorkerOnlyStub`) that raise `WorkerOnlyError` if used outside a worker context. On the worker, the package is installed via pip and imported normally.
+
+```python
+with worker_only_import():  # import name == pip name
+    import requests
+
+with worker_only_import("opencv-python-headless"):  # explicit pip name
+    import cv2
+```
+
+Implementation:
+- A meta-path finder is appended to `sys.meta_path` for the duration of the block (real installed packages still win because the finder runs last).
+- The finder's whitelist is populated by parsing the caller's `with` block source via AST: only names the user literally writes inside the block are eligible to be stubbed. This prevents transitive missing imports from real installed packages from being silently swallowed.
+- The AST analyzer marks every `ImportInfo` in the block with `worker_only=True` and records any explicit pip name as `package`.
+- The worker treats worker-only imports identically to regular third-party imports: `extract_third_party_modules` collects them, `_collect_package_hints` honors the pip name, and `pip install` runs before reconstruction.
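The first two implementation bullets (a finder appended to `sys.meta_path`, gated by a whitelist built from the block's source) can be sketched as follows. This is a simplified illustration under stated assumptions, not the code from this commit: `StubLoader`, `StubFinder`, `names_in_block`, and `stub_missing` are hypothetical names, the block source is passed in as a string rather than recovered from the caller's frame, and the placeholder is a plain empty module rather than `_WorkerOnlyStub`. It also assumes the `zz_*` packages are not installed.

```python
import ast
import importlib.abc
import importlib.machinery
import sys
import types
from contextlib import contextmanager


class StubLoader(importlib.abc.Loader):
    """Creates an empty placeholder module (hypothetical helper)."""

    def create_module(self, spec):
        return types.ModuleType(spec.name)

    def exec_module(self, module):
        pass  # nothing to run; the module stays empty


class StubFinder(importlib.abc.MetaPathFinder):
    """Resolves only whitelisted top-level names (hypothetical helper)."""

    def __init__(self, allowed):
        self.allowed = set(allowed)

    def find_spec(self, fullname, path=None, target=None):
        if fullname.split(".")[0] not in self.allowed:
            return None  # not whitelisted: let the import fail normally
        return importlib.machinery.ModuleSpec(fullname, StubLoader(), is_package=True)


def names_in_block(source):
    """Top-level module names that `source` imports literally."""
    names = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names.add(node.module.split(".")[0])
    return names


@contextmanager
def stub_missing(allowed):
    finder = StubFinder(allowed)
    sys.meta_path.append(finder)  # appended, so real finders are asked first
    try:
        yield
    finally:
        sys.meta_path.remove(finder)


allowed = names_in_block("import zz_not_installed_pkg\n")

with stub_missing(allowed):
    import zz_not_installed_pkg  # missing locally -> placeholder module
    import json                  # installed -> resolved by the real finders

try:
    with stub_missing(allowed):
        import zz_other_missing_pkg  # not whitelisted -> still an ImportError
    blocked = False
except ImportError:
    blocked = True

print(type(zz_not_installed_pkg).__name__, json.dumps([1]), blocked)
```

Because the finder sits at the end of `sys.meta_path`, it is only consulted after every regular finder has declined, and the whitelist keeps a missing dependency of a real package from being silently replaced by a placeholder.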
+
 ## Heartbeat and stall detection
 
 Workers send periodic heartbeats (every 1 second) while executing a task via an `asyncio.Task`. Clients use heartbeat data for stall detection.

examples/package_installation.py

Lines changed: 18 additions & 5 deletions
@@ -4,6 +4,11 @@
 pyfuse detects the imports, installs the packages via pip, and executes
 the function -- all automatically.
 
+This example also demonstrates ``worker_only_import``: the client never
+installs ``markdown`` or ``python-dateutil`` -- only the worker does.
+The local imports resolve to lightweight stubs that raise
+``WorkerOnlyError`` if used outside a traced function.
+
 Requires Redis on localhost:6379. Install: pip install redis
 
 Usage:
@@ -17,12 +22,9 @@
 import asyncio
 from html.parser import HTMLParser
 
-import requests
-import markdown
-
 import pyfuse
 from pyfuse import trace
-from pyfuse import install_package_as
+from pyfuse import install_package_as, worker_only_import
 
 # Some packages have different import and pip names:
 # import yaml -> pip install PyYAML
@@ -33,7 +35,18 @@
 with install_package_as("PyYAML"):
     import yaml
 
-with install_package_as("python-dateutil"):
+# `worker_only_import` skips the local install entirely. The package only
+# needs to be available on the worker. The client gets a stub object that
+# raises WorkerOnlyError if used directly, but is fine to reference inside
+# a @trace function (it's serialized and re-imported on the worker).
+
+# No pip name needed when import name == package name:
+with worker_only_import():
+    import requests
+    import markdown
+
+# Pass an explicit pip name when the import name differs from the package:
+with worker_only_import("python-dateutil"):
     from dateutil import parser as date_parser
 
 

pyfuse/__init__.py

Lines changed: 4 additions & 1 deletion
@@ -15,11 +15,12 @@
     SignatureError,
     DependencyError,
     ThrottleError,
+    WorkerOnlyError,
 )
 from pyfuse.core.models import ImportInfo, FunctionNode
 from pyfuse.graph.graph import Graph
 from pyfuse.graph.store import Store, MergeResult
-from pyfuse.worker.deps import install_package_as
+from pyfuse.worker.deps import install_package_as, worker_only_import
 from pyfuse.core.pairing import (
     PairingResult,
     generate_pin,
@@ -111,6 +112,7 @@ def pack(func: Callable[..., object], *args: Any, **kwargs: Any) -> Task:
     "disconnect",
     "serve",
     "install_package_as",
+    "worker_only_import",
     "progress",
     # Serialization
     "serialize",
@@ -130,6 +132,7 @@ def pack(func: Callable[..., object], *args: Any, **kwargs: Any) -> Task:
     "ThrottleError",
     "SignatureError",
     "PairingError",
+    "WorkerOnlyError",
     # Graph
     "get_graph",
     "Graph",

pyfuse/__main__.py

Lines changed: 9 additions & 1 deletion
@@ -28,7 +28,7 @@
 from pyfuse._venv import temp_venv
 from pyfuse.worker.deps import DEFAULT_IMPORT_TO_PACKAGE
 from pyfuse.worker.remote import serve
-from pyfuse.graph.analyzer import _parse_install_package_as
+from pyfuse.graph.analyzer import _parse_install_package_as, _parse_worker_only_import
 
 
 def _build_worker_cmd(python: str, args: argparse.Namespace) -> list[str]:
@@ -228,6 +228,7 @@ def _detect_script_packages(script: str) -> list[str]:
 
     # module name -> pip package name (None means use default mapping)
     modules: dict[str, str | None] = {}
+    skip: set[str] = set()
     for node in ast.iter_child_nodes(tree):
         if isinstance(node, (ast.Import, ast.ImportFrom)):
             for m in _extract_top_modules(node):
@@ -238,11 +239,18 @@ def _detect_script_packages(script: str) -> list[str]:
                 for child in node.body:
                     for m in _extract_top_modules(child):
                         modules[m] = package
+                continue
+            if _parse_worker_only_import(node) is not False:
+                for child in node.body:
+                    for m in _extract_top_modules(child):
+                        skip.add(m)
 
     packages: dict[str, None] = {}
     for m, explicit_package in sorted(modules.items()):
         if m in sys.stdlib_module_names or m == "pyfuse":
             continue
+        if m in skip:
+            continue
         if _is_local_package(m, script_dir):
             continue
         pkg = explicit_package or DEFAULT_IMPORT_TO_PACKAGE.get(m, m)

pyfuse/core/errors.py

Lines changed: 4 additions & 0 deletions
@@ -50,6 +50,10 @@ class ThrottleError(Error):
     """Raised when a task is rejected due to rate limiting."""
 
 
+class WorkerOnlyError(Error):
+    """Raised when a worker-only import stub is used on the client."""
+
+
 # ---------------------------------------------------------------------------
 # Custom excepthook: suppress client traceback for RemoteError
 # ---------------------------------------------------------------------------

pyfuse/core/models.py

Lines changed: 7 additions & 3 deletions
@@ -13,16 +13,19 @@ class ImportInfo:
     statement: str
     bound_name: str
     package: str | None = None
+    worker_only: bool = False
 
-    def to_dict(self) -> dict[str, str]:
+    def to_dict(self) -> dict[str, Any]:
         """Serialize to a plain dict."""
-        d = {"statement": self.statement, "bound_name": self.bound_name}
+        d: dict[str, Any] = {"statement": self.statement, "bound_name": self.bound_name}
         if self.package is not None:
             d["package"] = self.package
+        if self.worker_only:
+            d["worker_only"] = True
         return d
 
     @classmethod
-    def from_dict(cls, data: dict[str, str]) -> Self:
+    def from_dict(cls, data: dict[str, Any]) -> Self:
         """Deserialize from a plain dict.
 
         Raises
@@ -34,6 +37,7 @@ def from_dict(cls, data: dict[str, str]) -> Self:
             statement=data["statement"],
             bound_name=data["bound_name"],
             package=data.get("package"),
+            worker_only=bool(data.get("worker_only", False)),
         )
 
 
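To see what the `worker_only` field does to the wire format, here is a self-contained copy of the serialization logic from the hunks above. It assumes `ImportInfo` is a dataclass (the decorator is not visible in the diff) and replaces `Self` with a string annotation so the sketch runs on older Python versions; the sample values are made up for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class ImportInfo:
    statement: str
    bound_name: str
    package: str | None = None
    worker_only: bool = False

    def to_dict(self) -> dict[str, Any]:
        d: dict[str, Any] = {"statement": self.statement, "bound_name": self.bound_name}
        if self.package is not None:
            d["package"] = self.package
        if self.worker_only:
            d["worker_only"] = True  # key is written only when the flag is set
        return d

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ImportInfo:
        return cls(
            statement=data["statement"],
            bound_name=data["bound_name"],
            package=data.get("package"),
            worker_only=bool(data.get("worker_only", False)),
        )


plain = ImportInfo("import csv", "csv")
stubbed = ImportInfo(
    "import cv2", "cv2", package="opencv-python-headless", worker_only=True
)

# A payload written without the new key still loads, with the flag off.
legacy = ImportInfo.from_dict(
    {"statement": "import yaml", "bound_name": "yaml", "package": "PyYAML"}
)

print(plain.to_dict())
print(stubbed.to_dict())
print(legacy.worker_only)
```

Omitting the key when the flag is `False` keeps payloads for ordinary imports byte-for-byte identical to what was produced before this change, and the `data.get(..., False)` default lets payloads without the key deserialize cleanly.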

pyfuse/graph/analyzer.py

Lines changed: 58 additions & 5 deletions
@@ -74,6 +74,15 @@ def get_module_imports(func: Callable[..., object]) -> list[ImportInfo]:
                         imports.extend(_extract_import(child, package))
                     elif isinstance(child, ast.ImportFrom):
                         imports.extend(_extract_import_from(child, package))
+                continue
+            wo = _parse_worker_only_import(node)
+            if wo is not False:
+                wo_package = wo if isinstance(wo, str) else None
+                for child in node.body:
+                    if isinstance(child, ast.Import):
+                        imports.extend(_extract_import(child, wo_package, worker_only=True))
+                    elif isinstance(child, ast.ImportFrom):
+                        imports.extend(_extract_import_from(child, wo_package, worker_only=True))
 
     logger.debug(
         "Found %d import bindings in %s", len(imports), source_file
@@ -129,19 +138,57 @@ def _parse_install_package_as(node: ast.With) -> str | None:
     return ctx.args[0].value
 
 
+def _is_worker_only_import_call(expr: ast.expr) -> bool:
+    """Match ``worker_only_import`` or ``pyfuse.worker_only_import``."""
+    if isinstance(expr, ast.Name):
+        return expr.id == "worker_only_import"
+    if isinstance(expr, ast.Attribute):
+        return expr.attr == "worker_only_import"
+    return False
+
+
+def _parse_worker_only_import(node: ast.With) -> str | bool:
+    """Detect ``with worker_only_import([package]):`` blocks.
+
+    Returns ``False`` if not a match, ``True`` if matched without a
+    package argument, or the package string if one was supplied.
+    """
+    if len(node.items) != 1:
+        return False
+    ctx = node.items[0].context_expr
+    if not isinstance(ctx, ast.Call) or not _is_worker_only_import_call(ctx.func):
+        return False
+    if not ctx.args:
+        return True
+    if (
+        len(ctx.args) == 1
+        and isinstance(ctx.args[0], ast.Constant)
+        and isinstance(ctx.args[0].value, str)
+    ):
+        return ctx.args[0].value
+    return True
+
+
 def _extract_import(
-    node: ast.Import, package: str | None = None
+    node: ast.Import,
+    package: str | None = None,
+    worker_only: bool = False,
 ) -> list[ImportInfo]:
     result: list[ImportInfo] = []
     for alias in node.names:
         bound = alias.asname or alias.name.split(".")[0]
         stmt = ast.unparse(ast.Import(names=[alias]))
-        result.append(ImportInfo(statement=stmt, bound_name=bound, package=package))
+        result.append(ImportInfo(
+            statement=stmt, bound_name=bound,
+            package=package, worker_only=worker_only,
+        ))
     return result
 
 
 def _extract_import_from(
-    node: ast.ImportFrom, package: str | None = None
+    node: ast.ImportFrom,
+    package: str | None = None,
+    worker_only: bool = False,
 ) -> list[ImportInfo]:
     result: list[ImportInfo] = []
     for alias in node.names:
@@ -170,7 +217,10 @@ def _extract_import_from(
                 for export_name in exported:
                     stmt = f"from {node.module} import {export_name}"
                     result.append(
-                        ImportInfo(statement=stmt, bound_name=export_name, package=package)
+                        ImportInfo(
+                            statement=stmt, bound_name=export_name,
+                            package=package, worker_only=worker_only,
+                        )
                     )
             except ImportError:
                 warnings.warn(
@@ -185,7 +235,10 @@ def _extract_import_from(
                     module=node.module, names=[alias], level=node.level
                 )
             )
-            result.append(ImportInfo(statement=stmt, bound_name=bound, package=package))
+            result.append(ImportInfo(
+                statement=stmt, bound_name=bound,
+                package=package, worker_only=worker_only,
+            ))
     return result
 
 

pyfuse/worker/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -1,10 +1,11 @@
-from pyfuse.worker.deps import install_package_as, ensure_dependencies
+from pyfuse.worker.deps import install_package_as, worker_only_import, ensure_dependencies
 from pyfuse.worker.remote import serve, connect, disconnect, submit_remote
 from pyfuse.worker.result import Result, ResultEnvelope
 from pyfuse.worker.worker import Worker, execute
 
 __all__ = [
     "install_package_as",
+    "worker_only_import",
     "ensure_dependencies",
     "serve",
     "connect",
