Skip to content

Commit 1907f60

Browse files
committed
Add Telegram / Discord / Teams / PagerDuty notification sinks
1 parent f503173 commit 1907f60

4 files changed

Lines changed: 366 additions & 0 deletions

File tree

automation_file/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,15 @@
8888
zip_info,
8989
)
9090
from automation_file.notify import (
91+
DiscordSink,
9192
EmailSink,
9293
NotificationException,
9394
NotificationManager,
9495
NotificationSink,
96+
PagerDutySink,
9597
SlackSink,
98+
TeamsSink,
99+
TelegramSink,
96100
WebhookSink,
97101
notification_manager,
98102
notify_send,
@@ -348,11 +352,15 @@ def __getattr__(name: str) -> Any:
348352
"progress_registry",
349353
"register_progress_ops",
350354
# Notifications
355+
"DiscordSink",
351356
"EmailSink",
352357
"NotificationException",
353358
"NotificationManager",
354359
"NotificationSink",
360+
"PagerDutySink",
355361
"SlackSink",
362+
"TeamsSink",
363+
"TelegramSink",
356364
"WebhookSink",
357365
"notification_manager",
358366
"notify_send",

automation_file/notify/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,26 @@
1717
register_notify_ops,
1818
)
1919
from automation_file.notify.sinks import (
20+
DiscordSink,
2021
EmailSink,
2122
NotificationSink,
23+
PagerDutySink,
2224
SlackSink,
25+
TeamsSink,
26+
TelegramSink,
2327
WebhookSink,
2428
)
2529

2630
__all__ = [
31+
"DiscordSink",
2732
"EmailSink",
2833
"NotificationException",
2934
"NotificationManager",
3035
"NotificationSink",
36+
"PagerDutySink",
3137
"SlackSink",
38+
"TeamsSink",
39+
"TelegramSink",
3240
"WebhookSink",
3341
"notification_manager",
3442
"notify_send",

automation_file/notify/sinks.py

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,218 @@ def __repr__(self) -> str:
194194
)
195195

196196

197+
class TelegramSink(NotificationSink):
198+
"""Telegram Bot API sink.
199+
200+
``bot_token`` is a secret — it's combined into the request URL and never
201+
logged. ``chat_id`` identifies the target channel or user.
202+
"""
203+
204+
_LEVEL_PREFIX: ClassVar[dict[str, str]] = {
205+
"info": "",
206+
"warning": "⚠️ ",
207+
"error": "🚨 ",
208+
}
209+
_API_HOST: ClassVar[str] = "https://api.telegram.org"
210+
211+
def __init__(
212+
self,
213+
bot_token: str,
214+
chat_id: str | int,
215+
*,
216+
name: str = "telegram",
217+
timeout: float = _DEFAULT_TIMEOUT,
218+
) -> None:
219+
if not bot_token:
220+
raise NotificationException("telegram sink requires a bot_token")
221+
if chat_id in (None, ""):
222+
raise NotificationException("telegram sink requires a chat_id")
223+
self._bot_token = bot_token
224+
self.chat_id = chat_id
225+
self.name = name
226+
self.timeout = timeout
227+
self._url = f"{self._API_HOST}/bot{bot_token}/sendMessage"
228+
validate_http_url(self._url)
229+
230+
def send(self, subject: str, body: str, level: str = "info") -> None:
231+
self._check_level(level)
232+
prefix = self._LEVEL_PREFIX[level]
233+
text = f"{prefix}{subject}\n{self._truncate(body)}"
234+
payload = {"chat_id": self.chat_id, "text": text, "disable_web_page_preview": True}
235+
try:
236+
response = requests.post(
237+
self._url,
238+
data=json.dumps(payload).encode("utf-8"),
239+
headers={"Content-Type": "application/json"},
240+
timeout=self.timeout,
241+
allow_redirects=False,
242+
)
243+
except requests.RequestException as err:
244+
raise NotificationException(f"telegram sink {self.name!r} post failed: {err}") from err
245+
if response.status_code >= 400:
246+
raise NotificationException(
247+
f"telegram sink {self.name!r} returned HTTP {response.status_code}"
248+
)
249+
250+
def __repr__(self) -> str:
251+
return f"TelegramSink(name={self.name!r}, chat_id={self.chat_id!r})"
252+
253+
254+
class DiscordSink(NotificationSink):
255+
"""Discord incoming-webhook sink — POSTs ``{content}`` to the webhook URL."""
256+
257+
_LEVEL_PREFIX: ClassVar[dict[str, str]] = {
258+
"info": "",
259+
"warning": ":warning: ",
260+
"error": ":rotating_light: ",
261+
}
262+
# Discord caps message content at 2000 characters.
263+
_MAX_CONTENT: ClassVar[int] = 1900
264+
265+
def __init__(
266+
self,
267+
webhook_url: str,
268+
*,
269+
name: str = "discord",
270+
timeout: float = _DEFAULT_TIMEOUT,
271+
) -> None:
272+
validate_http_url(webhook_url)
273+
self._url = webhook_url
274+
self.name = name
275+
self.timeout = timeout
276+
277+
def send(self, subject: str, body: str, level: str = "info") -> None:
278+
self._check_level(level)
279+
prefix = self._LEVEL_PREFIX[level]
280+
content = f"{prefix}**{subject}**\n{self._truncate(body)}"
281+
if len(content) > self._MAX_CONTENT:
282+
content = content[: self._MAX_CONTENT] + "…"
283+
try:
284+
response = requests.post(
285+
self._url,
286+
data=json.dumps({"content": content}).encode("utf-8"),
287+
headers={"Content-Type": "application/json"},
288+
timeout=self.timeout,
289+
allow_redirects=False,
290+
)
291+
except requests.RequestException as err:
292+
raise NotificationException(f"discord sink {self.name!r} post failed: {err}") from err
293+
if response.status_code >= 400:
294+
raise NotificationException(
295+
f"discord sink {self.name!r} returned HTTP {response.status_code}"
296+
)
297+
298+
299+
class TeamsSink(NotificationSink):
300+
"""Microsoft Teams incoming-webhook sink using the legacy MessageCard schema."""
301+
302+
_LEVEL_COLOR: ClassVar[dict[str, str]] = {
303+
"info": "2E86DE",
304+
"warning": "E67E22",
305+
"error": "C0392B",
306+
}
307+
308+
def __init__(
309+
self,
310+
webhook_url: str,
311+
*,
312+
name: str = "teams",
313+
timeout: float = _DEFAULT_TIMEOUT,
314+
) -> None:
315+
validate_http_url(webhook_url)
316+
self._url = webhook_url
317+
self.name = name
318+
self.timeout = timeout
319+
320+
def send(self, subject: str, body: str, level: str = "info") -> None:
321+
self._check_level(level)
322+
payload = {
323+
"@type": "MessageCard",
324+
"@context": "https://schema.org/extensions",
325+
"summary": subject,
326+
"themeColor": self._LEVEL_COLOR[level],
327+
"title": subject,
328+
"text": self._truncate(body),
329+
}
330+
try:
331+
response = requests.post(
332+
self._url,
333+
data=json.dumps(payload).encode("utf-8"),
334+
headers={"Content-Type": "application/json"},
335+
timeout=self.timeout,
336+
allow_redirects=False,
337+
)
338+
except requests.RequestException as err:
339+
raise NotificationException(f"teams sink {self.name!r} post failed: {err}") from err
340+
if response.status_code >= 400:
341+
raise NotificationException(
342+
f"teams sink {self.name!r} returned HTTP {response.status_code}"
343+
)
344+
345+
346+
class PagerDutySink(NotificationSink):
347+
"""PagerDuty Events API v2 sink.
348+
349+
``routing_key`` is the integration key for a PagerDuty service and is
350+
treated as a secret. Each ``send`` enqueues a ``trigger`` event unless
351+
the level is explicitly ``info``, in which case it sends ``acknowledge``
352+
semantics via ``event_action='trigger'`` + ``severity='info'``.
353+
"""
354+
355+
_ENQUEUE_URL: ClassVar[str] = "https://events.pagerduty.com/v2/enqueue"
356+
_LEVEL_SEVERITY: ClassVar[dict[str, str]] = {
357+
"info": "info",
358+
"warning": "warning",
359+
"error": "error",
360+
}
361+
362+
def __init__(
363+
self,
364+
routing_key: str,
365+
*,
366+
source: str = "automation_file",
367+
name: str = "pagerduty",
368+
timeout: float = _DEFAULT_TIMEOUT,
369+
) -> None:
370+
if not routing_key:
371+
raise NotificationException("pagerduty sink requires a routing_key")
372+
self._routing_key = routing_key
373+
self.source = source
374+
self.name = name
375+
self.timeout = timeout
376+
validate_http_url(self._ENQUEUE_URL)
377+
378+
def send(self, subject: str, body: str, level: str = "info") -> None:
379+
self._check_level(level)
380+
payload = {
381+
"routing_key": self._routing_key,
382+
"event_action": "trigger",
383+
"payload": {
384+
"summary": subject,
385+
"source": self.source,
386+
"severity": self._LEVEL_SEVERITY[level],
387+
"custom_details": {"body": self._truncate(body)},
388+
},
389+
}
390+
try:
391+
response = requests.post(
392+
self._ENQUEUE_URL,
393+
data=json.dumps(payload).encode("utf-8"),
394+
headers={"Content-Type": "application/json"},
395+
timeout=self.timeout,
396+
allow_redirects=False,
397+
)
398+
except requests.RequestException as err:
399+
raise NotificationException(f"pagerduty sink {self.name!r} post failed: {err}") from err
400+
if response.status_code >= 400:
401+
raise NotificationException(
402+
f"pagerduty sink {self.name!r} returned HTTP {response.status_code}"
403+
)
404+
405+
def __repr__(self) -> str:
406+
return f"PagerDutySink(name={self.name!r}, source={self.source!r})"
407+
408+
197409
def _describe(sink: NotificationSink) -> dict[str, Any]:
198410
info: dict[str, Any] = {"name": sink.name, "type": type(sink).__name__}
199411
if isinstance(sink, WebhookSink):
@@ -202,6 +414,10 @@ def _describe(sink: NotificationSink) -> dict[str, Any]:
202414
info["host"] = sink.host
203415
info["port"] = sink.port
204416
info["recipients"] = sink.recipients
417+
elif isinstance(sink, TelegramSink):
418+
info["chat_id"] = sink.chat_id
419+
elif isinstance(sink, PagerDutySink):
420+
info["source"] = sink.source
205421
return info
206422

207423

0 commit comments

Comments
 (0)