-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathemail_attachments.py
More file actions
161 lines (130 loc) · 5.06 KB
/
Copy pathemail_attachments.py
File metadata and controls
161 lines (130 loc) · 5.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""Inspect email attachments on workers, fanned out from a local poller.
Two concerns kept apart:
* The **poller** (this script's main loop) is stateful: it owns the IMAP
connection and tracks which UIDs were already seen. It stays local.
* The **per-attachment work** is pure: bytes in, structured result out.
That's the offwork task.
The traced entry point ``process_attachment`` is small. It calls three
plain helpers -- ``_classify``, ``_extract_text``, ``_score_risk`` --
that are not decorated. offwork discovers them by walking the AST and
ships their source as part of the same task envelope.
Usage:
offwork worker --backend local://localhost:9748 --tmp
python examples/email_attachments.py
The script synthesizes a handful of emails with attachments in-memory
(no IMAP server needed), dispatches each attachment to the worker, and
prints the verdict.
"""
import asyncio
import hashlib
import re
from email import message_from_bytes
from email.message import EmailMessage, Message
from typing import Any
import offwork
# Connect at import time to the same backend URL that the worker command in
# the module docstring is started with.
offwork.connect("local://localhost:9748")
# --- helpers (auto-discovered) --------------------------------------------
# Filename suffixes that _classify reports as "executable".
DANGEROUS_EXTS = {".exe", ".js", ".vbs", ".scr", ".bat"}
# Lower-case substrings that _score_risk searches for in both the extracted
# text and the filename.
SUSPICIOUS_KEYWORDS = ("invoice", "wire", "urgent", "password")
def _classify(filename: str) -> str:
    """Map a filename to a coarse attachment kind based on its suffix.

    The filename is lower-cased before matching. Suffix groups are tried in
    priority order, so a suffix listed in ``DANGEROUS_EXTS`` wins over the
    document and image groups.

    Returns:
        One of ``"executable"``, ``"document"``, ``"image"`` or ``"other"``.
    """
    lowered = filename.lower()
    # Ordered (suffixes, label) pairs; the first matching group decides.
    suffix_table = (
        (tuple(DANGEROUS_EXTS), "executable"),
        ((".pdf", ".doc", ".docx"), "document"),
        ((".png", ".jpg", ".jpeg", ".gif"), "image"),
    )
    for suffixes, label in suffix_table:
        if lowered.endswith(suffixes):
            return label
    return "other"
def _extract_text(payload: bytes, kind: str) -> str:
    """Return a whitespace-normalised text preview of a document payload.

    Only payloads whose ``kind`` is ``"document"`` yield text; any other kind
    produces an empty string. Bytes that are not valid UTF-8 are dropped,
    every run of whitespace is collapsed to a single space, and the result
    is cut to at most 2000 characters.
    """
    if kind == "document":
        # Stub: real version would shell out to pdftotext / antiword / etc.
        decoded = payload.decode("utf-8", errors="ignore")
        collapsed = re.sub(r"\s+", " ", decoded)
        return collapsed[:2000]
    return ""
def _score_risk(filename: str, kind: str, text: str) -> int:
    """Compute a risk score for an attachment, capped at 100.

    Three independent signals contribute points:

    * 80 when ``kind`` is ``"executable"``;
    * 25 when the lower-cased ``text`` contains any entry of
      ``SUSPICIOUS_KEYWORDS``;
    * 15 when the lower-cased ``filename`` contains any such entry.

    Returns:
        The summed points, limited to a maximum of 100.
    """
    lowered_text = text.lower()
    lowered_name = filename.lower()
    text_hit = any(keyword in lowered_text for keyword in SUSPICIOUS_KEYWORDS)
    name_hit = any(keyword in lowered_name for keyword in SUSPICIOUS_KEYWORDS)

    points = (
        (80 if kind == "executable" else 0)
        + (25 if text_hit else 0)
        + (15 if name_hit else 0)
    )
    return min(points, 100)
# --- entry point ----------------------------------------------------------
@offwork.task
def process_attachment(filename: str, payload: bytes) -> dict[str, Any]:
    """Analyse one attachment and return a structured verdict.

    Classifies the filename with ``_classify``, extracts a text preview with
    ``_extract_text`` and scores the result with ``_score_risk``.

    Returns:
        A dict with the keys ``filename``, ``size`` (payload length in
        bytes), ``sha256`` (hex digest of the payload), ``kind`` and
        ``risk``.
    """
    kind = _classify(filename)
    preview = _extract_text(payload, kind)
    digest = hashlib.sha256(payload).hexdigest()
    risk = _score_risk(filename, kind, preview)
    return {
        "filename": filename,
        "size": len(payload),
        "sha256": digest,
        "kind": kind,
        "risk": risk,
    }
# --- local-only mailbox plumbing -----------------------------------------
def iter_attachments(msg: Message) -> list[tuple[str, bytes]]:
    """Collect ``(filename, payload)`` pairs for the attachments in *msg*.

    Every MIME part is visited. A part is kept only when it is not a
    multipart container, carries a ``Content-Disposition`` header, has a
    non-empty filename and its decoded payload is ``bytes``.

    Returns:
        The matching pairs in the order the parts are walked.
    """
    found: list[tuple[str, bytes]] = []
    for part in msg.walk():
        is_container = part.get_content_maintype() == "multipart"
        if is_container or part.get("Content-Disposition") is None:
            continue
        filename = part.get_filename()
        data = part.get_payload(decode=True)
        if not filename or not isinstance(data, bytes):
            continue
        found.append((filename, data))
    return found
def synthesize_inbox() -> list[Message]:
    """Create the demo inbox: four messages carrying five attachments.

    Each message is assembled as an ``EmailMessage``, serialised to bytes
    and parsed back with ``message_from_bytes``, so callers receive parsed
    messages rather than the objects that were built here.
    """
    # (sender, subject, [(attachment filename, attachment bytes), ...])
    fixtures: list[tuple[str, str, list[tuple[str, bytes]]]] = [
        (
            "alice@example.com",
            "Lunch tomorrow?",
            [("photo.jpg", b"\xff\xd8\xff" + b"\x00" * 4096)],
        ),
        (
            "billing@vendor.com",
            "Invoice INV-9921",
            [("invoice.pdf", b"%PDF-1.4 invoice total: $1,250.00 due upon receipt")],
        ),
        (
            "noreply@scary.example",
            "URGENT: please review attached",
            [("urgent_invoice.scr", b"MZ" + b"\x00" * 2048)],
        ),
        (
            "ops@partner.com",
            "Q1 deck and notes",
            [
                ("deck.pdf", b"%PDF-1.4 strategy review Q1"),
                ("notes.txt", b"meeting notes go here"),
            ],
        ),
    ]

    def build(
        sender: str, subject: str, attachments: list[tuple[str, bytes]]
    ) -> Message:
        # Compose one message, then round-trip it through its byte form.
        draft = EmailMessage()
        draft["From"] = sender
        draft["To"] = "you@example.com"
        draft["Subject"] = subject
        draft.set_content("See attached.")
        for filename, data in attachments:
            draft.add_attachment(
                data,
                maintype="application",
                subtype="octet-stream",
                filename=filename,
            )
        return message_from_bytes(bytes(draft))

    return [build(*fixture) for fixture in fixtures]
async def main() -> None:
    """Run the demo: build the inbox, dispatch attachments, print verdicts.

    All attachments from the synthesized inbox are handed to
    ``process_attachment.map`` in one call; each returned result is printed
    on its own line, flagged ``!!`` when its risk is 50 or more and ``ok``
    otherwise.
    """
    inbox = synthesize_inbox()
    pending = []
    for message in inbox:
        pending.extend(iter_attachments(message))
    print(f"Inbox: {len(inbox)} messages, {len(pending)} attachments")

    verdicts = await process_attachment.map(pending)
    for verdict in verdicts:
        marker = "!!" if verdict["risk"] >= 50 else "ok"
        print(f" [{marker}] {verdict['filename']:<24} {verdict['kind']:<10} "
              f"size={verdict['size']:<6} risk={verdict['risk']}")
# Script entry point: run the async demo only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())