Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions daiv/notifications/channels/rocketchat.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from core.utils import build_absolute_url, build_uri
from notifications.channels.base import NotificationChannel
from notifications.channels.registry import register_channel
from notifications.channels.rocketchat_renderers.registry import get_renderer
from notifications.choices import ChannelType
from notifications.exceptions import UnrecoverableDeliveryError

Expand Down Expand Up @@ -159,6 +160,22 @@ def _compose_text(notification: Notification) -> str:
return "\n".join(parts)


def _build_payload(notification: Notification, delivery: NotificationDelivery) -> dict:
"""Build the ``chat.postMessage`` body for ``notification``.

Falls back to a plain-text message when no renderer is registered for the
event type, so newly-introduced events deliver before their renderer ships.
"""
channel = f"@{delivery.address}"
renderer = get_renderer(notification.event_type)
if renderer is None:
# Log so a missing renderer doesn't silently degrade to plain text forever.
logger.warning("Rocket Chat: no renderer for event_type=%r; sending plain text", notification.event_type)
return {"channel": channel, "text": _compose_text(notification)}
text, attachments = renderer.render(notification)
return {"channel": channel, "text": text, "attachments": attachments}


@register_channel
class RocketChatChannel(NotificationChannel):
channel_type = ChannelType.ROCKETCHAT
Expand All @@ -176,9 +193,8 @@ def send(self, notification: Notification, delivery: NotificationDelivery) -> No
if client is None:
raise UnrecoverableDeliveryError("Rocket Chat not configured")

text = _compose_text(notification)
try:
_rc_post(client, "chat.postMessage", {"channel": f"@{delivery.address}", "text": text})
_rc_post(client, "chat.postMessage", _build_payload(notification, delivery))
except RocketChatPermanentError as exc:
logger.error("Rocket Chat delivery %s permanently failed: %s", delivery.id, exc)
raise UnrecoverableDeliveryError(str(exc)) from exc
7 changes: 7 additions & 0 deletions daiv/notifications/channels/rocketchat_renderers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Per-event Rocket Chat renderers. Each submodule self-registers via ``@register_renderer``."""

from notifications.channels.rocketchat_renderers import ( # noqa: F401
job_batch_finished,
job_finished,
schedule_finished,
)
90 changes: 90 additions & 0 deletions daiv/notifications/channels/rocketchat_renderers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from __future__ import annotations

import inspect
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, ClassVar

from core.utils import build_absolute_url

if TYPE_CHECKING:
from notifications.choices import EventType
from notifications.models import Notification


# Slack-compatible attachment colors (Rocket Chat accepts hex).
COLOR_SUCCESS = "#22c55e" # green
COLOR_FAILURE = "#ef4444" # red
COLOR_PARTIAL = "#eab308" # yellow

FOOTER = "DAIV"


class RocketChatRenderer(ABC):
"""Base for per-event Rocket Chat attachment renderers.

Subclasses set ``event_type`` and implement ``render``. They are registered
with the ``@register_renderer`` decorator from ``.registry``.
"""

event_type: ClassVar[EventType]

def __init_subclass__(cls, **kwargs) -> None:
super().__init_subclass__(**kwargs)
# Concrete renderers must declare ``event_type`` so the registry can key them and
# so a forgotten assignment surfaces at import time rather than inside chat.postMessage.
if not inspect.isabstract(cls) and "event_type" not in cls.__dict__:
raise TypeError(f"{cls.__name__} must define `event_type` (a notifications.choices.EventType value)")

@abstractmethod
def render(self, notification: Notification) -> tuple[str, list[dict]]:
"""Return ``(text, attachments)`` to send via Rocket Chat's ``chat.postMessage``."""

@staticmethod
def _fmt_tokens(n: int | None) -> str | None:
if n is None:
return None
if n < 1000:
return str(n)
return f"{n / 1000:.1f}k"

@staticmethod
def _fmt_cost(usd: float | None) -> str | None:
if usd is None:
return None
return f"${usd:.2f}"

@staticmethod
def _fmt_duration(seconds: float | None) -> str:
if seconds is None:
return "—"
total = int(seconds)
if total < 60:
return f"{total}s"
if total < 3600:
return f"{total // 60}m {total % 60:02d}s"
return f"{total // 3600}h {(total % 3600) // 60:02d}m"

@staticmethod
def _usage_field(ctx: dict) -> dict | None:
"""Combine input/output tokens into one short field; ``None`` if both are missing."""
in_tokens = RocketChatRenderer._fmt_tokens(ctx.get("input_tokens"))
out_tokens = RocketChatRenderer._fmt_tokens(ctx.get("output_tokens"))
if in_tokens is None and out_tokens is None:
return None
return {"title": "Usage", "value": f"{in_tokens or '—'} in · {out_tokens or '—'} out", "short": True}

@staticmethod
def _cost_field(ctx: dict) -> dict | None:
"""One-field cost, or ``None`` when no cost data is available."""
cost = RocketChatRenderer._fmt_cost(ctx.get("cost_usd"))
if cost is None:
return None
return {"title": "Cost", "value": cost, "short": True}

@staticmethod
def _link(notification: Notification) -> str:
return build_absolute_url(notification.link_url) if notification.link_url else ""

@staticmethod
def _color(ctx: dict) -> str:
return COLOR_SUCCESS if ctx.get("is_successful") else COLOR_FAILURE
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from notifications.channels.rocketchat_renderers.base import (
COLOR_FAILURE,
COLOR_PARTIAL,
COLOR_SUCCESS,
FOOTER,
RocketChatRenderer,
)
from notifications.channels.rocketchat_renderers.registry import register_renderer
from notifications.choices import EventType

if TYPE_CHECKING:
from notifications.models import Notification


_REPO_BREAKDOWN_LIMIT = 8


@register_renderer
class JobBatchFinishedRenderer(RocketChatRenderer):
event_type = EventType.JOB_BATCH_FINISHED

def render(self, notification: Notification) -> tuple[str, list[dict]]:
ctx = notification.context
successful = ctx.get("successful_count", 0)
failed = ctx.get("failed_count", 0)
total = ctx.get("total", successful + failed)

if failed == 0:
color, emoji = COLOR_SUCCESS, "✅"
elif successful == 0:
color, emoji = COLOR_FAILURE, "❌"
else:
color, emoji = COLOR_PARTIAL, "⚠️"

fields: list[dict] = [
{"title": "Results", "value": f"✓ {successful} · ✗ {failed} of {total}", "short": True},
{"title": "Duration", "value": self._fmt_duration(ctx.get("duration_seconds")), "short": True},
]
if owner := ctx.get("trigger_owner"):
fields.append({"title": "Owner", "value": owner, "short": True})
if (usage := self._usage_field(ctx)) is not None:
fields.append(usage)
if (cost := self._cost_field(ctx)) is not None:
fields.append({"title": "Total cost", "value": cost["value"], "short": True})
if breakdown := self._repo_breakdown(ctx.get("repo_results") or []):
fields.append({"title": "Repositories", "value": breakdown, "short": False})

attachment = {
"color": color,
"title": notification.subject,
"title_link": self._link(notification),
"fields": fields,
"footer": FOOTER,
"ts": int(notification.created.timestamp()),
}
return f"{emoji} {notification.subject}", [attachment]

@staticmethod
def _repo_breakdown(repo_results: list[dict]) -> str:
"""Format per-repo outcomes as ``✓ a/b · ✓ c/d · ✗ e/f`` with overflow truncation."""
if not repo_results:
return ""
head = repo_results[:_REPO_BREAKDOWN_LIMIT]
parts = [f"{'✓' if r.get('ok') else '✗'} {r.get('repo', '?')}" for r in head]
overflow = len(repo_results) - len(head)
if overflow > 0:
parts.append(f"… and {overflow} more")
return " · ".join(parts)
39 changes: 39 additions & 0 deletions daiv/notifications/channels/rocketchat_renderers/job_finished.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from notifications.channels.rocketchat_renderers.base import FOOTER, RocketChatRenderer
from notifications.channels.rocketchat_renderers.registry import register_renderer
from notifications.choices import EventType

if TYPE_CHECKING:
from notifications.models import Notification


@register_renderer
class JobFinishedRenderer(RocketChatRenderer):
event_type = EventType.JOB_FINISHED

def render(self, notification: Notification) -> tuple[str, list[dict]]:
ctx = notification.context
ok = ctx.get("is_successful", False)
emoji = "✅" if ok else "❌"

fields: list[dict] = [
{"title": "Trigger", "value": ctx.get("trigger_label") or "—", "short": True},
{"title": "Duration", "value": self._fmt_duration(ctx.get("duration_seconds")), "short": True},
]
if (usage := self._usage_field(ctx)) is not None:
fields.append(usage)
if (cost := self._cost_field(ctx)) is not None:
fields.append(cost)

attachment = {
"color": self._color(ctx),
"title": notification.subject,
"title_link": self._link(notification),
"fields": fields,
"footer": FOOTER,
"ts": int(notification.created.timestamp()),
}
return f"{emoji} {notification.subject}", [attachment]
24 changes: 24 additions & 0 deletions daiv/notifications/channels/rocketchat_renderers/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from notifications.channels.rocketchat_renderers.base import RocketChatRenderer
from notifications.choices import EventType

_registry: dict[str, type[RocketChatRenderer]] = {}


def register_renderer(cls: type[RocketChatRenderer]) -> type[RocketChatRenderer]:
"""Decorator — registers a Rocket Chat renderer under its ``event_type``."""
event_type = cls.event_type
if event_type in _registry:
raise ValueError(f"Rocket Chat renderer for {event_type!r} already registered")
_registry[event_type] = cls
return cls


def get_renderer(event_type: str | EventType) -> RocketChatRenderer | None:
"""Return a renderer instance for ``event_type``, or ``None`` if no renderer is registered."""
cls = _registry.get(event_type)
return cls() if cls is not None else None
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from notifications.channels.rocketchat_renderers.base import FOOTER, RocketChatRenderer
from notifications.channels.rocketchat_renderers.registry import register_renderer
from notifications.choices import EventType

if TYPE_CHECKING:
from notifications.models import Notification


@register_renderer
class ScheduleFinishedRenderer(RocketChatRenderer):
event_type = EventType.SCHEDULE_FINISHED

def render(self, notification: Notification) -> tuple[str, list[dict]]:
ctx = notification.context
ok = ctx.get("is_successful", False)
emoji = "✅" if ok else "❌"

fields: list[dict] = [
{"title": "Repository", "value": ctx.get("repo_id") or "—", "short": True},
{"title": "Owner", "value": ctx.get("trigger_owner") or "—", "short": True},
{"title": "Duration", "value": self._fmt_duration(ctx.get("duration_seconds")), "short": True},
]
if (usage := self._usage_field(ctx)) is not None:
fields.append(usage)
if (cost := self._cost_field(ctx)) is not None:
fields.append(cost)

attachment = {
"color": self._color(ctx),
"title": notification.subject,
"title_link": self._link(notification),
"fields": fields,
"footer": FOOTER,
"ts": int(notification.created.timestamp()),
}
return f"{emoji} {notification.subject}", [attachment]
Loading