-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcsv_etl.py
More file actions
101 lines (73 loc) · 3.02 KB
/
Copy pathcsv_etl.py
File metadata and controls
101 lines (73 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Chunked CSV transform with pandas, fanned out across workers.

A daily ETL pattern: split a large CSV into row-ranges, send each chunk
to a worker, merge the partial results locally. Each task is pure --
bytes in, dict out.

The traced entry point ``summarize_chunk`` calls three plain helpers
(``_load``, ``_clean``, ``_aggregate``). None of them are decorated;
offwork picks them up by walking the call graph of the entry point and
sends them along with the task.

Usage:
    offwork worker --backend local://localhost:9748 --tmp
    python -m offwork run --tmp examples/csv_etl.py
"""
import asyncio
import csv
import io
import random
import pandas as pd
import offwork
# Connect to the offwork backend when this module is imported. The address
# is the same one passed to ``offwork worker --backend`` in the usage notes
# of the module docstring.
# NOTE(review): presumably this has to run before ``@offwork.task`` is
# applied below -- confirm against the offwork documentation.
offwork.connect("local://localhost:9748")
# --- helpers (auto-discovered) --------------------------------------------
def _load(csv_bytes: bytes) -> pd.DataFrame:
    """Parse raw CSV bytes into a DataFrame.

    The bytes are wrapped in an in-memory binary buffer and handed to
    ``pd.read_csv`` with its default options.
    """
    buffer = io.BytesIO(csv_bytes)
    frame = pd.read_csv(buffer)
    return frame
def _clean(df: pd.DataFrame) -> pd.DataFrame:
    """Return the rows of *df* that have a usable, non-negative ``amount``.

    Rows whose ``amount`` is missing are dropped, the remaining amounts are
    cast to ``float``, and rows with a negative amount are filtered out.
    The input frame is not modified; a new frame is returned.
    """
    present = df.dropna(subset=["amount"])
    as_float = present["amount"].astype(float)
    typed = present.assign(amount=as_float)
    non_negative = typed["amount"] >= 0
    return typed[non_negative]
def _aggregate(df: pd.DataFrame) -> dict[str, float | int]:
if df.empty:
return {"rows": 0, "sum": 0.0, "min": 0.0, "max": 0.0}
return {
"rows": int(len(df)),
"sum": float(df["amount"].sum()),
"min": float(df["amount"].min()),
"max": float(df["amount"].max()),
}
# --- entry point ----------------------------------------------------------
@offwork.task
def summarize_chunk(csv_bytes: bytes) -> dict[str, float | int]:
    """Parse one CSV chunk, clean it, and return its aggregates.

    Runs the three helpers in order: ``_load`` -> ``_clean`` ->
    ``_aggregate``. The result is the dict produced by ``_aggregate``.
    """
    frame = _load(csv_bytes)
    cleaned = _clean(frame)
    return _aggregate(cleaned)
# --- local-only test data + dispatch -------------------------------------
def make_synthetic_csv(rows: int, seed: int) -> bytes:
    """Build a reproducible synthetic CSV with ``id,amount,category`` columns.

    Args:
        rows: Number of data rows to emit after the header.
        seed: Seed for the private ``random.Random`` instance, so the same
            ``(rows, seed)`` pair always yields the same bytes.

    Returns:
        The CSV text encoded with the default ``str.encode()`` codec.
    """
    rng = random.Random(seed)
    out = io.StringIO()
    writer = csv.writer(out)
    writer.writerow(["id", "amount", "category"])
    for row_id in range(rows):
        # Draw order is part of the output: the blank check comes first, the
        # Gaussian amount is drawn only for non-blank rows, category is last.
        if rng.random() < 0.02:
            amount = ""
        else:
            amount = f"{rng.gauss(50, 20):.2f}"
        category = rng.choice(["a", "b", "c"])
        writer.writerow([row_id, amount, category])
    return out.getvalue().encode()
def split_csv(blob: bytes, chunks: int) -> list[bytes]:
    """Split a CSV blob into at most ``chunks`` pieces that each repeat the header.

    The first line of *blob* is treated as the header. The remaining lines
    are distributed as evenly as possible: piece sizes differ by at most one
    row, and no empty piece is produced. When there are fewer data rows than
    ``chunks``, one piece per row is returned; when there are no data rows,
    the result is an empty list.

    Lines inside each piece are joined with ``"\\n"`` and any trailing
    ``"\\r"`` is stripped from the header, so CRLF input (the ``csv.writer``
    default) does not leave a stray carriage return on the header.

    Note: the split is line-based, so quoted fields containing embedded
    newlines are not supported.

    Args:
        blob: Encoded CSV text whose first line is the header.
        chunks: Maximum number of pieces to produce; must be at least 1.

    Returns:
        A list of encoded CSV pieces, each starting with the header line.

    Raises:
        ValueError: If ``chunks`` is less than 1.
    """
    if chunks < 1:
        raise ValueError(f"chunks must be >= 1, got {chunks}")

    header, _, body = blob.decode().partition("\n")
    header = header.rstrip("\r")
    lines = body.splitlines()

    # The first ``extra`` pieces take one additional row, so the piece count
    # never exceeds ``chunks`` (a floor-division step would overshoot).
    base, extra = divmod(len(lines), chunks)
    pieces: list[bytes] = []
    start = 0
    for index in range(chunks):
        size = base + (1 if index < extra else 0)
        if size == 0:
            # Fewer rows than chunks: every remaining piece would be empty.
            break
        rows = lines[start : start + size]
        pieces.append("\n".join([header, *rows]).encode())
        start += size
    return pieces
async def main() -> None:
    """Generate synthetic data, fan it out as chunks, and print merged totals.

    Builds a 50,000-row CSV, splits it into chunks, dispatches every chunk
    through ``summarize_chunk.map`` and merges the per-chunk dicts into
    overall row count, sum, average and min/max, which are printed.
    """
    blob = make_synthetic_csv(rows=50_000, seed=7)
    chunks = split_csv(blob, chunks=8)
    print(f"Dispatching {len(chunks)} chunks ({len(blob) // 1024} KiB total)")

    partials = await summarize_chunk.map([(c,) for c in chunks])

    # A chunk with no surviving rows reports placeholder min/max values of
    # 0.0; merging those would corrupt the global min/max, so only chunks
    # that kept at least one row take part in the merge.
    populated = [p for p in partials if p["rows"]]

    total_rows = sum(p["rows"] for p in populated)
    total_sum = sum(p["sum"] for p in populated)
    print(f"rows kept: {total_rows}")
    print(f"sum: {total_sum:.2f}")

    if not populated:
        # Nothing survived cleaning: avoid dividing by zero and calling
        # min()/max() on an empty sequence.
        print("avg: n/a")
        print("min/max: n/a")
        return

    lowest = min(p["min"] for p in populated)
    highest = max(p["max"] for p in populated)
    print(f"avg: {total_sum / total_rows:.2f}")
    print(f"min/max: {lowest:.2f} / {highest:.2f}")
# Script entry point: when the file is executed directly, run the async
# driver ``main`` to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())