Each example is a self-contained script. Run any of them with:
offwork worker --backend local://localhost:9748 --tmp # Terminal 1 — start a worker
offwork run examples/<script>.py # Terminal 2 — run the exampleoffwork run creates a temporary venv, auto-detects and installs dependencies, and runs the script. No global installs required.
The baseline: one decorator on an entry-point function, plain helpers around it. Demonstrates that offwork captures the full call graph without any extra annotations.
def add(a, b): ... # plain helper — no @offwork.task
def multiply(a, b): ... # another helper
@offwork.task
def dot_product(u, v):
return sum(multiply(a, b) for a, b in zip(u, v)) # both helpers are capturedAll four async execution patterns side by side:
result = await func.run(3, 4) # submit + await
future = await func.submit(3, 4) # submit → handle
result = await future # await later
results = await func.map([(3, 4), (5, 12)]) # batch
r1, r2 = await asyncio.gather(func.run(3, 4), func.run(5, 12))Workers auto-install missing packages before execution. Also shows worker_only_import — packages that exist only on the worker, resolved to lightweight stubs on the client:
from offwork import worker_only_import, install_package_as
with worker_only_import(): # client never installs this
import requests
with install_package_as("PyYAML"): # pip name differs from import name
import yamlReal-time progress from a long-running task. offwork.progress() is a no-op when called outside a worker:
@offwork.task
def process(n: int) -> int:
for i in range(n):
offwork.progress(i + 1, n, message=f"step {i+1}/{n}")
...
future = await process.submit(100)
async for p in future.progress(): # streams each update until done
print(f"{p.percent:.0f}%")An async def ... yield task streams each value back as it is produced. Consume it with async for via .stream(...):
@offwork.task
async def tail_lines(count: int):
for i in range(count):
yield f"line {i + 1}"
async for line in tail_lines.stream(5): # values arrive in order
print(line)Cooperative task cancellation. The client cancels a pending or in-flight task; awaiting it raises TaskCancelled:
future = await slow_task.submit()
await asyncio.sleep(1)
await future.cancel() # wait up to 30 s for worker confirmation
try:
await future
except offwork.TaskCancelled:
print("cancelled")Three scheduling modes — delayed, point-in-time, and recurring:
await func.run(*args, run_in=timedelta(seconds=5)) # after a delay
await func.run(*args, run_at=datetime(2026, 6, 1, 9, 0)) # at a specific time
schedule = await func.submit(*args, run_every=timedelta(minutes=10))
await asyncio.sleep(35)
await schedule.cancel() # stop after ~3 executionsRate-limiting and fault tolerance in the decorator:
@offwork.task(throttle=timedelta(minutes=30)) # at most once per 30 min
def rate_limited(): ...
@offwork.task(retries=3, timeout=10) # up to 3 retries, 10s each
def flaky(): ...Calling a throttled task during the cooldown raises ThrottleError immediately.
Stress test: a module with 47 functions across 7 files, 3 classes, and deep dependency chains. Only the entry point is decorated — offwork discovers everything else automatically. Useful for verifying auto-discovery at scale.
offwork worker --backend redis://localhost:6379 # requires Redis
python examples/large_module.pyFan-out ETL pattern: split a large CSV into chunks, process each chunk on a worker, merge results. Each task is pure (bytes in, dict out). Demonstrates .map() for batch submission:
chunks = [(0, 500), (500, 1000), (1000, 1500)]
results = await summarize_chunk.map([(data, start, end) for start, end in chunks])
merged = merge(results)FastAPI endpoint that offloads PDF rendering to a worker. The web process stays lightweight; heavy CPU work (reportlab) runs on the worker pool. Shows the typical "offload CPU work from a web handler" pattern:
@offwork.task
def render_report(data: dict) -> bytes: # returns PDF bytes
...
# In the FastAPI handler:
pdf_bytes = await render_report.run(report_data)
return Response(pdf_bytes, media_type="application/pdf")Stateful poller (IMAP connection, seen-UID tracking) stays local; per-attachment analysis is offloaded. Illustrates the pattern of keeping stateful I/O local while farming out pure compute:
@offwork.task
def process_attachment(filename: str, data: bytes) -> dict:
classification = _classify(filename)
text = _extract_text(data)
return _score_risk(classification, text)Recurring backup job via submit(run_every=...). The task is small; it delegates to three plain helpers (_archive, _compress, _upload) that offwork discovers automatically:
schedule = await snapshot_directory.submit(source_path, run_every=timedelta(hours=6))
# runs every 6 hours; call await schedule.cancel() to stop