-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlarge_module.py
More file actions
56 lines (43 loc) · 2.02 KB
/
Copy pathlarge_module.py
File metadata and controls
56 lines (43 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""Run a large multi-file module on a remote worker.

The stress_test_module has 47 functions across 7 files, 3 classes, and deep
dependency chains. Only the entry point below is decorated with @offwork.task --
offwork discovers everything else automatically.

Usage:
    # Terminal 1 -- start a worker
    offwork worker --backend local://localhost:9748

    # Terminal 2 -- run this script
    python examples/large_module.py
"""
import asyncio
import offwork
from tests.fixtures.stress_test_module.analyzers import build_report, detect_anomalies, analyze_measurements
from tests.fixtures.stress_test_module.formatters import format_text_report
from tests.fixtures.stress_test_module.generators import inject_anomalies, generate_measurements
from tests.fixtures.stress_test_module.validators import validate_measurements
from tests.fixtures.stress_test_module.transformers import compute_deltas, normalize_units, clean_measurements
@offwork.task
def full_sensor_report(sensor_count: int, readings_per_sensor: int, seed: int = 42) -> str:
    """Produce a formatted text report from generated sensor measurements.

    The pipeline runs in a fixed order: generate measurements, inject
    anomalies, validate, clean, normalize units, compute deltas, analyze,
    detect anomalies, build the report, and format it as text. Every helper
    is imported from a ``stress_test_module`` submodule; only this entry point
    is decorated, and offwork captures the helpers' source and dependencies
    automatically.

    Args:
        sensor_count: Passed to ``generate_measurements`` and shown in the
            report title.
        readings_per_sensor: Passed to ``generate_measurements``.
        seed: Seed for generation; ``seed + 1`` is used for anomaly injection.

    Returns:
        The value returned by ``format_text_report`` for the built report.
    """
    # Generation uses `seed`; injection uses `seed + 1` so the two steps are
    # seeded with different values.
    raw_readings = inject_anomalies(
        generate_measurements(sensor_count, readings_per_sensor, seed),
        anomaly_rate=0.05,
        seed=seed + 1,
    )
    # Return value is intentionally unused, as in the original flow.
    validate_measurements(raw_readings)

    scrubbed = clean_measurements(raw_readings)
    unit_map = {"fahrenheit": "celsius"}
    in_common_units = normalize_units(scrubbed, unit_map)

    delta_series = compute_deltas(in_common_units)
    summary_stats = analyze_measurements(in_common_units)
    flagged = detect_anomalies(in_common_units)

    title = f"Sensor Report ({sensor_count} sensors, seed={seed})"
    sensor_report = build_report(title, summary_stats, flagged, in_common_units)
    # Only the number of deltas is attached to the report, not the deltas.
    sensor_report["delta_count"] = len(delta_series)

    return format_text_report(sensor_report)
async def main() -> None:
    """Connect to the local worker backend, run the report task, and print it.

    Uses the same backend URL as the worker command in the module docstring,
    then awaits ``full_sensor_report.run`` for 3 sensors with 50 readings each
    and seed 42.
    """
    offwork.connect("local://localhost:9748")
    print(await full_sensor_report.run(3, 50, seed=42))
# Script entry point: drive the async `main` coroutine only when this file is
# executed directly, so importing the module does not run the report.
if __name__ == "__main__":
    asyncio.run(main())