-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpackage_installation.py
More file actions
157 lines (113 loc) · 4.97 KB
/
Copy pathpackage_installation.py
File metadata and controls
157 lines (113 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Run functions that depend on third-party packages on a bare worker.
The worker has no packages pre-installed (besides the standard library).
offwork detects the imports, installs the packages via pip, and executes
the function -- all automatically.
This example also demonstrates ``worker_only_import``: the client never
installs ``requests``, ``markdown``, ``PyYAML`` or ``python-dateutil`` --
only the worker does.
The local imports resolve to lightweight stubs that raise
``WorkerOnlyError`` if used outside a traced function.
Requires Redis on localhost:6379. Install: pip install redis
Usage:
# Terminal 1 -- start a worker
offwork worker --backend local://localhost:9748
# Terminal 2 -- run this script
python examples/package_installation.py
"""
import asyncio
from html.parser import HTMLParser
import offwork
from offwork import install_package_as, worker_only_import
# Some packages have different import and pip names:
#   import yaml  ->  pip install PyYAML
#   import cv2   ->  pip install opencv-python
#   import PIL   ->  pip install Pillow
#
# Common mappings are built-in. For others, use install_package_as.
# NOTE(review): install_package_as is imported above but never called in this
# example; the pip name is instead passed to worker_only_import, as below.
with worker_only_import("PyYAML"):
    import yaml

# `worker_only_import` skips the local install entirely. The package only
# needs to be available on the worker. The client gets a stub object that
# raises WorkerOnlyError if used directly, but is fine to reference inside
# a @offwork.task function (it's serialized and re-imported on the worker).

# No pip name needed when import name == package name:
with worker_only_import():
    import requests
    import markdown

# Pass an explicit pip name when the import name differs from the package:
with worker_only_import("python-dateutil"):
    from dateutil import parser as date_parser
# -- Simple case: import name matches package name --------------------------
# When the import name matches the pip package (e.g. `import requests`),
# no extra configuration is needed. The worker runs `pip install requests`.
@offwork.task
def fetch_title(url: str) -> str:
    """Fetch a web page and return the text of its first ``<title>`` element.

    The page is downloaded with ``requests.get`` (5 second timeout) and the
    response text is fed through a small ``HTMLParser`` subclass.

    Args:
        url: Address of the page to download.

    Returns:
        The text inside the first ``<title>`` element, stripped of
        surrounding whitespace, or ``""`` if the page has no title.
    """

    class TitleParser(HTMLParser):
        """Collect the text of the first ``<title>`` element only."""

        def __init__(self) -> None:
            super().__init__()
            self.in_title = False
            # Becomes True at the first <title> start tag so that later
            # <title> elements are not appended to the page title.
            self.title_seen = False
            self.title = ""

        def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
            # Only the first <title> opens capture; any other start tag
            # (including a later <title>, e.g. inside inline SVG) closes it.
            self.in_title = tag == "title" and not self.title_seen
            if self.in_title:
                self.title_seen = True

        def handle_data(self, data: str) -> None:
            if self.in_title:
                self.title += data

        def handle_endtag(self, tag: str) -> None:
            if tag == "title":
                self.in_title = False

    resp = requests.get(url, timeout=5.0)
    parser = TitleParser()
    parser.feed(resp.text)
    return parser.title.strip()
# -- Mismatched names: install_package_as -----------------------------------
@offwork.task
def to_yaml(data: object) -> str:
    """Serialize *data* to YAML text (``default_flow_style=False``).

    Relies on the ``yaml`` module, i.e. the PyYAML package, which the
    worker installs automatically.
    """
    rendered = yaml.dump(data, default_flow_style=False)
    return rendered
@offwork.task
def parse_date(text: str) -> str:
    """Parse a human-readable date string and return it in ISO 8601 form.

    Relies on ``dateutil.parser`` (the python-dateutil package), which the
    worker installs automatically.
    """
    return date_parser.parse(text).isoformat()
# -- Multiple packages in one function -------------------------------------
@offwork.task
def markdown_word_freq(md_text: str) -> str:
    """Strip markdown, count word frequencies, return as YAML.

    Requires both 'markdown' and 'PyYAML' -- the worker installs both.

    The markdown is rendered to HTML, the text nodes are extracted,
    lower-cased and split on whitespace (punctuation stays attached to the
    word it touches).

    Args:
        md_text: Markdown source to analyse.

    Returns:
        A YAML mapping of word -> count, most frequent word first. Words
        with equal counts keep their order of first appearance.
    """

    class TextExtractor(HTMLParser):
        """Accumulate every text node of the parsed HTML."""

        def __init__(self) -> None:
            super().__init__()
            self.parts: list[str] = []

        def handle_data(self, data: str) -> None:
            self.parts.append(data)

    rendered_html = markdown.markdown(md_text)
    extractor = TextExtractor()
    extractor.feed(rendered_html)
    plain = " ".join(extractor.parts).lower()

    freq: dict[str, int] = {}
    for w in plain.split():
        freq[w] = freq.get(w, 0) + 1

    by_count = dict(sorted(freq.items(), key=lambda x: x[1], reverse=True))
    # yaml.dump sorts mapping keys alphabetically by default, which would
    # throw away the frequency ordering built above; sort_keys=False
    # (PyYAML >= 5.1) keeps the dict's insertion order.
    return yaml.dump(by_count, sort_keys=False)
# ---------------------------------------------------------------------------
async def main() -> None:
    """Connect to the worker backend and run every example task once.

    Each task is awaited through its ``.run(...)`` method and the result is
    printed under a short section header.
    """
    offwork.connect("local://localhost:9748")

    # Simple package (import name == pip name)
    print("--- requests (auto-detected) ---")
    page_url = "https://example.com"
    title = await fetch_title.run(page_url)
    print(f" Page title: {title}")

    # Mismatched package names
    print("\n--- PyYAML (install_package_as) ---")
    payload = {"framework": "offwork", "version": "0.1.2"}
    result = await to_yaml.run(payload)
    print(f" YAML output:\n{result}")

    print("--- python-dateutil (install_package_as) ---")
    date_text = "January 5th, 2024 at 3pm"
    iso = await parse_date.run(date_text)
    print(f" Parsed date: {iso}")

    # Multiple packages in one function
    print("\n--- Multiple packages (markdown + PyYAML) ---")
    md = "# Hello\n\nThis is a **test** of the word frequency counter."
    freq = await markdown_word_freq.run(md)
    print(f" Word frequencies:\n{freq}")
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())