Skip to content
Merged
1 change: 1 addition & 0 deletions elementary/messages/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class Icon(Enum):
BELL = "bell"
GEM = "gem"
SPARKLES = "sparkles"
LINK = "link"


class TextStyle(Enum):
Expand Down
1 change: 1 addition & 0 deletions elementary/messages/formats/unicode.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Icon.BELL: "🔔",
Icon.GEM: "💎",
Icon.SPARKLES: "✨",
Icon.LINK: "🔗",
}

for icon in Icon:
Expand Down
47 changes: 47 additions & 0 deletions elementary/monitor/alerts/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,23 @@
ReportLinkData,
)
from elementary.utils.log import get_logger
from elementary.utils.pydantic_shim import BaseModel
from elementary.utils.time import DATETIME_WITH_TIMEZONE_FORMAT

logger = get_logger(__name__)


class OrchestratorInfo(BaseModel):
"""Structured orchestrator metadata for alerts."""

job_id: Optional[str] = None
job_name: Optional[str] = None
run_id: Optional[str] = None
orchestrator: Optional[str] = None
job_url: Optional[str] = None
run_url: Optional[str] = None


class AlertModel:
def __init__(
self,
Expand All @@ -32,6 +44,12 @@ def __init__(
alert_fields: Optional[List[str]] = None,
elementary_database_and_schema: Optional[str] = None,
env: Optional[str] = None,
job_id: Optional[str] = None,
job_name: Optional[str] = None,
job_run_id: Optional[str] = None,
job_url: Optional[str] = None,
job_run_url: Optional[str] = None,
orchestrator: Optional[str] = None,
**kwargs,
):
self.id = id
Expand Down Expand Up @@ -65,6 +83,12 @@ def __init__(
self.alert_fields = alert_fields
self.elementary_database_and_schema = elementary_database_and_schema
self.env = env
self.job_id = job_id
self.job_name = job_name
self.job_run_id = job_run_id
self.job_url = job_url
self.job_run_url = job_run_url
self.orchestrator = orchestrator

@property
def unified_meta(self) -> Dict:
Expand All @@ -84,3 +108,26 @@ def summary(self) -> str:

def get_report_link(self) -> Optional[ReportLinkData]:
raise NotImplementedError

@property
def orchestrator_info(self) -> Optional[OrchestratorInfo]:
"""Returns structured orchestrator metadata if available."""
if not any(
[
self.job_name,
self.job_run_id,
self.orchestrator,
self.job_url,
self.job_run_url,
]
):
return None

return OrchestratorInfo(
job_id=self.job_id or None,
job_name=self.job_name or None,
run_id=self.job_run_id or None,
orchestrator=self.orchestrator or None,
job_url=self.job_url or None,
run_url=self.job_run_url or None,
)
59 changes: 53 additions & 6 deletions elementary/monitor/alerts/alert_messages/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from pydantic import BaseModel

from elementary.messages.block_builders import (
BoldTextBlock,
BoldTextLineBlock,
BulletListBlock,
FactsBlock,
ItalicTextLineBlock,
JsonCodeBlock,
LinkInlineBlocks,
LinksLineBlock,
MentionLineBlock,
NonPrimaryFactBlock,
Expand All @@ -33,6 +35,7 @@
TextStyle,
)
from elementary.messages.message_body import Color, MessageBlock, MessageBody
from elementary.monitor.alerts.alert import OrchestratorInfo
from elementary.monitor.alerts.alert_messages.alert_fields import AlertField
from elementary.monitor.alerts.alerts_groups.alerts_group import AlertsGroup
from elementary.monitor.alerts.alerts_groups.base_alerts_group import BaseAlertsGroup
Expand All @@ -42,6 +45,9 @@
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel
from elementary.monitor.data_monitoring.alerts.integrations.utils.orchestrator_link import (
create_orchestrator_link,
)
from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import (
ReportLinkData,
)
Expand Down Expand Up @@ -106,6 +112,7 @@ def _get_run_alert_subtitle_block(
suppression_interval: Optional[int] = None,
env: Optional[str] = None,
links: list[ReportLinkData] = [],
orchestrator_info: Optional[OrchestratorInfo] = None,
) -> LinesBlock:
summary = []
summary.append((type.capitalize() + ":", name))
Expand All @@ -114,16 +121,54 @@ def _get_run_alert_subtitle_block(
summary.append(("Status:", status or "Unknown"))
if detected_at_str:
summary.append(("Time:", detected_at_str))

subtitle_lines = []

if orchestrator_info and orchestrator_info.job_name:
orchestrator_name = orchestrator_info.orchestrator or "orchestrator"
job_info_text = f"{orchestrator_info.job_name} (via {orchestrator_name})"

orchestrator_link = create_orchestrator_link(orchestrator_info)
if orchestrator_link:
job_inlines: List[InlineBlock] = [
Comment thread
ofek1weiss marked this conversation as resolved.
BoldTextBlock(text="Job:"),
TextBlock(text=job_info_text + " | "),
]
job_inlines.extend(
LinkInlineBlocks(
text=orchestrator_link.text,
url=orchestrator_link.url,
icon=orchestrator_link.icon,
)
)

subtitle_lines.append(LineBlock(inlines=job_inlines))
else:
summary.append(("Job:", job_info_text))
if suppression_interval:
summary.append(("Suppression interval:", str(suppression_interval)))
subtitle_lines = [SummaryLineBlock(summary=summary)]

if links:
subtitle_lines.append(
LinksLineBlock(
links=[(link.text, link.url, link.icon) for link in links]
subtitle_lines.append(SummaryLineBlock(summary=summary))

all_links = []

for link in links:
all_links.append((link.text, link.url, link.icon))

if orchestrator_info and not orchestrator_info.job_name:
orchestrator_link = create_orchestrator_link(orchestrator_info)
if orchestrator_link:
all_links.append(
(
orchestrator_link.text,
orchestrator_link.url,
orchestrator_link.icon,
)
)
)

if all_links:
subtitle_lines.append(LinksLineBlock(links=all_links))

return LinesBlock(lines=subtitle_lines)

def _get_run_alert_subtitle_links(
Expand Down Expand Up @@ -151,6 +196,7 @@ def _get_run_alert_subtitle_blocks(
asset_type = "snapshot" if alert.materialization == "snapshot" else "model"
asset_name = alert.alias
links = self._get_run_alert_subtitle_links(alert)
orchestrator_info = alert.orchestrator_info
return [
self._get_run_alert_subtitle_block(
type=asset_type,
Expand All @@ -160,6 +206,7 @@ def _get_run_alert_subtitle_blocks(
suppression_interval=alert.suppression_interval,
env=alert.env,
links=links,
orchestrator_info=orchestrator_info,
)
]

Expand Down
13 changes: 13 additions & 0 deletions elementary/monitor/alerts/model_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ def __init__(
alert_fields: Optional[List[str]] = None,
elementary_database_and_schema: Optional[str] = None,
env: Optional[str] = None,
job_id: Optional[str] = None,
job_name: Optional[str] = None,
job_run_id: Optional[str] = None,
job_url: Optional[str] = None,
job_run_url: Optional[str] = None,
orchestrator: Optional[str] = None,
**kwargs,
):
super().__init__(
Expand All @@ -57,6 +63,13 @@ def __init__(
alert_fields,
elementary_database_and_schema,
env=env,
job_id=job_id,
job_name=job_name,
job_run_id=job_run_id,
job_url=job_url,
job_run_url=job_run_url,
orchestrator=orchestrator,
**kwargs,
)
self.alias = alias
self.path = path
Expand Down
13 changes: 13 additions & 0 deletions elementary/monitor/alerts/source_freshness_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ def __init__(
alert_fields: Optional[List[str]] = None,
elementary_database_and_schema: Optional[str] = None,
env: Optional[str] = None,
job_id: Optional[str] = None,
job_name: Optional[str] = None,
job_run_id: Optional[str] = None,
job_url: Optional[str] = None,
job_run_url: Optional[str] = None,
orchestrator: Optional[str] = None,
**kwargs,
):
super().__init__(
Expand All @@ -66,6 +72,13 @@ def __init__(
alert_fields,
elementary_database_and_schema,
env=env,
job_id=job_id,
job_name=job_name,
job_run_id=job_run_id,
job_url=job_url,
job_run_url=job_run_url,
orchestrator=orchestrator,
**kwargs,
)
self.snapshotted_at_str = (
convert_datetime_utc_str_to_timezone_str(
Expand Down
13 changes: 13 additions & 0 deletions elementary/monitor/alerts/test_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ def __init__(
alert_fields: Optional[List[str]] = None,
elementary_database_and_schema: Optional[str] = None,
env: Optional[str] = None,
job_id: Optional[str] = None,
job_name: Optional[str] = None,
job_run_id: Optional[str] = None,
job_url: Optional[str] = None,
job_run_url: Optional[str] = None,
orchestrator: Optional[str] = None,
**kwargs,
):
super().__init__(
Expand All @@ -68,6 +74,13 @@ def __init__(
alert_fields,
elementary_database_and_schema,
env=env,
job_id=job_id,
job_name=job_name,
job_run_id=job_run_id,
job_url=job_url,
job_run_url=job_run_url,
orchestrator=orchestrator,
**kwargs,
)
self.table_name = table_name
self.test_type = test_type
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import Optional

from elementary.messages.blocks import Icon
from elementary.monitor.alerts.alert import OrchestratorInfo
from elementary.utils.pydantic_shim import BaseModel


class OrchestratorLinkData(BaseModel):
url: str
text: str
orchestrator: str
icon: Optional[Icon] = None


def create_orchestrator_link(
orchestrator_info: OrchestratorInfo,
) -> Optional[OrchestratorLinkData]:
"""Create an orchestrator link from orchestrator info if URL is available."""
if not orchestrator_info or not orchestrator_info.run_url:
return None

orchestrator = orchestrator_info.orchestrator or "orchestrator"

return OrchestratorLinkData(
url=orchestrator_info.run_url,
text=f"View in {orchestrator}",
orchestrator=orchestrator,
icon=Icon.LINK,
)


def create_job_link(
orchestrator_info: OrchestratorInfo,
) -> Optional[OrchestratorLinkData]:
"""Create a job-level orchestrator link if job URL is available."""
if not orchestrator_info or not orchestrator_info.job_url:
return None

orchestrator = orchestrator_info.orchestrator or "orchestrator"
job_name = orchestrator_info.job_name or "Job"

display_name = orchestrator.replace("_", " ").title()

return OrchestratorLinkData(
url=orchestrator_info.job_url,
text=f"{job_name} in {display_name}",
orchestrator=orchestrator,
icon=Icon.GEAR,
)
Loading
Loading