Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ Installation
"--smtp-host", "$mail_loop_smtp_host$",
"--smtp-port", "$mail_loop_smtp_port$",
"--smtp-user", "$mail_loop_smtp_user$",
"--smtp-skip-cert-validation",
"--imap-host", "$mail_loop_imap_host$",
"--imap-port", "$mail_loop_imap_port$",
"--imap-user", "$mail_loop_imap_user$",
"--imap-spam", "$mail_loop_imap_spam$",
"--imap-skip-cert-validation",
"--imap-cleanup" ]
}

Expand Down
35 changes: 25 additions & 10 deletions check_mail_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,18 @@ def email_create_message(mail_from: str, mail_to: str, _uuid: str) -> MIMEText:
return msg


def smtp_connect(smtp_host: str, smtp_port: int, smtp_user: str, smtp_pass: str) -> smtplib.SMTP:
def smtp_connect(smtp_host: str, smtp_port: int, smtp_user: str, smtp_pass: str, smtp_skip_cert_validation: bool) -> smtplib.SMTP:
"""
Connect to an SMTPS server.

@param smtp_host: The mail server host.
@param smtp_port: The SMTP server port. STARTSSL or plaintext communication is not supported.
@param smtp_user: The username for SMTP authentication.
@param smtp_pass: The password for SMTP authentication.
@param smtp_skip_cert_validation: Skip the validation of the SMTP Server provided certificate.
@return Function returns a smtplib.SMTP object that represents a server connection.
"""

server = smtplib.SMTP_SSL(smtp_host, smtp_port, context=ssl.create_default_context())
server = smtplib.SMTP_SSL(smtp_host, smtp_port, context=create_ssl_context(smtp_skip_cert_validation))
debug(f"SMTP: Try to log in to {smtp_host} as: {smtp_user}")
server.login(smtp_user, smtp_pass)
debug(f"SMTP: Log in was successful.")
Expand All @@ -122,7 +122,7 @@ def smtp_connect(smtp_host: str, smtp_port: int, smtp_user: str, smtp_pass: str)


def imap_retrieve_mail(imap_host: str, imap_port: int, imap_user: str, imap_pass: str, imap_spambox: Optional[str],
expected_token: str, cleanup_flag: bool) -> MailFound:
expected_token: str, cleanup_flag: bool, imap_skip_cert_validation) -> MailFound:
"""
Retrieve an e-mail from an IMAP account. Search the INBOX and the Spambox for a specific token value.
Retry up to three times.
Expand All @@ -134,12 +134,13 @@ def imap_retrieve_mail(imap_host: str, imap_port: int, imap_user: str, imap_pass
@param imap_spambox: The name of the spam mailbox, where the mail is also searched. Pass None to skip.
@param expected_token: Lookup this token in a X-Icinga-Test-Id E-mail header.
@param cleanup_flag: Remove mails from the IMAP account.
@param imap_skip_cert_validation: Skip the validation of the IMAP Server provided certificate.

@return Function returns a MailFound status.
"""

# Establish IMAP connection
server = imaplib.IMAP4_SSL(host=imap_host, port=imap_port, ssl_context=ssl.create_default_context())
server = imaplib.IMAP4_SSL(host=imap_host, port=imap_port, ssl_context=create_ssl_context(imap_skip_cert_validation))
debug(f"IMAP: Try to log in to {imap_host} as: {imap_user}")
server.login(imap_user, imap_pass)
debug(f"IMAP: Log in was successful.")
Expand Down Expand Up @@ -235,6 +236,18 @@ def __init__(self, _data):

return token_found

def create_ssl_context(skip_validation: bool) -> ssl.SSLContext:
"""
Create the SSL Context used for the connection to the SMTP or IMAP server.

@param skip_validation: Whether or not to validate server certificate.
@return ssl.SSLContext
"""
context = ssl.create_default_context()
if skip_validation:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context

def main():
global debug_flag, delay
Expand All @@ -248,22 +261,24 @@ def main():

parser.add_argument('--smtp-host', metavar='SMTP_HOST', help='SMTP: Hostname of the SMTP server.', required=True)
parser.add_argument('--smtp-port', metavar='SMTP_PORT',
help='SMTP: Deliver mail via this port. STARTSSL or plaintext communication is not supported.',
help='SMTP: Deliver mail via this port. STARTTLS or plaintext communication is not supported.',
type=int, default=465)
parser.add_argument('--smtp-user', metavar='SMTP_USER', help='SMTP: User name for login.', required=True)
parser.add_argument('--smtp-pass', metavar='SMTP_PASS',
help='SMTP: Passwort for login. Alternatively, set environment variable SMTP_PASS.',
default=os.getenv("SMTP_PASS"))
parser.add_argument('--smtp-skip-cert-validation', action='store_true', help="Do not validate SMTP Server Certificate")

parser.add_argument('--imap-host', metavar='IMAP_HOST', help='IMAP: Hostname of the SMTP server.', required=True)
parser.add_argument('--imap-port', metavar='IMAP_PORT', help='IMAP: Deliver mail via this port.', type=int,
parser.add_argument('--imap-host', metavar='IMAP_HOST', help='IMAP: Hostname of the IMAP server.', required=True)
parser.add_argument('--imap-port', metavar='IMAP_PORT', help='IMAP: Fetch mail via this port.', type=int,
default=993)
parser.add_argument('--imap-user', metavar='IMAP_USER', help='IMAP: User name for login.', required=True)
parser.add_argument('--imap-pass', metavar='IMAP_PASS',
help='IMAP: Passwort for login. Alternatively, set environment variable IMAP_PASS.',
default=os.getenv("IMAP_PASS"))
parser.add_argument('--imap-spam', metavar='IMAP_SPAM', help='IMAP: Name of the spam box.')
parser.add_argument('--imap-cleanup', action='store_true', help="Delete processed mails on the IMAP account.")
parser.add_argument('--imap-skip-cert-validation', action='store_true', help="Do not validate IMAP Server Certificate")

parser.add_argument('--delay', metavar='SECONDS', help=f"Delay between sending and retrieving (default {delay} s).",
type=int, default=delay)
Expand All @@ -274,12 +289,12 @@ def main():

_uuid = str(uuid.uuid4())
email = email_create_message(args.mail_from, args.mail_to, _uuid)
smtp_server = smtp_connect(args.smtp_host, args.smtp_port, args.smtp_user, args.smtp_pass)
smtp_server = smtp_connect(args.smtp_host, args.smtp_port, args.smtp_user, args.smtp_pass, args.smtp_skip_cert_validation)
smtp_server.sendmail(args.mail_from, args.mail_to, email.as_string())
debug(f"SMTP: Sent e-mail with ID {_uuid} to {args.mail_to}.")

status = imap_retrieve_mail(args.imap_host, args.imap_port, args.imap_user, args.imap_pass, args.imap_spam, _uuid,
args.imap_cleanup)
args.imap_cleanup, args.imap_skip_cert_validation)

if status == MailFound.FOUND:
print("OK")
Expand Down