Skip to content

Commit eaf785b

Browse files
committed
New features: scheduling and throttling.
1 parent 9b36937 commit eaf785b

19 files changed

Lines changed: 853 additions & 12 deletions

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ After setup, tasks are signed automatically. No client-side code changes. See [S
8181
| **Package auto-install** | Workers `pip install` missing packages before execution |
8282
| **Async-native** | `.run()`, `.start()`, `.map()`, `asyncio.gather` |
8383
| **Retry & timeout** | `@trace(timeout=30, retries=3)` with exponential backoff |
84+
| **Scheduling** | `.run_in(delay)`, `.run_at(datetime)`, `.run_every(freq)` with cancellation |
85+
| **Throttling** | `@trace(throttle=timedelta(hours=24)/50)` — rate-limit executions |
8486
| **Progress & cancellation** | `pyfuse.progress(3, 10)` inside tasks; `await future.cancel()` on client |
8587
| **Heartbeat & stall detection** | Workers heartbeat; clients raise `TaskStalled` on silence |
8688
| **Content-hash caching** | Same code = cache hit, regardless of client |
@@ -105,7 +107,7 @@ pyfuse worker --backend local://localhost:9748 --tmp
105107
pyfuse run examples/remote_execution.py
106108
```
107109

108-
[`remote_execution.py`](examples/remote_execution.py) · [`async_execution.py`](examples/async_execution.py) · [`package_installation.py`](examples/package_installation.py) · [`progress_reporting.py`](examples/progress_reporting.py) · [`cancellation.py`](examples/cancellation.py) · [`large_module.py`](examples/large_module.py)
110+
[`remote_execution.py`](examples/remote_execution.py) · [`async_execution.py`](examples/async_execution.py) · [`package_installation.py`](examples/package_installation.py) · [`progress_reporting.py`](examples/progress_reporting.py) · [`cancellation.py`](examples/cancellation.py) · [`scheduling.py`](examples/scheduling.py) · [`throttling_and_retry.py`](examples/throttling_and_retry.py) · [`large_module.py`](examples/large_module.py)
109111

110112
## License
111113

docs/QUICK_START.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,39 @@ def flaky_task(url: str) -> str: ...
5858

5959
Retries use exponential backoff (1s, 2s, 4s).
6060

61+
## Scheduling
62+
63+
Execute tasks on a delay, at a specific time, or on a recurring schedule:
64+
65+
```python
66+
from datetime import datetime, timedelta
67+
68+
# Run after a delay
69+
result = await func.run_in(timedelta(minutes=5), *args)
70+
71+
# Run at a specific time
72+
result = await func.run_at(datetime(2026, 4, 21, 9, 0), *args)
73+
74+
# Recurring execution (every hour)
75+
schedule = await func.run_every(timedelta(hours=1), *args)
76+
await schedule.cancel() # stop the schedule
77+
```
78+
79+
`start_at` and `start_in` return a `Result` handle (like `.start()`).
80+
81+
## Throttling
82+
83+
Rate-limit how often a function can be executed:
84+
85+
```python
86+
from datetime import timedelta
87+
88+
@trace(throttle=timedelta(hours=24) / 50) # ~29 min cooldown
89+
def expensive_api_call(query: str) -> str: ...
90+
```
91+
92+
If a task arrives during the cooldown window, the worker returns a `ThrottleError` immediately (no retry). The cooldown is only recorded after a successful execution.
93+
6194
## Third-party packages
6295

6396
Workers auto-install missing packages. When the import name differs from the pip package:

docs/TECHNICAL_OVERVIEW.md

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pyfuse/
5959
# Decorator
6060
@trace # capture function for remote execution
6161
@trace(timeout=30, retries=3) # with execution options
62+
@trace(throttle=timedelta(hours=24)/50) # rate-limit: 50 calls per day
6263

6364
# Remote execution (all async)
6465
pyfuse.connect("redis://localhost:6379") # configure backend (sync)
@@ -67,6 +68,12 @@ await func.run(*args) # submit + await result
6768
future = await func.start(*args) # submit, returns Result handle
6869
results = await func.map([(a1, b1), ...]) # batch submit + await all
6970

71+
# Scheduling
72+
await func.run_in(timedelta(minutes=5), *args) # execute after delay
73+
await func.run_at(datetime(2026, 1, 1), *args) # execute at specific time
74+
schedule = await func.run_every(timedelta(hours=1), *args) # recurring
75+
await schedule.cancel() # stop recurring
76+
7077
# Result handling
7178
result = await future # await result value
7279
result = await future.result(timeout=10) # with timeout and stall detection
@@ -101,13 +108,17 @@ pyfuse.get_graph().to_mermaid(func) # -> Mermaid diagram string
101108
### Worker side: `await serve()` / `pyfuse worker`
102109

103110
1. **Listen** -- `async for task_json in backend.listen()` yields tasks as they arrive.
104-
2. **Cancellation check** -- If `await backend.is_cancelled(task_id)` returns ``True``, skip execution and log the cancellation.
105-
3. **Deserialize** -- Parse the JSON graph into a `Store`.
111+
2. **Scheduling wait** -- If the task has a `scheduled_at` timestamp in the future, `await asyncio.sleep(delay)` until that time.
112+
3. **Cancellation check** -- If `await backend.is_cancelled(task_id)` returns ``True``, skip execution and log the cancellation.
113+
4. **Throttle check** -- If the task has a `throttle` value and the cooldown hasn't elapsed, return a `"throttled"` result immediately.
114+
5. **Deserialize** -- Parse the JSON graph into a `Store`.
106115
4. **Cache check** -- Compute a subgraph key (SHA-256 of all reachable content hashes). If cached, skip to step 7.
107116
5. **Install dependencies** -- Extract third-party imports, install missing packages via `asyncio.create_subprocess_exec`.
108117
6. **Reconstruct** -- Produce a self-contained Python script from the store. `compile()` and `exec()` it into a fresh namespace.
109-
7. **Execute** -- Call the function with the provided arguments. Async functions are awaited directly; sync functions run in an executor with explicit context propagation (for progress reporting). Apply retry/timeout policies via `asyncio.wait_for`.
110-
8. **Send result** -- Wrap the return value (or exception traceback) in a `ResultEnvelope` and send it back. If cancelled during execution, skip result delivery.
118+
9. **Execute** -- Call the function with the provided arguments. Async functions are awaited directly; sync functions run in an executor with explicit context propagation (for progress reporting). Apply retry/timeout policies via `asyncio.wait_for`.
119+
10. **Send result** -- Wrap the return value (or exception traceback) in a `ResultEnvelope` and send it back. If cancelled during execution, skip result delivery.
120+
11. **Record throttle** -- If the task has a `throttle` value and execution succeeded, record a cooldown in the backend.
121+
12. **Re-enqueue recurring** -- If the task has a `recur_interval` and its schedule hasn't been cancelled, submit a new task instance with `scheduled_at = now + interval`.
111122

112123
### Client side: `await future` / `await future.result()`
113124

@@ -117,7 +128,7 @@ The `Result` object supports two waiting strategies:
117128

118129
**With stall detection** (default, `stall_timeout=10.0`) -- An async polling loop calls `try_get_result()` and checks heartbeats every second. If the heartbeat hasn't changed for longer than `stall_timeout`, `TaskStalled` is raised.
119130

120-
**Unwrap** -- If status is `"ok"`, return the value. If `"error"`, raise `RemoteError` with the remote traceback.
131+
**Unwrap** -- If status is `"ok"`, return the value. If `"error"`, raise `RemoteError` with the remote traceback. If `"throttled"`, raise `ThrottleError`.
121132

122133
## Transport backends
123134

@@ -137,6 +148,10 @@ The `Backend` ABC defines an async transport interface:
137148
| `async is_cancelled(task_id)` | Check cancellation flag (default returns `False`) |
138149
| `async send_progress(task_id, json)` | Store latest progress data (no-op default) |
139150
| `async get_progress(task_id)` | Get latest progress JSON (default returns `None`) |
151+
| `async cancel_schedule(schedule_id)` | Mark recurring schedule cancelled (no-op default) |
152+
| `async is_schedule_cancelled(schedule_id)` | Check schedule cancellation (default `False`) |
153+
| `async check_throttle(function_name)` | Check if function is rate-limited (default `True`) |
154+
| `async record_throttle(fn, seconds)` | Start cooldown after execution (no-op default) |
140155
| `async notify_result(task_id)` | Push notification that result is ready (no-op default) |
141156
| `async subscribe_results()` | Async iterator yielding task_ids on result arrival |
142157
| `async close()` | Release resources |
@@ -151,6 +166,8 @@ Uses `redis.asyncio.Redis` with `RPUSH`/`BLPOP` patterns. Keys:
151166
- `pyfuse:heartbeat:{task_id}` -- worker heartbeat timestamp (TTL: 30s)
152167
- `pyfuse:cancel:{task_id}` -- cancellation flag (TTL: 3600s)
153168
- `pyfuse:progress:{task_id}` -- latest progress JSON (TTL: 300s)
169+
- `pyfuse:schedule:{schedule_id}` -- schedule cancellation flag (TTL: 30 days)
170+
- `pyfuse:throttle:{function_name}` -- throttle cooldown (TTL: throttle seconds)
154171
- `pyfuse:notify` -- Pub/Sub channel for result notifications
155172

156173
Result notifications use Redis Pub/Sub (`PUBLISH`/`SUBSCRIBE`). Batch heartbeat fetching uses `MGET` for efficiency.

examples/scheduling.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""Task scheduling with pyfuse.
2+
3+
Demonstrates:
4+
- run_in(delay) — execute after a delay
5+
- run_at(datetime) — execute at a specific time
6+
- run_every(freq) — recurring execution with cancellation
7+
8+
Usage:
9+
# Terminal 1 -- start a worker
10+
pyfuse worker --backend redis://localhost:6379 --tmp
11+
12+
# Terminal 2 -- run this script
13+
pyfuse run examples/scheduling.py
14+
"""
15+
16+
import asyncio
17+
from datetime import datetime, timedelta
18+
19+
import pyfuse
20+
from pyfuse import trace
21+
22+
pyfuse.connect("local://localhost:9748")
23+
24+
25+
@trace
26+
def greet(name: str) -> str:
27+
return f"Hello, {name}! (executed at {datetime.now():%H:%M:%S})"
28+
29+
30+
@trace
31+
def tick(n: int) -> str:
32+
return f"Tick #{n} at {datetime.now():%H:%M:%S}"
33+
34+
35+
async def main() -> None:
36+
print(f"Now: {datetime.now():%H:%M:%S}\n")
37+
38+
# 1. run_in — execute after a 2-second delay
39+
print("— run_in(2 seconds) —")
40+
result = await greet.run_in(timedelta(seconds=2), "Alice")
41+
print(result)
42+
43+
# 2. run_at — execute at a specific time (3 seconds from now)
44+
print("\n— run_at(now + 3s) —")
45+
target = datetime.now() + timedelta(seconds=3)
46+
print(f"Scheduled for: {target:%H:%M:%S}")
47+
result = await greet.run_at(target, "Bob")
48+
print(result)
49+
50+
# 3. run_every — recurring execution every 2 seconds
51+
print("\n— run_every(2 seconds) for ~6 seconds —")
52+
schedule = await tick.run_every(timedelta(seconds=2), 1)
53+
print(f"Schedule started: {schedule.schedule_id}")
54+
await asyncio.sleep(6)
55+
56+
# Cancel the recurring schedule
57+
await schedule.cancel()
58+
print(f"Schedule cancelled: {schedule.schedule_id}")
59+
60+
61+
asyncio.run(main())

examples/throttling_and_retry.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""Throttling and retry with pyfuse.
2+
3+
Demonstrates:
4+
- @trace(throttle=timedelta) — rate-limit executions
5+
- @trace(retries=N, retry_delay=T) — retry with exponential backoff
6+
- Combining throttle + retry
7+
8+
Usage:
9+
# Terminal 1 -- start a worker
10+
pyfuse worker --backend redis://localhost:6379 --tmp
11+
12+
# Terminal 2 -- run this script
13+
pyfuse run examples/throttling_and_retry.py
14+
"""
15+
16+
import asyncio
17+
import random
18+
from datetime import timedelta
19+
20+
import pyfuse
21+
from pyfuse import trace, ThrottleError
22+
23+
pyfuse.connect("local://localhost:9748")
24+
25+
26+
# --- Throttling: at most once every 5 seconds ---
27+
28+
@trace(throttle=timedelta(seconds=5))
29+
def expensive_query(query: str) -> str:
30+
return f"Result for '{query}'"
31+
32+
33+
# --- Retry: 3 attempts with exponential backoff ---
34+
35+
@trace(retries=3, retry_delay=0.5)
36+
def flaky_operation(x: int) -> int:
37+
if random.random() < 0.5:
38+
raise RuntimeError("Random failure!")
39+
return x * 2
40+
41+
42+
async def main() -> None:
43+
# 1. Throttling demo
44+
print("— Throttle demo (5s cooldown) —")
45+
result = await expensive_query.run("first call")
46+
print(f" Call 1: {result}")
47+
48+
try:
49+
result = await expensive_query.run("second call (too soon)")
50+
print(f" Call 2: {result}")
51+
except ThrottleError:
52+
print(" Call 2: ThrottleError — rate limited!")
53+
54+
# Wait for cooldown
55+
print(" Waiting 5s for cooldown...")
56+
await asyncio.sleep(5)
57+
58+
result = await expensive_query.run("third call (after cooldown)")
59+
print(f" Call 3: {result}")
60+
61+
# 2. Retry demo
62+
print("\n— Retry demo (3 retries, 0.5s base delay) —")
63+
try:
64+
result = await flaky_operation.run(21)
65+
print(f" Result: {result}")
66+
except Exception as exc:
67+
print(f" Failed after retries: {exc}")
68+
69+
70+
asyncio.run(main())

pyfuse/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
TaskCancelled,
1515
SignatureError,
1616
DependencyError,
17+
ThrottleError,
1718
)
1819
from pyfuse.core.models import ImportInfo, FunctionNode
1920
from pyfuse.graph.graph import Graph
@@ -50,6 +51,7 @@
5051
from pyfuse.worker.worker import Worker
5152
from pyfuse.worker.worker import execute as execute
5253
from pyfuse.worker.sandbox import DockerSandbox
54+
from pyfuse.worker.schedule import ScheduleHandle
5355
from pyfuse.graph.decorator import trace
5456
from pyfuse.worker.backends.base import Backend
5557

@@ -125,6 +127,7 @@ def pack(func: Callable[..., object], *args: Any, **kwargs: Any) -> Task:
125127
"RemoteError",
126128
"TaskStalled",
127129
"TaskCancelled",
130+
"ThrottleError",
128131
"SignatureError",
129132
"PairingError",
130133
# Graph
@@ -154,6 +157,8 @@ def pack(func: Callable[..., object], *args: Any, **kwargs: Any) -> Task:
154157
"Task",
155158
"Worker",
156159
"Backend",
160+
# Scheduling
161+
"ScheduleHandle",
157162
# Sandbox
158163
"DockerSandbox",
159164
]

pyfuse/core/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ class PairingError(Error):
4646
"""Raised when the PIN-based pairing protocol fails."""
4747

4848

49+
class ThrottleError(Error):
50+
"""Raised when a task is rejected due to rate limiting."""
51+
52+
4953
# ---------------------------------------------------------------------------
5054
# Custom excepthook: suppress client traceback for RemoteError
5155
# ---------------------------------------------------------------------------

pyfuse/core/task.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ class Task:
102102
timeout: float | None = None
103103
retries: int = 0
104104
retry_delay: float = 1.0
105+
scheduled_at: float | None = None
106+
recur_interval: float | None = None
107+
schedule_id: str | None = None
108+
throttle: float | None = None
105109
signature: str | None = None
106110

107111
# -- Serialization -------------------------------------------------------
@@ -121,6 +125,14 @@ def _to_dict(self) -> dict[str, Any]:
121125
d["retries"] = self.retries
122126
if self.retry_delay != 1.0:
123127
d["retry_delay"] = self.retry_delay
128+
if self.scheduled_at is not None:
129+
d["scheduled_at"] = self.scheduled_at
130+
if self.recur_interval is not None:
131+
d["recur_interval"] = self.recur_interval
132+
if self.schedule_id is not None:
133+
d["schedule_id"] = self.schedule_id
134+
if self.throttle is not None:
135+
d["throttle"] = self.throttle
124136
return d
125137

126138
def to_json(self, *, signing_key: bytes | None = None) -> str:
@@ -185,5 +197,9 @@ def from_json(
185197
timeout=data.get("timeout"),
186198
retries=data.get("retries", 0),
187199
retry_delay=data.get("retry_delay", 1.0),
200+
scheduled_at=data.get("scheduled_at"),
201+
recur_interval=data.get("recur_interval"),
202+
schedule_id=data.get("schedule_id"),
203+
throttle=data.get("throttle"),
188204
signature=sig or None, # normalise empty string to None
189205
)

0 commit comments

Comments
 (0)