-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprogress_reporting.py
More file actions
56 lines (39 loc) · 1.37 KB
/
Copy pathprogress_reporting.py
File metadata and controls
56 lines (39 loc) · 1.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""Progress reporting with offwork.

Demonstrates how a long-running task can report progress
back to the client in real time.

Usage:
    # Terminal 1 -- start a worker
    offwork worker --backend local://localhost:9748 --tmp

    # Terminal 2 -- run this script
    offwork run examples/progress_reporting.py
"""
import asyncio
import time
import offwork
from offwork import progress
# Connect this client to the backend at local://localhost:9748 -- the same
# address the worker is started with in the usage notes at the top of the file.
offwork.connect("local://localhost:9748")
def process_item(item: str, delay: float = 0.3) -> str:
    """Simulate work on a single item.

    Args:
        item: The text to process.
        delay: Seconds to sleep to imitate a slow operation. Defaults to
            0.3, the pause this example has always used; pass 0 to skip it.

    Returns:
        ``item`` converted to upper case.
    """
    if delay > 0:
        # A blocking sleep stands in for real, slow work.
        time.sleep(delay)
    return item.upper()
@offwork.task
def batch_process(items: list[str]) -> list[str]:
    """Process items one by one, reporting progress after each.

    Args:
        items: Strings to run through ``process_item``.

    Returns:
        The processed items, in the same order as ``items``.
    """
    results = []
    total = len(items)
    # Count from 1 so each update carries the number of items completed so far.
    for done, item in enumerate(items, start=1):
        results.append(process_item(item))
        progress(done, total, message=f"Processed '{item}'")
    return results
async def main() -> None:
    """Submit the batch task, print each progress update, then print the result."""
    items = ["alpha", "bravo", "charlie", "delta", "echo"]

    # Start the task (returns immediately)
    future = await batch_process.submit(items)
    print(f"Task submitted: {future.task_id}")

    # Stream progress updates until the task finishes
    async for p in future.progress():
        print(f"Progress: {p.percent}% ({p.current}/{p.total}) - {p.message}")

    # Get the final result
    result = await future
    print(f"\nDone! Results: {result}")
# Run the async entry point to completion.
# NOTE(review): there is no `if __name__ == "__main__":` guard, so this also
# runs whenever the module is imported -- confirm that is intended.
asyncio.run(main())