Skip to content

Commit e6919dd

Browse files
codeSamuraiiCopilot
andcommitted
better examples
Co-authored-by: Copilot <copilot@github.com>
1 parent ba573ba commit e6919dd

6 files changed

Lines changed: 158 additions & 161 deletions

File tree

examples/cancellation.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,15 @@
2121
@trace
2222
async def slow_computation(n: int) -> int:
2323
"""A deliberately slow function."""
24+
total = 0
2425
try:
25-
total = 0
2626
for _ in range(n):
2727
await asyncio.sleep(1.0)
2828
total += 1
2929
return total
3030
finally:
3131
if total < n:
32-
print("Task cancelled")
33-
32+
print(f"Task interrupted after {total}/{n} iterations")
3433

3534

3635
async def main() -> None:
@@ -46,6 +45,7 @@ async def main() -> None:
4645
# Awaiting a cancelled task raises TaskCancelled
4746
try:
4847
result = await future
48+
print(f"Unexpected result: {result}")
4949
except TaskCancelled:
5050
print("Task was cancelled successfully!")
5151

@@ -54,4 +54,5 @@ async def main() -> None:
5454
print(f"Task status: {status}") # "cancelled"
5555

5656

57-
asyncio.run(main())
57+
if __name__ == "__main__":
58+
asyncio.run(main())

examples/email_attachments.py

Lines changed: 58 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""Poll an IMAP mailbox locally, fan attachment work out to workers.
1+
"""Inspect email attachments on workers, fanned out from a local poller.
22
33
Two concerns kept apart:
44
@@ -14,15 +14,18 @@
1414
1515
Usage:
1616
pyfuse worker --backend redis://localhost:6379 --tmp
17-
python -m pyfuse run --tmp examples/email_attachments.py
17+
python examples/email_attachments.py
18+
19+
The script synthesizes a handful of emails with attachments in-memory
20+
(no IMAP server needed), dispatches each attachment to the worker, and
21+
prints the verdict.
1822
"""
1923

2024
import asyncio
2125
import hashlib
22-
import imaplib
2326
import re
2427
from email import message_from_bytes
25-
from email.message import Message
28+
from email.message import EmailMessage, Message
2629
from typing import Any
2730

2831
import pyfuse
@@ -84,7 +87,7 @@ def process_attachment(filename: str, payload: bytes) -> dict[str, Any]:
8487
}
8588

8689

87-
# --- local-only IMAP plumbing --------------------------------------------
90+
# --- local-only mailbox plumbing -----------------------------------------
8891

8992
def iter_attachments(msg: Message) -> list[tuple[str, bytes]]:
9093
out: list[tuple[str, bytes]] = []
@@ -100,45 +103,59 @@ def iter_attachments(msg: Message) -> list[tuple[str, bytes]]:
100103
return out
101104

102105

103-
def fetch_unseen(host: str, user: str, password: str) -> list[Message]:
104-
cli = imaplib.IMAP4_SSL(host)
105-
try:
106-
cli.login(user, password)
107-
cli.select("INBOX")
108-
_, data = cli.search(None, "UNSEEN")
109-
msgs: list[Message] = []
110-
for uid in data[0].split():
111-
_, parts = cli.fetch(uid, "(RFC822)")
112-
raw = parts[0][1] if parts and parts[0] else None
113-
if isinstance(raw, bytes):
114-
msgs.append(message_from_bytes(raw))
115-
return msgs
116-
finally:
117-
cli.logout()
106+
def synthesize_inbox() -> list[Message]:
107+
"""Build a small in-memory inbox with attachments for the demo."""
108+
samples: list[tuple[str, str, list[tuple[str, bytes]]]] = [
109+
(
110+
"alice@example.com",
111+
"Lunch tomorrow?",
112+
[("photo.jpg", b"\xff\xd8\xff" + b"\x00" * 4096)],
113+
),
114+
(
115+
"billing@vendor.com",
116+
"Invoice INV-9921",
117+
[("invoice.pdf", b"%PDF-1.4 invoice total: $1,250.00 due upon receipt")],
118+
),
119+
(
120+
"noreply@scary.example",
121+
"URGENT: please review attached",
122+
[("urgent_invoice.scr", b"MZ" + b"\x00" * 2048)],
123+
),
124+
(
125+
"ops@partner.com",
126+
"Q1 deck and notes",
127+
[
128+
("deck.pdf", b"%PDF-1.4 strategy review Q1"),
129+
("notes.txt", b"meeting notes go here"),
130+
],
131+
),
132+
]
133+
134+
inbox: list[Message] = []
135+
for sender, subject, atts in samples:
136+
msg = EmailMessage()
137+
msg["From"] = sender
138+
msg["To"] = "you@example.com"
139+
msg["Subject"] = subject
140+
msg.set_content("See attached.")
141+
for name, payload in atts:
142+
msg.add_attachment(
143+
payload, maintype="application", subtype="octet-stream", filename=name,
144+
)
145+
inbox.append(message_from_bytes(bytes(msg)))
146+
return inbox
118147

119148

120149
async def main() -> None:
121-
host, user, password = "imap.example.com", "you@example.com", "secret"
122-
123-
while True:
124-
try:
125-
messages = await asyncio.to_thread(fetch_unseen, host, user, password)
126-
except Exception as exc:
127-
print(f"poll failed: {exc}")
128-
await asyncio.sleep(30)
129-
continue
130-
131-
attachments = [a for m in messages for a in iter_attachments(m)]
132-
if not attachments:
133-
await asyncio.sleep(30)
134-
continue
135-
136-
results = await process_attachment.map(attachments)
137-
for r in results:
138-
flag = "!!" if r["risk"] >= 50 else "ok"
139-
print(f" [{flag}] {r['filename']:<40} {r['kind']:<10} risk={r['risk']}")
140-
141-
await asyncio.sleep(30)
150+
messages = synthesize_inbox()
151+
attachments = [a for m in messages for a in iter_attachments(m)]
152+
print(f"Inbox: {len(messages)} messages, {len(attachments)} attachments")
153+
154+
results = await process_attachment.map(attachments)
155+
for r in results:
156+
flag = "!!" if r["risk"] >= 50 else "ok"
157+
print(f" [{flag}] {r['filename']:<24} {r['kind']:<10} "
158+
f"size={r['size']:<6} risk={r['risk']}")
142159

143160

144161
if __name__ == "__main__":

examples/image_thumbnails.py

Lines changed: 0 additions & 80 deletions
This file was deleted.

examples/large_module.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@
1414
python examples/large_module.py
1515
"""
1616
import asyncio
17+
import sys
18+
from pathlib import Path
19+
20+
# Make ``tests.fixtures.stress_test_module`` importable when running this
21+
# script directly from a checkout, without needing PYTHONPATH gymnastics.
22+
_REPO_ROOT = Path(__file__).resolve().parent.parent
23+
if str(_REPO_ROOT) not in sys.path:
24+
sys.path.insert(0, str(_REPO_ROOT))
1725

1826
import pyfuse
1927
from pyfuse import trace
@@ -47,18 +55,13 @@ def full_sensor_report(sensor_count: int, readings_per_sensor: int, seed: int =
4755
stats, anomalies, normalized,
4856
)
4957
report["delta_count"] = len(deltas)
50-
r = format_text_report(report)
51-
print(r)
52-
return r
58+
return format_text_report(report)
59+
5360

5461
async def main() -> None:
55-
# Connect to the worker
5662
pyfuse.connect("redis://localhost:6379")
57-
58-
# Remote call (same function, same args, on a worker)
59-
remote_result = await full_sensor_report.run(3, 50, seed=42)
60-
61-
print(f"Success! Local and remote results match:\n {repr(remote_result)[:80] + '...'}")
63+
report = await full_sensor_report.run(3, 50, seed=42)
64+
print(report)
6265

6366

6467
if __name__ == "__main__":

examples/pdf_report.py

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,20 @@
1515
1616
Usage:
1717
pyfuse worker --backend redis://localhost:6379 --tmp
18-
uvicorn examples.pdf_report:app --reload
18+
python examples/pdf_report.py
1919
20-
curl -X POST localhost:8000/reports -o report.pdf \\
21-
-H 'content-type: application/json' \\
22-
-d '{"title":"Q1","rows":[["Revenue",120000],["Costs",80000]]}'
20+
The script starts the FastAPI app in-process, posts a sample report,
21+
prints the resulting PDF size (and writes it to ``/tmp/pyfuse_report.pdf``),
22+
and exits.
2323
"""
2424

25+
import asyncio
26+
from contextlib import asynccontextmanager
2527
from io import BytesIO
28+
from pathlib import Path
29+
from typing import AsyncIterator
2630

31+
import httpx
2732
import uvicorn
2833
from fastapi import FastAPI
2934
from fastapi.responses import Response
@@ -68,32 +73,61 @@ def render_report(title: str, rows: list[list[str | float]]) -> bytes:
6873
# --- FastAPI app ----------------------------------------------------------
6974

7075
class ReportRequest(BaseModel):
71-
title: str = "Example Report"
76+
title: str = "Quarterly Report"
7277
rows: list[list[str | float]] = [
7378
["Revenue", 120000],
7479
["Costs", 80000],
80+
["Profit", 40000],
7581
]
7682

7783

78-
app = FastAPI(title="pyfuse PDF service")
79-
80-
81-
@app.on_event("startup")
82-
async def _startup() -> None:
84+
@asynccontextmanager
85+
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
8386
pyfuse.connect("redis://localhost:6379")
87+
try:
88+
yield
89+
finally:
90+
await pyfuse.disconnect()
8491

8592

86-
@app.on_event("shutdown")
87-
async def _shutdown() -> None:
88-
await pyfuse.disconnect()
93+
app = FastAPI(title="pyfuse PDF service", lifespan=lifespan)
8994

9095

91-
@app.get("/reports")
92-
async def make_report() -> Response:
93-
req = ReportRequest()
96+
@app.post("/reports")
97+
async def make_report(req: ReportRequest) -> Response:
9498
pdf = await render_report.run(req.title, req.rows)
9599
return Response(content=pdf, media_type="application/pdf")
96100

97101

102+
# --- self-driving demo ----------------------------------------------------
103+
104+
async def _demo() -> None:
105+
config = uvicorn.Config(app, host="127.0.0.1", port=8080, log_level="warning")
106+
server = uvicorn.Server(config)
107+
task = asyncio.create_task(server.serve())
108+
while not server.started:
109+
await asyncio.sleep(0.5)
110+
111+
payload = {
112+
"title": "Q1 2026 Report",
113+
"rows": [
114+
["Revenue", 240_000],
115+
["Cost of goods", 90_000],
116+
["Operating expenses", 55_000],
117+
["Net profit", 95_000],
118+
],
119+
}
120+
try:
121+
async with httpx.AsyncClient(base_url="http://127.0.0.1:8080") as client:
122+
resp = await client.post("/reports", json=payload, timeout=60.0)
123+
resp.raise_for_status()
124+
out = Path("/tmp/pyfuse_report.pdf")
125+
out.write_bytes(resp.content)
126+
print(f"PDF: {len(resp.content)} bytes -> {out}")
127+
finally:
128+
server.should_exit = True
129+
await task
130+
131+
98132
if __name__ == "__main__":
99-
uvicorn.run(app, host="0.0.0.0", port=8080)
133+
asyncio.run(_demo())

0 commit comments

Comments
 (0)