Skip to content

Commit f8443f6

Browse files
committed
Better examples
1 parent 21cc4f1 commit f8443f6

10 files changed

Lines changed: 493 additions & 488 deletions

examples/async_db_client.py

Lines changed: 0 additions & 74 deletions
This file was deleted.

examples/csv_etl.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
"""Chunked CSV transform with pandas, fanned out across workers.
2+
3+
A daily ETL pattern: split a large CSV into row-ranges, send each chunk
4+
to a worker, merge the partial results locally. Each task is pure --
5+
bytes in, dict out.
6+
7+
The traced entry point ``summarize_chunk`` calls three plain helpers
8+
(``_load``, ``_clean``, ``_aggregate``). None of them are decorated;
9+
pyfuse picks them up by walking the call graph of the entry point and
10+
sends them along with the task.
11+
12+
Usage:
13+
pyfuse worker --backend redis://localhost:6379 --tmp
14+
python -m pyfuse run --tmp examples/csv_etl.py
15+
"""
16+
17+
import asyncio
18+
import csv
19+
import io
20+
import random
21+
22+
import pandas as pd
23+
24+
import pyfuse
25+
from pyfuse import trace
26+
27+
pyfuse.connect("redis://localhost:6379")
28+
29+
30+
# --- helpers (auto-discovered) --------------------------------------------
31+
32+
def _load(csv_bytes: bytes) -> pd.DataFrame:
33+
return pd.read_csv(io.BytesIO(csv_bytes))
34+
35+
36+
def _clean(df: pd.DataFrame) -> pd.DataFrame:
37+
df = df.dropna(subset=["amount"])
38+
df = df.assign(amount=df["amount"].astype(float))
39+
return df[df["amount"] >= 0]
40+
41+
42+
def _aggregate(df: pd.DataFrame) -> dict[str, float | int]:
43+
if df.empty:
44+
return {"rows": 0, "sum": 0.0, "min": 0.0, "max": 0.0}
45+
return {
46+
"rows": int(len(df)),
47+
"sum": float(df["amount"].sum()),
48+
"min": float(df["amount"].min()),
49+
"max": float(df["amount"].max()),
50+
}
51+
52+
53+
# --- entry point ----------------------------------------------------------
54+
55+
@trace
56+
def summarize_chunk(csv_bytes: bytes) -> dict[str, float | int]:
57+
"""Clean a CSV chunk and return per-chunk aggregates."""
58+
return _aggregate(_clean(_load(csv_bytes)))
59+
60+
61+
# --- local-only test data + dispatch -------------------------------------
62+
63+
def make_synthetic_csv(rows: int, seed: int) -> bytes:
64+
rng = random.Random(seed)
65+
buf = io.StringIO()
66+
w = csv.writer(buf)
67+
w.writerow(["id", "amount", "category"])
68+
for i in range(rows):
69+
amount = "" if rng.random() < 0.02 else f"{rng.gauss(50, 20):.2f}"
70+
w.writerow([i, amount, rng.choice(["a", "b", "c"])])
71+
return buf.getvalue().encode()
72+
73+
74+
def split_csv(blob: bytes, chunks: int) -> list[bytes]:
75+
text = blob.decode()
76+
header, _, body = text.partition("\n")
77+
lines = body.splitlines()
78+
step = max(1, len(lines) // chunks)
79+
return [
80+
"\n".join([header, *lines[i : i + step]]).encode()
81+
for i in range(0, len(lines), step)
82+
]
83+
84+
85+
async def main() -> None:
86+
blob = make_synthetic_csv(rows=50_000, seed=7)
87+
chunks = split_csv(blob, chunks=8)
88+
print(f"Dispatching {len(chunks)} chunks ({len(blob) // 1024} KiB total)")
89+
90+
partials = await summarize_chunk.map([(c,) for c in chunks])
91+
92+
total_rows = sum(p["rows"] for p in partials)
93+
total_sum = sum(p["sum"] for p in partials)
94+
print(f"rows kept: {total_rows}")
95+
print(f"sum: {total_sum:.2f}")
96+
print(f"avg: {total_sum / total_rows:.2f}")
97+
print(f"min/max: {min(p['min'] for p in partials):.2f} / "
98+
f"{max(p['max'] for p in partials):.2f}")
99+
100+
101+
if __name__ == "__main__":
102+
asyncio.run(main())

examples/email_attachments.py

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
"""Poll an IMAP mailbox locally, fan attachment work out to workers.
2+
3+
Two concerns kept apart:
4+
5+
* The **poller** (this script's main loop) is stateful: it owns the IMAP
6+
connection and tracks which UIDs were already seen. It stays local.
7+
* The **per-attachment work** is pure: bytes in, structured result out.
8+
That's the pyfuse task.
9+
10+
The traced entry point ``process_attachment`` is small. It calls three
11+
plain helpers -- ``_classify``, ``_extract_text``, ``_score_risk`` --
12+
that are not decorated. pyfuse discovers them by walking the AST and
13+
ships their source as part of the same task envelope.
14+
15+
Usage:
16+
pyfuse worker --backend redis://localhost:6379 --tmp
17+
python -m pyfuse run --tmp examples/email_attachments.py
18+
"""
19+
20+
import asyncio
21+
import hashlib
22+
import imaplib
23+
import re
24+
from email import message_from_bytes
25+
from email.message import Message
26+
from typing import Any
27+
28+
import pyfuse
29+
from pyfuse import trace
30+
31+
pyfuse.connect("redis://localhost:6379")
32+
33+
34+
# --- helpers (auto-discovered) --------------------------------------------
35+
36+
DANGEROUS_EXTS = {".exe", ".js", ".vbs", ".scr", ".bat"}
37+
SUSPICIOUS_KEYWORDS = ("invoice", "wire", "urgent", "password")
38+
39+
40+
def _classify(filename: str) -> str:
41+
name = filename.lower()
42+
for ext in DANGEROUS_EXTS:
43+
if name.endswith(ext):
44+
return "executable"
45+
if name.endswith((".pdf", ".doc", ".docx")):
46+
return "document"
47+
if name.endswith((".png", ".jpg", ".jpeg", ".gif")):
48+
return "image"
49+
return "other"
50+
51+
52+
def _extract_text(payload: bytes, kind: str) -> str:
53+
if kind != "document":
54+
return ""
55+
# Stub: real version would shell out to pdftotext / antiword / etc.
56+
text = payload.decode("utf-8", errors="ignore")
57+
return re.sub(r"\s+", " ", text)[:2000]
58+
59+
60+
def _score_risk(filename: str, kind: str, text: str) -> int:
61+
score = 0
62+
if kind == "executable":
63+
score += 80
64+
if any(kw in text.lower() for kw in SUSPICIOUS_KEYWORDS):
65+
score += 25
66+
if any(kw in filename.lower() for kw in SUSPICIOUS_KEYWORDS):
67+
score += 15
68+
return min(score, 100)
69+
70+
71+
# --- entry point ----------------------------------------------------------
72+
73+
@trace
74+
def process_attachment(filename: str, payload: bytes) -> dict[str, Any]:
75+
"""Inspect a single attachment. Calls the helpers above on the worker."""
76+
kind = _classify(filename)
77+
text = _extract_text(payload, kind)
78+
return {
79+
"filename": filename,
80+
"size": len(payload),
81+
"sha256": hashlib.sha256(payload).hexdigest(),
82+
"kind": kind,
83+
"risk": _score_risk(filename, kind, text),
84+
}
85+
86+
87+
# --- local-only IMAP plumbing --------------------------------------------
88+
89+
def iter_attachments(msg: Message) -> list[tuple[str, bytes]]:
90+
out: list[tuple[str, bytes]] = []
91+
for part in msg.walk():
92+
if part.get_content_maintype() == "multipart":
93+
continue
94+
if part.get("Content-Disposition") is None:
95+
continue
96+
name = part.get_filename()
97+
payload = part.get_payload(decode=True)
98+
if name and isinstance(payload, bytes):
99+
out.append((name, payload))
100+
return out
101+
102+
103+
def fetch_unseen(host: str, user: str, password: str) -> list[Message]:
104+
cli = imaplib.IMAP4_SSL(host)
105+
try:
106+
cli.login(user, password)
107+
cli.select("INBOX")
108+
_, data = cli.search(None, "UNSEEN")
109+
msgs: list[Message] = []
110+
for uid in data[0].split():
111+
_, parts = cli.fetch(uid, "(RFC822)")
112+
raw = parts[0][1] if parts and parts[0] else None
113+
if isinstance(raw, bytes):
114+
msgs.append(message_from_bytes(raw))
115+
return msgs
116+
finally:
117+
cli.logout()
118+
119+
120+
async def main() -> None:
121+
host, user, password = "imap.example.com", "you@example.com", "secret"
122+
123+
while True:
124+
try:
125+
messages = await asyncio.to_thread(fetch_unseen, host, user, password)
126+
except Exception as exc:
127+
print(f"poll failed: {exc}")
128+
await asyncio.sleep(30)
129+
continue
130+
131+
attachments = [a for m in messages for a in iter_attachments(m)]
132+
if not attachments:
133+
await asyncio.sleep(30)
134+
continue
135+
136+
results = await process_attachment.map(attachments)
137+
for r in results:
138+
flag = "!!" if r["risk"] >= 50 else "ok"
139+
print(f" [{flag}] {r['filename']:<40} {r['kind']:<10} risk={r['risk']}")
140+
141+
await asyncio.sleep(30)
142+
143+
144+
if __name__ == "__main__":
145+
asyncio.run(main())

0 commit comments

Comments
 (0)