Skip to content

Commit 9b36937

Browse files
authored
Merge pull request #10 from codeSamuraii/copilot/add-worker-logging-feature
Harden input validation, security, and packaging for first open source release
2 parents 3c5fd06 + 62119b6 commit 9b36937

12 files changed

Lines changed: 73 additions & 9 deletions

File tree

pyfuse/core/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,14 @@
22
from pyfuse.core.errors import Error, RemoteError, WorkerError, DependencyError
33
from pyfuse.core.models import ImportInfo, FunctionNode
44
from pyfuse.core.version import _VERSION
5+
6+
__all__ = [
7+
"Task",
8+
"resolve_args",
9+
"Error",
10+
"RemoteError",
11+
"WorkerError",
12+
"DependencyError",
13+
"ImportInfo",
14+
"FunctionNode",
15+
]

pyfuse/core/errors.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,5 @@ def _pyfuse_excepthook(
7171
_original_excepthook(exc_type, exc_value, exc_tb)
7272

7373

74-
sys.excepthook = _pyfuse_excepthook
74+
if sys.excepthook is not _pyfuse_excepthook:
75+
sys.excepthook = _pyfuse_excepthook

pyfuse/core/models.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,13 @@ def to_dict(self) -> dict[str, str]:
2323

2424
@classmethod
2525
def from_dict(cls, data: dict[str, str]) -> Self:
26-
"""Deserialize from a plain dict."""
26+
"""Deserialize from a plain dict.
27+
28+
Raises
29+
------
30+
KeyError
31+
If required fields (``statement``, ``bound_name``) are missing.
32+
"""
2733
return cls(
2834
statement=data["statement"],
2935
bound_name=data["bound_name"],
@@ -134,7 +140,18 @@ def content_hash(self) -> str:
134140

135141
@classmethod
136142
def from_dict(cls, data: dict[str, Any]) -> Self:
137-
"""Deserialize from a plain dict."""
143+
"""Deserialize from a plain dict.
144+
145+
Raises
146+
------
147+
KeyError
148+
If required fields (``qualified_name``, ``name``, ``module``,
149+
``source``, ``imports``, ``dependencies``) are missing.
150+
"""
151+
required = ("qualified_name", "name", "module", "source", "imports", "dependencies")
152+
missing = [k for k in required if k not in data]
153+
if missing:
154+
raise KeyError(f"FunctionNode missing required fields: {', '.join(missing)}")
138155
return cls(
139156
qualified_name=data["qualified_name"],
140157
name=data["name"],

pyfuse/core/task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ def from_json(
166166
sig = data.pop("signature", None)
167167

168168
if signing_key is not None:
169-
if sig is None:
169+
if not sig:
170170
raise SignatureError(
171171
"Task is unsigned but signing is enabled — "
172172
"rejecting unauthenticated task"
@@ -185,5 +185,5 @@ def from_json(
185185
timeout=data.get("timeout"),
186186
retries=data.get("retries", 0),
187187
retry_delay=data.get("retry_delay", 1.0),
188-
signature=sig,
188+
signature=sig or None, # normalise empty string to None
189189
)

pyfuse/graph/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
from pyfuse.graph.graph import Graph
22
from pyfuse.graph.store import Store, MergeResult
33
from pyfuse.graph.decorator import trace
4+
5+
__all__ = ["Graph", "Store", "MergeResult", "trace"]

pyfuse/graph/decorator.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@ def _apply_trace(
5555
retries: int = 0,
5656
retry_delay: float = 1.0,
5757
) -> TracedFunction[_P, _R]:
58+
if timeout is not None and timeout <= 0:
59+
raise ValueError(f"timeout must be positive, got {timeout}")
60+
if retries < 0:
61+
raise ValueError(f"retries must be non-negative, got {retries}")
62+
if retry_delay < 0:
63+
raise ValueError(f"retry_delay must be non-negative, got {retry_delay}")
64+
5865
logger.debug("@trace applied to %s", func.__qualname__)
5966
graph = Graph.default()
6067
graph.register(func)

pyfuse/worker/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,16 @@
22
from pyfuse.worker.remote import serve, connect, disconnect, submit_remote
33
from pyfuse.worker.result import Result, ResultEnvelope
44
from pyfuse.worker.worker import Worker, execute
5+
6+
__all__ = [
7+
"install_package_as",
8+
"ensure_dependencies",
9+
"serve",
10+
"connect",
11+
"disconnect",
12+
"submit_remote",
13+
"Result",
14+
"ResultEnvelope",
15+
"Worker",
16+
"execute",
17+
]

pyfuse/worker/backends/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
from pyfuse.worker.backends.base import Backend
2+
3+
__all__ = ["Backend"]

pyfuse/worker/deps.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ async def _pip_install(package: str, extra_args: list[str]) -> tuple[int, str, s
9696
stderr=asyncio.subprocess.PIPE,
9797
)
9898
stdout_bytes, stderr_bytes = await proc.communicate()
99+
# returncode is None if the process hasn't terminated; treat as failure
100+
returncode = proc.returncode if proc.returncode is not None else 1
99101
return (
100-
proc.returncode or 0,
102+
returncode,
101103
stdout_bytes.decode() if stdout_bytes else "",
102104
stderr_bytes.decode() if stderr_bytes else "",
103105
)

pyfuse/worker/remote.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,14 +220,14 @@ async def _heartbeat_loop(
220220
try:
221221
await backend.send_heartbeat(task_id)
222222
except Exception:
223-
pass # best-effort
223+
logger.debug("Heartbeat send failed for task %s", task_id, exc_info=True)
224224
if exec_task is not None:
225225
try:
226226
if await backend.is_cancelled(task_id):
227227
exec_task.cancel()
228228
return
229229
except Exception:
230-
pass
230+
logger.debug("Cancellation check failed for task %s", task_id, exc_info=True)
231231
try:
232232
await asyncio.wait_for(cancel_event.wait(), timeout=_HEARTBEAT_INTERVAL)
233233
except asyncio.TimeoutError:

0 commit comments

Comments
 (0)