Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 218 additions & 1 deletion tests/core/lib/test_ssl.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import ssl
from unittest.mock import patch
from datetime import datetime, timezone, timedelta
from unittest.mock import MagicMock, patch

import pytest

from nettacker.core.lib.ssl import (
SslEngine,
SslLibrary,
create_socket_connection,
create_tcp_socket,
get_cert_info,
is_weak_cipher_suite,
is_weak_hash_algo,
is_weak_ssl_version,
Expand Down Expand Up @@ -463,6 +466,24 @@ def test_is_weak_cipher_suite_ssl_error(self, mock_context, mock_socket, connect
("sha1", True),
("test_algo", False),
("sha256", False),
# Case insensitivity
("MD5", True),
("SHA1", True),
("Md5", True),
# Real-world signature algorithm strings
("sha1WithRSAEncryption", True),
("md5WithRSAEncryption", True),
("sha256WithRSAEncryption", False),
("sha512WithRSAEncryption", False),
("ecdsa-with-SHA256", False),
# Strong algorithms
("sha384", False),
("sha512", False),
("sha3-256", False),
("ed25519", False),
# Edge cases
("rsa", False),
("", False),
],
)
def test_is_weak_hash_algo(self, algo, expected):
Expand Down Expand Up @@ -519,6 +540,19 @@ def test_is_weak_ssl_version_exceptions(
socket_instance, server_hostname=connection_params["HOST"]
)

@patch("socket.socket")
@patch("ssl.SSLContext")
def test_is_weak_ssl_version_timeout(self, mock_context, mock_socket, connection_params):
"""Test that socket timeout is handled gracefully."""
socket_instance = mock_socket.return_value
socket_instance.connect.side_effect = socket.timeout

result = is_weak_ssl_version(
connection_params["HOST"], connection_params["PORT"], connection_params["TIMEOUT"]
)

assert result == ([], True)

def test_response_conditions_matched_expired_cert(self, ssl_engine, substeps, responses):
result = ssl_engine.response_conditions_matched(
substeps.ssl_certificate_expired_vuln, responses.ssl_certificate_expired
Expand Down Expand Up @@ -572,3 +606,186 @@ def test_response_conditions_matched_none_response(self, ssl_engine, substeps):
result = ssl_engine.response_conditions_matched(substeps.ssl_weak_version_vuln, None)

assert result == []

@patch("nettacker.core.lib.ssl.create_tcp_socket")
def test_ssl_certificate_scan_connection_refused(self, mock_connection, ssl_library, connection_params):
mock_connection.return_value = None

result = ssl_library.ssl_certificate_scan(
connection_params["HOST"], connection_params["PORT"], connection_params["TIMEOUT"]
)

assert result is None

@patch("socket.socket")
@patch("ssl.wrap_socket")
def test_create_tcp_socket_connection_refused(self, mock_wrap, mock_socket):
socket_instance = mock_socket.return_value
socket_instance.connect.side_effect = ConnectionRefusedError

result = create_tcp_socket("example.com", 80, 60)

assert result is None


class TestGetCertInfo:
"""Tests for get_cert_info function."""

@patch("nettacker.core.lib.ssl.crypto.load_certificate")
def test_get_cert_info_valid_cert(self, mock_load_cert):
mock_x509 = Mockx509Object(
is_expired=False,
issuer="TestCA",
subject="TestSubject",
signing_algo="sha256WithRSAEncryption",
expire_date=b"21001207153045Z",
activation_date=b"20231207153045Z",
)
mock_load_cert.return_value = mock_x509

result = get_cert_info("fake_cert_pem_data")

assert result["expired"] is False
assert result["self_signed"] is False
assert result["weak_signing_algo"] is False
assert result["not_activated"] is False
assert result["expiring_soon"] is False
assert "TestCA" in result["issuer"]
assert "TestSubject" in result["subject"]

@patch("nettacker.core.lib.ssl.crypto.load_certificate")
def test_get_cert_info_weak_signature(self, mock_load_cert):
mock_x509 = Mockx509Object(
is_expired=False,
issuer="TestCA",
subject="TestSubject",
signing_algo="sha1WithRSAEncryption",
expire_date=b"21001207153045Z",
activation_date=b"20231207153045Z",
)
mock_load_cert.return_value = mock_x509

result = get_cert_info("fake_cert_pem_data")

assert result["weak_signing_algo"] is True
assert result["signing_algo"] == "sha1WithRSAEncryption"

@patch("nettacker.core.lib.ssl.crypto.load_certificate")
def test_get_cert_info_self_signed(self, mock_load_cert):
mock_x509 = Mockx509Object(
is_expired=False,
issuer="SameEntity",
subject="SameEntity",
signing_algo="sha256WithRSAEncryption",
expire_date=b"21001207153045Z",
activation_date=b"20231207153045Z",
)
mock_load_cert.return_value = mock_x509

result = get_cert_info("fake_cert_pem_data")

assert result["self_signed"] is True
assert result["issuer"] == result["subject"]

@patch("nettacker.core.lib.ssl.crypto.load_certificate")
def test_get_cert_info_expired(self, mock_load_cert):
mock_x509 = Mockx509Object(
is_expired=True,
issuer="TestCA",
subject="TestSubject",
signing_algo="sha256WithRSAEncryption",
expire_date=b"20201207153045Z",
activation_date=b"20151207153045Z",
)
mock_load_cert.return_value = mock_x509

result = get_cert_info("fake_cert_pem_data")

assert result["expired"] is True

@patch("nettacker.core.lib.ssl.crypto.load_certificate")
def test_get_cert_info_expiring_soon(self, mock_load_cert):
soon = datetime.now(timezone.utc) + timedelta(days=15)
expire_str = soon.strftime("%Y%m%d%H%M%SZ").encode()
past = datetime.now(timezone.utc) - timedelta(days=365)
activation_str = past.strftime("%Y%m%d%H%M%SZ").encode()

mock_x509 = Mockx509Object(
is_expired=False,
issuer="TestCA",
subject="TestSubject",
signing_algo="sha256WithRSAEncryption",
expire_date=expire_str,
activation_date=activation_str,
)
mock_load_cert.return_value = mock_x509

result = get_cert_info("fake_cert_pem_data")

assert result["expiring_soon"] is True
assert result["expired"] is False

@patch("nettacker.core.lib.ssl.crypto.load_certificate")
def test_get_cert_info_not_yet_activated(self, mock_load_cert):
future = datetime.now(timezone.utc) + timedelta(days=180)
activation_str = future.strftime("%Y%m%d%H%M%SZ").encode()
far_future = datetime.now(timezone.utc) + timedelta(days=365)
expire_str = far_future.strftime("%Y%m%d%H%M%SZ").encode()

mock_x509 = Mockx509Object(
is_expired=False,
issuer="TestCA",
subject="TestSubject",
signing_algo="sha256WithRSAEncryption",
expire_date=expire_str,
activation_date=activation_str,
)
mock_load_cert.return_value = mock_x509

result = get_cert_info("fake_cert_pem_data")

assert result["not_activated"] is True

@patch("nettacker.core.lib.ssl.crypto.load_certificate")
def test_get_cert_info_md5_signature(self, mock_load_cert):
mock_x509 = Mockx509Object(
is_expired=False,
issuer="TestCA",
subject="TestSubject",
signing_algo="md5WithRSAEncryption",
expire_date=b"21001207153045Z",
activation_date=b"20231207153045Z",
)
mock_load_cert.return_value = mock_x509

result = get_cert_info("fake_cert_pem_data")

assert result["weak_signing_algo"] is True
assert result["signing_algo"] == "md5WithRSAEncryption"


class TestCreateSocketConnection:
"""Tests for create_socket_connection function."""

@patch("nettacker.core.lib.ssl.ssl.SSLContext")
def test_create_socket_connection_success(self, mock_context_class):
mock_context = mock_context_class.return_value
mock_wrapped_socket = MagicMock()
mock_context.wrap_socket.return_value = mock_wrapped_socket

result = create_socket_connection(mock_context, "example.com", 443, 10)

assert result == mock_wrapped_socket
mock_context.wrap_socket.assert_called_once()

@patch("socket.socket")
@patch("nettacker.core.lib.ssl.ssl.SSLContext")
def test_create_socket_connection_sets_timeout(self, mock_context_class, mock_socket):
mock_socket_instance = mock_socket.return_value
mock_context = mock_context_class.return_value
mock_context.wrap_socket.return_value = MagicMock()

create_socket_connection(mock_context, "example.com", 443, 15)

mock_socket_instance.settimeout.assert_called_with(15)
mock_socket_instance.connect.assert_called_with(("example.com", 443))