Skip to content

Commit 21cc4f1

Browse files
committed
More examples
1 parent bbefeb5 commit 21cc4f1

6 files changed

Lines changed: 491 additions & 3 deletions

File tree

examples/async_db_client.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""Async DB clients (Motor / asyncpg-style) on a pyfuse worker.
2+
3+
Async clients are bound to the event loop they were created on. Running
4+
them on a worker is fine *as long as* you create them inside the traced
5+
async function -- pyfuse's worker awaits the coroutine on its own loop,
6+
so a freshly-created client picks up the right loop automatically.
7+
8+
This example uses Motor (async MongoDB driver). The pattern is identical
9+
for ``asyncpg``, ``aioredis``, ``aiomysql``, etc.
10+
11+
Usage:
12+
docker run -d -p 27017:27017 --name pyfuse-mongo mongo:7
13+
pyfuse worker --backend redis://localhost:6379 --tmp
14+
python -m pyfuse run --tmp examples/async_db_client.py
15+
"""
16+
17+
import asyncio
18+
from typing import Any
19+
20+
import motor.motor_asyncio as motor
21+
22+
import pyfuse
23+
from pyfuse import trace
24+
25+
pyfuse.connect("redis://localhost:6379")
26+
27+
MONGO_URL = "mongodb://localhost:27017"
28+
DB_NAME = "pyfuse_demo_async"
29+
30+
31+
@trace
32+
async def upsert_user(user_id: int, name: str, score: float) -> str:
33+
"""Async upsert -- exercises the worker's event loop end-to-end."""
34+
client = motor.AsyncIOMotorClient(MONGO_URL, serverSelectionTimeoutMS=3000)
35+
try:
36+
db = client[DB_NAME]
37+
await db.users.update_one(
38+
{"_id": user_id},
39+
{"$set": {"name": name, "score": score}},
40+
upsert=True,
41+
)
42+
return f"upserted user {user_id}"
43+
finally:
44+
client.close()
45+
46+
47+
@trace
48+
async def top_users(limit: int = 5) -> list[dict[str, Any]]:
49+
"""Concurrent reads via asyncio.gather inside the worker loop."""
50+
client = motor.AsyncIOMotorClient(MONGO_URL, serverSelectionTimeoutMS=3000)
51+
try:
52+
db = client[DB_NAME]
53+
cursor = db.users.find({}, {"_id": 1, "name": 1, "score": 1})
54+
cursor = cursor.sort("score", -1).limit(limit)
55+
return [doc async for doc in cursor]
56+
finally:
57+
client.close()
58+
59+
60+
async def main() -> None:
61+
# Fan-out a batch of upserts -- each runs as its own remote task,
62+
# each with its own client / loop on the worker side.
63+
users = [(i, f"user-{i}", float(i * 7 % 31)) for i in range(20)]
64+
acks = await upsert_user.map(users)
65+
print(f"{len(acks)} upserts acknowledged")
66+
67+
leaderboard = await top_users.run(5)
68+
print("Leaderboard:")
69+
for row in leaderboard:
70+
print(f" {row['name']:<10} score={row['score']}")
71+
72+
73+
if __name__ == "__main__":
74+
asyncio.run(main())

examples/fastapi_offload.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
"""Use pyfuse to offload heavy work from a FastAPI request handler.
2+
3+
A common production pattern: the web app stays light and responsive, while
4+
CPU-bound or slow I/O work is shipped to a worker pool over Redis. The
5+
request handler awaits the result and returns it.
6+
7+
Endpoints:
8+
GET / -- healthcheck
9+
POST /jobs/sync -- await the worker, return result
10+
POST /jobs/async -- start the task, return task_id
11+
GET /jobs/{task_id} -- poll status / result
12+
13+
Usage:
14+
pyfuse worker --backend redis://localhost:6379 --tmp
15+
uvicorn examples.fastapi_offload:app --reload
16+
curl -X POST localhost:8000/jobs/sync -d '{"text":"hello WORLD"}' \
17+
-H 'content-type: application/json'
18+
"""
19+
20+
import asyncio
21+
import hashlib
22+
from typing import Any
23+
24+
from fastapi import FastAPI, HTTPException
25+
from pydantic import BaseModel
26+
27+
import pyfuse
28+
from pyfuse import trace, Result
29+
30+
31+
# --- traced workloads ------------------------------------------------------
32+
33+
@trace
34+
def normalize(text: str) -> dict[str, Any]:
35+
"""Cheap demo workload -- runs on the worker."""
36+
cleaned = " ".join(text.split()).lower()
37+
return {
38+
"length": len(cleaned),
39+
"sha1": hashlib.sha1(cleaned.encode()).hexdigest(),
40+
"preview": cleaned[:80],
41+
}
42+
43+
44+
@trace(timeout=30, retries=2)
45+
def cpu_bound(n: int) -> int:
46+
"""Pretend-CPU work to demonstrate timeout / retry semantics."""
47+
total = 0
48+
for i in range(n):
49+
total += (i * i) % 97
50+
return total
51+
52+
53+
# --- FastAPI app -----------------------------------------------------------
54+
55+
class JobRequest(BaseModel):
56+
text: str = ""
57+
n: int = 1_000_000
58+
59+
60+
app = FastAPI(title="pyfuse offload demo")
61+
_pending: dict[str, Result[Any]] = {}
62+
63+
64+
@app.on_event("startup")
65+
async def _startup() -> None:
66+
# Connecting at startup avoids a per-request handshake.
67+
pyfuse.connect("redis://localhost:6379")
68+
69+
70+
@app.on_event("shutdown")
71+
async def _shutdown() -> None:
72+
await pyfuse.disconnect()
73+
74+
75+
@app.get("/")
76+
async def root() -> dict[str, str]:
77+
return {"status": "ok"}
78+
79+
80+
@app.post("/jobs/sync")
81+
async def submit_sync(req: JobRequest) -> dict[str, Any]:
82+
"""Block the request until the worker returns."""
83+
try:
84+
return await asyncio.wait_for(normalize.run(req.text), timeout=10)
85+
except asyncio.TimeoutError as exc:
86+
raise HTTPException(status_code=504, detail="worker timeout") from exc
87+
88+
89+
@app.post("/jobs/async")
90+
async def submit_async(req: JobRequest) -> dict[str, str]:
91+
"""Fire-and-forget; return the task_id for polling."""
92+
handle = await cpu_bound.start(req.n)
93+
_pending[handle.task_id] = handle
94+
return {"task_id": handle.task_id}
95+
96+
97+
@app.get("/jobs/{task_id}")
98+
async def poll(task_id: str) -> dict[str, Any]:
99+
handle = _pending.get(task_id)
100+
if handle is None:
101+
raise HTTPException(status_code=404, detail="unknown task")
102+
if not await handle.done():
103+
return {"task_id": task_id, "status": "pending"}
104+
try:
105+
result = await handle
106+
except Exception as exc:
107+
return {"task_id": task_id, "status": "error", "detail": str(exc)}
108+
finally:
109+
_pending.pop(task_id, None)
110+
return {"task_id": task_id, "status": "ok", "result": result}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
"""Async HTTP fan-out from a remote worker.
2+
3+
Each traced call opens its own ``httpx.AsyncClient`` (HTTP/2, connection
4+
pool, async DNS) and probes a list of URLs concurrently. This is a
5+
faithful test of the worker's event-loop integration: dozens of TCP
6+
sockets, TLS handshakes, and async iterators per task.
7+
8+
The client is created *inside* the async function so it binds to the
9+
worker's running loop. The ``async with`` block guarantees the pool is
10+
torn down before the task returns.
11+
12+
Usage:
13+
pyfuse worker --backend redis://localhost:6379 --tmp
14+
python -m pyfuse run --tmp examples/httpx_concurrent_scrape.py
15+
"""
16+
17+
import asyncio
18+
from typing import Any
19+
20+
import httpx
21+
22+
import pyfuse
23+
from pyfuse import trace, progress
24+
25+
pyfuse.connect("redis://localhost:6379")
26+
27+
28+
URLS = [
29+
"https://example.com",
30+
"https://www.python.org",
31+
"https://httpbin.org/get",
32+
"https://httpbin.org/status/200",
33+
"https://httpbin.org/status/404",
34+
"https://httpbin.org/delay/1",
35+
"https://httpbin.org/headers",
36+
"https://httpbin.org/uuid",
37+
]
38+
39+
40+
@trace(timeout=30, retries=1)
41+
async def probe_many(urls: list[str], concurrency: int = 4) -> list[dict[str, Any]]:
42+
"""Hit every URL once, capture status / size / latency."""
43+
sem = asyncio.Semaphore(concurrency)
44+
results: list[dict[str, Any]] = []
45+
46+
async with httpx.AsyncClient(http2=True, timeout=10.0) as client:
47+
async def probe(url: str) -> dict[str, Any]:
48+
loop = asyncio.get_running_loop()
49+
start = loop.time()
50+
try:
51+
async with sem:
52+
r = await client.get(url)
53+
return {
54+
"url": url,
55+
"status": r.status_code,
56+
"bytes": len(r.content),
57+
"elapsed_ms": round((loop.time() - start) * 1000, 1),
58+
}
59+
except Exception as exc:
60+
return {
61+
"url": url,
62+
"status": None,
63+
"error": type(exc).__name__,
64+
"elapsed_ms": round((loop.time() - start) * 1000, 1),
65+
}
66+
67+
tasks = [asyncio.create_task(probe(u)) for u in urls]
68+
for i, coro in enumerate(asyncio.as_completed(tasks), 1):
69+
results.append(await coro)
70+
progress(i, len(urls), message=f"{i}/{len(urls)} probed")
71+
72+
return results
73+
74+
75+
async def main() -> None:
76+
handle = await probe_many.start(URLS, concurrency=4)
77+
while not await handle.done():
78+
p = await handle.progress()
79+
if p is not None:
80+
print(p)
81+
await asyncio.sleep(0.3)
82+
83+
rows = await handle
84+
print(f"\nProbed {len(rows)} URLs:")
85+
for row in sorted(rows, key=lambda r: r["elapsed_ms"]):
86+
status = row.get("status") or row.get("error")
87+
print(f" {row['elapsed_ms']:>7.1f}ms {str(status):<5} {row['url']}")
88+
89+
90+
if __name__ == "__main__":
91+
asyncio.run(main())

examples/mongodb_client.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
"""Real-world MongoDB usage with pyfuse.
2+
3+
DB clients are stateful: connection pools, sockets, background heartbeat
4+
threads. Two rules to keep in mind when running them on a remote worker:
5+
6+
1. Open the client *inside* the traced function, not at module level.
7+
pyfuse caches the reconstructed namespace per subgraph, so a module-level
8+
client would survive across calls -- with stale sockets, wrong loop, etc.
9+
10+
2. Close the client before returning. Otherwise the worker leaks a pool
11+
and a heartbeat thread per task.
12+
13+
The example runs three independent tasks against a real MongoDB and uses
14+
``pymongo``'s aggregation framework so the round-trip is non-trivial.
15+
16+
Usage:
17+
# Start a Mongo + worker:
18+
docker run -d -p 27017:27017 --name pyfuse-mongo mongo:7
19+
pyfuse worker --backend redis://localhost:6379 --tmp
20+
21+
# Run the script (will pip-install pymongo on the worker):
22+
python -m pyfuse run --tmp examples/mongodb_client.py
23+
"""
24+
25+
import asyncio
26+
import random
27+
from contextlib import contextmanager
28+
from typing import Any, Iterator
29+
30+
import pymongo
31+
32+
import pyfuse
33+
from pyfuse import trace
34+
35+
pyfuse.connect("redis://localhost:6379")
36+
37+
MONGO_URL = "mongodb://localhost:27017"
38+
DB_NAME = "pyfuse_demo"
39+
40+
41+
@contextmanager
42+
def mongo_db(url: str, db_name: str) -> Iterator[Any]:
43+
"""Open a client, yield the database, always close on exit."""
44+
client = pymongo.MongoClient(url, serverSelectionTimeoutMS=3000)
45+
try:
46+
yield client[db_name]
47+
finally:
48+
client.close()
49+
50+
51+
@trace
52+
def seed_events(n: int, seed: int = 0) -> int:
53+
"""Insert *n* synthetic events, return the inserted count."""
54+
rng = random.Random(seed)
55+
levels = ["info", "warn", "error"]
56+
docs = [
57+
{
58+
"level": rng.choice(levels),
59+
"service": f"svc-{rng.randint(0, 4)}",
60+
"latency_ms": round(rng.gauss(120, 40), 2),
61+
}
62+
for _ in range(n)
63+
]
64+
with mongo_db(MONGO_URL, DB_NAME) as db:
65+
db.events.drop()
66+
result = db.events.insert_many(docs)
67+
return len(result.inserted_ids)
68+
69+
70+
@trace
71+
def aggregate_by_service() -> list[dict[str, Any]]:
72+
"""Run an aggregation pipeline and return the result."""
73+
pipeline = [
74+
{"$match": {"level": {"$in": ["warn", "error"]}}},
75+
{
76+
"$group": {
77+
"_id": "$service",
78+
"count": {"$sum": 1},
79+
"p95_latency": {"$max": "$latency_ms"},
80+
}
81+
},
82+
{"$sort": {"count": -1}},
83+
]
84+
with mongo_db(MONGO_URL, DB_NAME) as db:
85+
return [
86+
{"service": r["_id"], "count": r["count"], "p95": r["p95_latency"]}
87+
for r in db.events.aggregate(pipeline)
88+
]
89+
90+
91+
@trace
92+
def purge_events() -> int:
93+
"""Drop the collection. Returns the previous document count."""
94+
with mongo_db(MONGO_URL, DB_NAME) as db:
95+
before = db.events.count_documents({})
96+
db.events.drop()
97+
return before
98+
99+
100+
async def main() -> None:
101+
inserted = await seed_events.run(2000, seed=42)
102+
print(f"Inserted {inserted} events")
103+
104+
summary = await aggregate_by_service.run()
105+
print("Top noisy services:")
106+
for row in summary:
107+
print(f" {row['service']:<8} count={row['count']:<4} p95={row['p95']}ms")
108+
109+
removed = await purge_events.run()
110+
print(f"Cleaned up {removed} events")
111+
112+
113+
if __name__ == "__main__":
114+
asyncio.run(main())

0 commit comments

Comments
 (0)