Skip to content
Merged
34 changes: 23 additions & 11 deletions elementary/clients/slack/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import ssl
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple, Union

Expand All @@ -13,6 +14,7 @@
from elementary.config.config import Config
from elementary.tracking.tracking_interface import Tracking
from elementary.utils.log import get_logger
from elementary.utils.ssl import create_ssl_context

logger = get_logger(__name__)

Expand All @@ -25,8 +27,9 @@ class SlackClient(ABC):
def __init__(
self,
tracking: Optional[Tracking] = None,
ssl_context: Optional[ssl.SSLContext] = None,
):
self.client = self._initial_client()
self.client = self._initial_client(ssl_context)
self.tracking = tracking
self._initial_retry_handlers()
self.email_to_user_id_cache: Dict[str, str] = {}
Expand All @@ -37,20 +40,22 @@ def create_client(
) -> Optional["SlackClient"]:
if not config.has_slack:
return None
ssl_context = create_ssl_context(config.ssl_ca_bundle)
if config.slack_token:
logger.debug("Creating Slack client with token.")
return SlackWebClient(token=config.slack_token, tracking=tracking)
return SlackWebClient(
token=config.slack_token, tracking=tracking, ssl_context=ssl_context
)
elif config.slack_webhook:
logger.debug("Creating Slack client with webhook.")
return SlackWebhookClient(
webhook=config.slack_webhook,
is_workflow=config.is_slack_workflow,
tracking=tracking,
ssl_context=ssl_context,
)
return None

@abstractmethod
def _initial_client(self):
def _initial_client(self, ssl_context: Optional[ssl.SSLContext]):
raise NotImplementedError

def _initial_retry_handlers(self):
Expand Down Expand Up @@ -85,12 +90,13 @@ def __init__(
self,
token: str,
tracking: Optional[Tracking] = None,
ssl_context: Optional[ssl.SSLContext] = None,
):
self.token = token
super().__init__(tracking)
super().__init__(tracking, ssl_context)

def _initial_client(self):
return WebClient(token=self.token)
def _initial_client(self, ssl_context: Optional[ssl.SSLContext]):
return WebClient(token=self.token, ssl=ssl_context)

@sleep_and_retry
@limits(calls=1, period=ONE_SECOND)
Expand Down Expand Up @@ -231,16 +237,22 @@ def __init__(
webhook: str,
is_workflow: bool,
tracking: Optional[Tracking] = None,
ssl_context: Optional[ssl.SSLContext] = None,
):
self.webhook = webhook
self.is_workflow = is_workflow
super().__init__(tracking)
super().__init__(tracking, ssl_context)

def _initial_client(self):
def _initial_client(self, ssl_context: Optional[ssl.SSLContext]):
if self.is_workflow:
# Workflow webhooks do not support the ssl_context parameter.
# requests.Session() uses the requests default CA bundle (certifi).
return requests.Session()

return WebhookClient(
url=self.webhook, default_headers={"Content-type": "application/json"}
url=self.webhook,
default_headers={"Content-type": "application/json"},
ssl=ssl_context,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

@sleep_and_retry
Expand Down
6 changes: 6 additions & 0 deletions elementary/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __init__(
run_dbt_deps_if_needed: Optional[bool] = None,
project_name: Optional[str] = None,
quiet_logs: Optional[bool] = None,
ssl_ca_bundle: Optional[str] = None,
):
self.config_dir = config_dir
self.profiles_dir = profiles_dir
Expand Down Expand Up @@ -223,6 +224,11 @@ def __init__(
quiet_logs, config.get("quiet_logs"), False
)

self.ssl_ca_bundle = self._first_not_none(
ssl_ca_bundle,
config.get("ssl_ca_bundle"),
)

def _load_configuration(self) -> dict:
config_file_path = os.path.join(self.config_dir, self._CONFIG_FILE_NAME)
if not os.path.exists(config_file_path):
Expand Down
9 changes: 7 additions & 2 deletions elementary/messages/messaging_integrations/slack_web.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import ssl
import time
from typing import Any, Dict, Iterator, Optional

Expand Down Expand Up @@ -54,9 +55,13 @@ def __init__(

@classmethod
def from_token(
cls, token: str, tracking: Optional[Tracking] = None, **kwargs: Any
cls,
token: str,
tracking: Optional[Tracking] = None,
ssl_context: Optional[ssl.SSLContext] = None,
**kwargs: Any,
) -> "SlackWebMessagingIntegration":
client = WebClient(token=token)
client = WebClient(token=token, ssl=ssl_context)
client.retry_handlers.append(RateLimitErrorRetryHandler(max_retry_count=5))
return cls(client, tracking, **kwargs)

Expand Down
8 changes: 6 additions & 2 deletions elementary/messages/messaging_integrations/slack_webhook.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ssl
from datetime import datetime, timezone
from http import HTTPStatus
from typing import Any, Optional
Expand Down Expand Up @@ -37,9 +38,12 @@ def __init__(

@classmethod
def from_url(
cls, url: str, tracking: Optional[Tracking] = None
cls,
url: str,
tracking: Optional[Tracking] = None,
ssl_context: Optional[ssl.SSLContext] = None,
) -> "SlackWebhookMessagingIntegration":
client = WebhookClient(url)
client = WebhookClient(url, ssl=ssl_context)
client.retry_handlers.append(RateLimitErrorRetryHandler(max_retry_count=5))
return cls(client, tracking)

Expand Down
13 changes: 13 additions & 0 deletions elementary/monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ def decorator(func):
default=None,
help="The Slack token for your workspace.",
)(func)
func = click.option(
"--ssl-ca-bundle",
type=str,
default=None,
help="Override the CA bundle used for SSL connections. "
"Accepted values: 'certifi' (use the certifi package bundle), "
"'system' (use the OS CA store), or a file path to a custom CA bundle. "
"When omitted each underlying library uses its own default.",
)(func)
Comment thread
haritamar marked this conversation as resolved.
if cmd in (Command.REPORT, Command.SEND_REPORT):
func = click.option(
"--exclude-elementary-models",
Expand Down Expand Up @@ -331,6 +340,7 @@ def monitor(
teams_webhook,
maximum_columns_in_alert_samples,
quiet_logs,
ssl_ca_bundle,
):
"""
Get alerts on failures in dbt jobs.
Expand Down Expand Up @@ -365,6 +375,7 @@ def monitor(
teams_webhook=teams_webhook,
maximum_columns_in_alert_samples=maximum_columns_in_alert_samples,
quiet_logs=quiet_logs,
ssl_ca_bundle=ssl_ca_bundle,
)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.set_env("use_select", bool(select))
Expand Down Expand Up @@ -692,6 +703,7 @@ def send_report(
include,
target_path,
quiet_logs,
ssl_ca_bundle,
):
"""
Generate and send the report to an external platform.
Expand Down Expand Up @@ -735,6 +747,7 @@ def send_report(
env=env,
project_name=project_name,
quiet_logs=quiet_logs,
ssl_ca_bundle=ssl_ca_bundle,
)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.set_env("use_select", bool(select))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)
from elementary.tracking.tracking_interface import Tracking
from elementary.utils.log import get_logger
from elementary.utils.ssl import create_ssl_context

logger = get_logger(__name__)

Expand All @@ -43,18 +44,19 @@ def get_integration(
tracking: Optional[Tracking] = None,
) -> Union[BaseMessagingIntegration, BaseIntegration]:
if config.has_slack:
ssl_context = create_ssl_context(config.ssl_ca_bundle)
if config.is_slack_workflow:
return SlackIntegration(
config=config,
tracking=tracking,
)
if config.slack_token:
return SlackWebMessagingIntegration.from_token(
config.slack_token, tracking
config.slack_token, tracking, ssl_context=ssl_context
)
elif config.slack_webhook:
return SlackWebhookMessagingIntegration.from_url(
config.slack_webhook, tracking
config.slack_webhook, tracking, ssl_context=ssl_context
)
else:
raise UnsupportedAlertIntegrationError
Expand Down
41 changes: 41 additions & 0 deletions elementary/utils/ssl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import os
import ssl
from typing import Optional

import certifi

from elementary.utils.log import get_logger

logger = get_logger(__name__)

CERTIFI = "certifi"
SYSTEM = "system"


def create_ssl_context(ssl_ca_bundle: Optional[str] = None) -> Optional[ssl.SSLContext]:
"""Resolve an ssl_ca_bundle setting into an SSLContext.

Returns ``None`` when *ssl_ca_bundle* is ``None`` so that each
library keeps its own default CA behaviour.
"""
if ssl_ca_bundle is None:
return None

value = ssl_ca_bundle.strip()
if not value:
raise ValueError(
"ssl_ca_bundle cannot be empty. Use 'certifi', 'system', or a CA bundle file path."
)

Comment thread
haritamar marked this conversation as resolved.
if value.lower() == CERTIFI:
logger.debug("Using certifi CA bundle for SSL context.")
return ssl.create_default_context(cafile=certifi.where())

if value.lower() == SYSTEM:
logger.debug("Using system CA store for SSL context.")
return ssl.create_default_context()

if not os.path.isfile(value):
raise ValueError(f"ssl_ca_bundle path does not exist or is not a file: {value}")
logger.debug("Using custom CA bundle for SSL context: %s", value)
return ssl.create_default_context(cafile=value)
Loading