diff --git a/acme_srv/helper.py b/acme_srv/helper.py index 431d436c1..0246d3aed 100644 --- a/acme_srv/helper.py +++ b/acme_srv/helper.py @@ -42,6 +42,7 @@ certid_asn1_get, certid_hex_get, certid_check, + pkcs7_to_pem, ) # CSR operations @@ -125,6 +126,7 @@ config_allowed_domainlist_load, config_async_mode_load, config_proxy_load, + config_dryrun_load, load_config, header_info_jsonify, header_info_lookup, diff --git a/acme_srv/helpers/certificates.py b/acme_srv/helpers/certificates.py index 2e2656432..658e7ef26 100644 --- a/acme_srv/helpers/certificates.py +++ b/acme_srv/helpers/certificates.py @@ -6,6 +6,10 @@ from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives.serialization.pkcs7 import ( + load_pem_pkcs7_certificates, + load_der_pkcs7_certificates, +) from cryptography.x509 import load_pem_x509_certificate, ocsp from OpenSSL import crypto from .encoding import ( @@ -394,3 +398,57 @@ def certid_check( logger.debug("Helper.certid_check() ended with: %s", result) return result + + +def pkcs7_to_pem(logger, pkcs7_content: str, outform: str = "string") -> List[str]: + """convert pkcs7 to pem""" + logger.debug("CAhandler._pkcs7_to_pem()") + + # Define loading strategies in order of preference + loading_strategies = [ + # Strategy 1: Load as PEM directly + lambda content: load_pem_pkcs7_certificates(convert_string_to_byte(content)), + # Strategy 2: Replace CERTIFICATE with PKCS7 tag and load as PEM + lambda content: load_pem_pkcs7_certificates( + convert_string_to_byte(content.replace("CERTIFICATE", "PKCS7")) + ), + # Strategy 3: Load as DER + lambda content: load_der_pkcs7_certificates(content), + ] + + pkcs7_obj = None + last_error = None + + for i, strategy in enumerate(loading_strategies): + try: + pkcs7_obj = strategy(pkcs7_content) + if i == 1: # Log only for the tag replacement strategy + logger.error("PKCS7-TAG not found, updated content successfully") + break + except Exception as err: + last_error = err + if i == 0: + logger.error("PKCS7-TAG not found updating content...") + elif i == 1: + logger.debug("CAhandler._pkcs7_to_pem(): load pem failed. Try der...") + + if pkcs7_obj is None: + logger.error("All PKCS7 loading strategies failed. Last error: %s", last_error) + raise last_error + + # Convert certificates to PEM format + cert_pem_list = [ + convert_byte_to_string(cert.public_bytes(serialization.Encoding.PEM)) + for cert in pkcs7_obj + ] + + # Define output format + output_formats = { + "string": lambda certs: "".join(certs), + "list": lambda certs: certs, + } + + result = output_formats.get(outform, lambda _: None)(cert_pem_list) + + logger.debug("Certificate._pkcs7_to_pem() ended") + return result diff --git a/acme_srv/helpers/config.py b/acme_srv/helpers/config.py index 4deec4c67..649f9aeea 100644 --- a/acme_srv/helpers/config.py +++ b/acme_srv/helpers/config.py @@ -351,3 +351,27 @@ def client_parameter_validate( error, ) return value_to_set, error + + +def config_dryrun_load(logger: logging.Logger, config_dic: Dict[str, str]): + """load dryrun configuration""" + logger.debug("Helper.config_dryrun_load()") + + dryrun = False + dryrun_profilename = None + + if "DEFAULT" in config_dic and "dryrun" in config_dic["DEFAULT"]: + if config_dic["DEFAULT"]["dryrun"].lower() in ["true", "false"]: + dryrun = config_dic.getboolean("DEFAULT", "dryrun", fallback=False) + elif config_dic["DEFAULT"]["dryrun"].lower() == "profile": + if config_dic.get("DEFAULT", "dryrun_profile", fallback=None): + dryrun_profilename = config_dic["DEFAULT"]["dryrun_profile"] + else: + logger.warning( + "Dryrun profile name not set in configuration, please set dryrun_profile parameter" + ) + + logger.debug( + "Helper.config_dryrun_load() ended with: %s/%s", dryrun, dryrun_profilename + ) + return dryrun, dryrun_profilename diff --git a/examples/acme2certifier_wsgi.py b/examples/acme2certifier_wsgi.py index 93b62d32e..47a9e1528 100644 --- a/examples/acme2certifier_wsgi.py +++ b/examples/acme2certifier_wsgi.py @@ -552,6 +552,26 @@ def redirect(environ, start_response): ] +# Helper to extract path with prefix +def get_path_with_prefix(environ, config): + path = environ.get("PATH_INFO") or "" + # Collapse multiple leading slashes to one + while path.startswith("//"): + path = path[1:] + prefix = "" + if "Directory" in config and "url_prefix" in config["Directory"]: + prefix = str(config["Directory"]["url_prefix"]).strip("/") + if prefix: + path_ = path.lstrip("/") + if path_ == prefix: + return "" + if path_.startswith(prefix + "/"): + # Remove the prefix and any leading slashes after + return path_[len(prefix) :].lstrip("/") + return path_ + return path.lstrip("/") + + def application(environ, start_response): """The main WSGI application if nothing matches call the not_found function.""" @@ -559,10 +579,7 @@ def application(environ, start_response): if "CAhandler" in CONFIG and "acme_url" in CONFIG["CAhandler"]: URLS.append((r"^.well-known/acme-challenge/", acmechallenge_serve)) - prefix = "/" - if "Directory" in CONFIG and "url_prefix" in CONFIG["Directory"]: - prefix = CONFIG["Directory"]["url_prefix"] + "/" - path = environ.get("PATH_INFO", "").lstrip(prefix) + path = get_path_with_prefix(environ, CONFIG) for regex, callback in URLS: match = re.search(regex, path) diff --git a/test/test_helper.py b/test/test_helper.py index ec977d6fd..b09df4e0f 100644 --- a/test/test_helper.py +++ b/test/test_helper.py @@ -1,7 +1,7 @@ -#!/usr/bin/python # -*- coding: utf-8 -*- """unittests for account.py""" # pylint: disable=C0302, C0415, R0904, R0913, R0914, R0915, W0212 +import os import unittest import configparser import sys @@ -148,6 +148,8 @@ def setUp(self): profile_lookup, eab_profile_revocation_check, handler_config_check, + pkcs7_to_pem, + config_dryrun_load, ) self.logger = logging.getLogger("test_a2c") @@ -263,6 +265,9 @@ def setUp(self): self.b64_url_decode = b64_url_decode self.eab_profile_revocation_check = eab_profile_revocation_check self.handler_config_check = handler_config_check + self.pkcs7_to_pem = pkcs7_to_pem + self.dir_path = os.path.dirname(os.path.realpath(__file__)) + self.config_dryrun_load = config_dryrun_load def test_001_helper_b64decode_pad(self): """test b64decode_pad() method with a regular base64 encoded string""" @@ -6257,14 +6262,12 @@ def test_512_allowed_gai_family(self): def test_513_config_allowed_domainlist_load_deprecated_section(self): """Test config_allowed_domainlist_load loads from deprecated CAhandler section and logs warning.""" - import logging from acme_srv.helpers import config # Simulate config_dic as a dict, as expected by the function cfg = {"CAhandler": {"allowed_domainlist": "example.com,example.org"}} - logger = logging.getLogger("test_a2c") - with self.assertLogs(logger, level="WARNING") as log_context: - result = config.config_allowed_domainlist_load(logger, cfg) + with self.assertLogs(self.logger, level="WARNING") as log_context: + result = config.config_allowed_domainlist_load(self.logger, cfg) from acme_srv.helpers.global_variables import PARSING_ERR_MSG self.assertEqual(result, PARSING_ERR_MSG) @@ -6272,16 +6275,14 @@ def test_513_config_allowed_domainlist_load_deprecated_section(self): def test_514_config_allowed_domainlist_load_invalid_json(self): """Test config_allowed_domainlist_load handles invalid JSON and logs warning.""" - import logging from acme_srv.helpers import config - logger = logging.getLogger("test_a2c") # Simulate a config dict with invalid JSON in Order section cfg = {"Order": {"allowed_domainlist": "not-a-json-list"}} - with self.assertLogs(logger, level="WARNING") as log_context: + with self.assertLogs(self.logger, level="WARNING") as log_context: from acme_srv.helpers.global_variables import PARSING_ERR_MSG - result = config.config_allowed_domainlist_load(logger, cfg) + result = config.config_allowed_domainlist_load(self.logger, cfg) self.assertEqual(result, PARSING_ERR_MSG) self.assertTrue( any( @@ -6290,6 +6291,151 @@ def test_514_config_allowed_domainlist_load_invalid_json(self): ) ) + def test_515_pkcs7_to_pem(self): + """test pkcs7 to pem default output""" + with open(self.dir_path + "/ca/certs.p7b", "r") as fso: + file_content = fso.read() + with open(self.dir_path + "/ca/certs.pem", "r") as fso: + result = fso.read() + self.assertEqual(result, self.pkcs7_to_pem(self.logger, file_content)) + + def test_516_pkcs7_to_pem(self): + """test pkcs7 to pem output string""" + with open(self.dir_path + "/ca/certs.p7b", "r") as fso: + file_content = fso.read() + with open(self.dir_path + "/ca/certs.pem", "r") as fso: + result = fso.read() + self.assertEqual(result, self.pkcs7_to_pem(self.logger, file_content, "string")) + + def test_517_pkcs7_to_pem(self): + """test pkcs7 to pem output list""" + with open(self.dir_path + "/ca/certs.p7b", "r") as fso: + file_content = fso.read() + result = [ + "-----BEGIN CERTIFICATE-----\nMIIFTzCCAzegAwIBAgIIAzHyhSyrXfMwDQYJKoZIhvcNAQELBQAwKzEXMBUGA1UE\nCxMOYWNtZTJjZXJ0aWZpZXIxEDAOBgNVBAMTB3Jvb3QtY2EwHhcNMjAwNTI3MTM1\nNDAwWhcNMzAwNTI2MjM1OTAwWjAqMRcwFQYDVQQLEw5hY21lMmNlcnRpZmllcjEP\nMA0GA1UEAxMGc3ViLWNhMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA\nxXHaGZsolXe+PBdUryngHP9VbBC1mehqeTtYI+hqsqGNH7q9a7bSrxMwFuF1kYL8\njqqxkJdtl0L94xcxJg/ZdMx7Nt0vGI+BaAuTpEpUEHeN4tqS6NhB/m/0LGkAELc/\nqkzmoO4B1FDwEEj/3IXtZcupqG80oDt7jWSGXdtF7NTjzcumznMeRXidCdhxRxT/\n/WrsChaytXo0xWZ56oeNwd6x6Dr8/39PBOWtj4fldyDcg+Q+alci2tx9pxmu2bCV\nXcB9ftCLKhDk2WEHE88bgKSp7fV2RCmq9po+Tx8JJ7qecLunUsK/F0XN4kpoQLm9\nhcymqchnMSncSiyin1dQHGHWgXDtBDdq6A2Z6rx26Qk5H9HTYvcNSe1YwFEDoGLB\nZQjbCPWiaqoaH4agBQTclPvrrSCRaVmhUSO+pBtSXDkmN4t3MDZxfgRkp8ixwkB1\n5Y5f0LTpCyAJsdQDw8+Ea0aDqO30eskh4CErnm9+Fejd9Ew2cwpdwfBXzVSbYilM\nGueQihZHvJmVRxAwU69aO2Qs8B0tQ60CfWKVlmWPiakrvYYlPp0FBsM61G6LZEN8\nhH2CKnS8hHv5IWEXZvp0Pk8V3P5h6bWN0Tl+x/V1Prt7Wp8NoiPETE8XyDDxe6dm\nKxztWBH/mTsJyMGb6ZiUoXdPU9TFUKqHxTRLHaxfsPsCAwEAAaN4MHYwEgYDVR0T\nAQH/BAgwBgEB/wIBATAdBgNVHQ4EFgQUv96OjgYiIqutQ8jd1E+oq0hBPtUwDgYD\nVR0PAQH/BAQDAgGGMBEGCWCGSAGG+EIBAQQEAwIABzAeBglghkgBhvhCAQ0EERYP\neGNhIGNlcnRpZmljYXRlMA0GCSqGSIb3DQEBCwUAA4ICAQBbHLEVyg4f9uEujroc\n31UVyDRLMdPgEPLjOenSBCBmH0N81whDmxNI/7JAAB6J14WMX8OLF0HkZnb7G77W\nvDhy1aFvQFbXHBz3/zUO9Mw9J4L2XEW6ond3Nsh1m2oXeBde3R3ANxuIzHqZDlP9\n6YrRcHjnf4+1/5AKDJAvJD+gFb5YnYUKH2iSvHUvG17xcZx98Rf2eo8LealG4JqH\nJh4sKRy0VjDQD7jXSCbweTHEb8wz+6OfNGrIo+BhTFP5vPcwE4nlJwYBoaOJ5cVa\n7gdQJ7WkLSxvwHxuxzvSVK73u3jl3I9SqTrbMLG/jeJyV0P8EvdljOaGnCtQVRwC\nzM4ptXUvKhKOHy7/nyTF/Bc35ZwwL/2xWvNK1+NibgE/6CFxupwWpdmxQbVVuoQ3\n2tUil9ty0yC6m5GKE8+t1lrZuxyA+b/TBnYNO5xo8UEMbkpxaNYSwmw+f/loxXP/\nM7sIBcLvy2ugHEBxwd9o/kLXeXT2DaRvxPjp4yk8MpJRpNmz3aB5HJwaUnaRLVo5\nZ3XWWXmjMGZ6/m0AAoDbDz/pXtOoJZT8BJdD1DuDdszVsQnLVn4B/LtIXL6FbXsF\nzfv6ERP9a5gpKUZ+4NjgrnlGtdccNZpwyWF0IXcvaq3b8hXIRO4hMjzHeHfzJN4t\njX1vlY35Ofonc4+6dRVamBiF9A==\n-----END CERTIFICATE-----\n", + "-----BEGIN CERTIFICATE-----\nMIIFcDCCA1igAwIBAgIIevLTTxOMoZgwDQYJKoZIhvcNAQELBQAwKzEXMBUGA1UE\nCxMOYWNtZTJjZXJ0aWZpZXIxEDAOBgNVBAMTB3Jvb3QtY2EwHhcNMjAwNTI3MDAw\nMDAwWhcNMzAwNTI2MjM1OTU5WjArMRcwFQYDVQQLEw5hY21lMmNlcnRpZmllcjEQ\nMA4GA1UEAxMHcm9vdC1jYTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIB\nAJy4UZHdZgYt64k/rFamoC676tYvtabeuiqVw1c6oVZI897cFLG6BYwyr2Eaj7tF\nrqTJDeMN4vZSudLsmLDq6m8KwX/riPzUTIlcjM5aIMANZr9rLEs3NWtcivolB5aQ\n1slhdVitUPLuxsFnYeQTyxFyP7lng9M/Z403KLG8phdmKjM0vJkaj4OuKOXf3UsW\nqWQYyRl/ms07xVj02uq08LkoeO+jtQisvyVXURdaCceZtyK/ZBQ7NFCsbK112cVR\n1e2aJol7NJAA6Wm6iBzAdkAA2l3kh40SLoEbaiaVMixLN2vilIZOOAoDXX4+T6ir\n+KnDVSJ2yu5c/OJMwuXwHrh7Lgg1vsFR5TNehknhjUuWOUO+0TkKPg2A7KTg72OZ\n2mOcLZIbxzr1P5RRvdmLQLPrTF2EJvpQPNmbXqN3ZVWEvfHTjkkTFY/dsOTvFTgS\nri15zYKch8votcU7z+BQhgmMtwO2JhPMmZ6ABd9skI7ijWpwOltAhxtdoBO6T6CB\nCrE2yXc6V/PyyAKcFglNmIght5oXsnE+ub/dtx8f9Iea/xNPdo5aGy8fdaitolDK\n16kd3Kb7OE4HMHIwOxxF1BEAqerxxhbLMRBr8hRSZI5cvLzWLvpAQ5zuhjD6V3b9\nBYFd4ujAu3zl3mbzdbYjFoGOX6aBZaGDxlc4O2W7HxntAgMBAAGjgZcwgZQwDwYD\nVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUDGVvuTFYZtEAkz3af9wRKDDvAswwHwYD\nVR0jBBgwFoAUDGVvuTFYZtEAkz3af9wRKDDvAswwDgYDVR0PAQH/BAQDAgGGMBEG\nCWCGSAGG+EIBAQQEAwIABzAeBglghkgBhvhCAQ0EERYPeGNhIGNlcnRpZmljYXRl\nMA0GCSqGSIb3DQEBCwUAA4ICAQAjko7dX+iCgT+m3Iy1Vg6j7MRevPAzq1lqHRRN\nNdt2ct530pIut7Fv5V2xYk35ka+i/G+XyOvTXa9vAUKiBtiRnUPsXu4UcS7CcrCX\nEzHx4eOtHnp5wDhO0Fx5/OUZTaP+L7Pd1GD/j953ibx5bMa/M9Rj+S486nst57tu\nDRmEAavFDiMd6L3jH4YSckjmIH2uSeDIaRa9k6ag077XmWhvVYQ9tuR7RGbSuuV3\nFc6pqcFbbWpoLhNRcFc+hbUKOsKl2cP+QEKP/H2s3WMllqgAKKZeO+1KOsGo1CDs\n475bIXyCBpFbH2HOPatmu3yZRQ9fj9ta9EW46n33DFRNLinFWa4WJs4yLVP1juge\n2TCOyA1t61iy++RRXSG3e7NFYrEZuCht1EdDAdzIUY89m9NCPwoDYS4CahgnfkkO\n7YQe6f6yqK6isyf8ZFcp1uF58eERDiF/FDqS8nLmCdURuI56DDoNvDpig5J/9RNW\nG8vEvt2p7QrjeZ3EAatx5JuYty/NKTHZwJWk51CgzEgzDwzE2JIiqeldtL5d0Sl6\neVuv0G04BEyuXxEWpgVVzBS4qEFIBSnTJzgu1PXmId3yLvg2Nr8NKvwyZmN5xKFp\n0A9BWo15zW1PXDaD+l39oTYD7agjXkzTAjYIcfNJ7ATIYFD0xAvNAOf70s7aNupF\nfvkG2Q==\n-----END CERTIFICATE-----\n", + ] + self.assertEqual(result, self.pkcs7_to_pem(self.logger, file_content, "list")) + + def test_518_pkcs7_to_pem(self): + """test pkcs7 to pem output list""" + with open(self.dir_path + "/ca/certs.p7b", "r") as fso: + file_content = fso.read() + result = None + self.assertEqual( + result, self.pkcs7_to_pem(self.logger, file_content, "unknown") + ) + + def test_519_pkcs7_to_pem(self): + """test pkcs7 to pem output list""" + + file_content = base64.b64decode( + "MIIK9AYJKoZIhvcNAQcCoIIK5TCCCuECAQExADALBgkqhkiG9w0BBwGgggrHMIIFTzCCAzegAwIBAgIIAzHyhSyrXfMwDQYJKoZIhvcNAQELBQAwKzEXMBUGA1UECxMOYWNtZTJjZXJ0aWZpZXIxEDAOBgNVBAMTB3Jvb3QtY2EwHhcNMjAwNTI3MTM1NDAwWhcNMzAwNTI2MjM1OTAwWjAqMRcwFQYDVQQLEw5hY21lMmNlcnRpZmllcjEPMA0GA1UEAxMGc3ViLWNhMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAxXHaGZsolXe+PBdUryngHP9VbBC1mehqeTtYI+hqsqGNH7q9a7bSrxMwFuF1kYL8jqqxkJdtl0L94xcxJg/ZdMx7Nt0vGI+BaAuTpEpUEHeN4tqS6NhB/m/0LGkAELc/qkzmoO4B1FDwEEj/3IXtZcupqG80oDt7jWSGXdtF7NTjzcumznMeRXidCdhxRxT//WrsChaytXo0xWZ56oeNwd6x6Dr8/39PBOWtj4fldyDcg+Q+alci2tx9pxmu2bCVXcB9ftCLKhDk2WEHE88bgKSp7fV2RCmq9po+Tx8JJ7qecLunUsK/F0XN4kpoQLm9hcymqchnMSncSiyin1dQHGHWgXDtBDdq6A2Z6rx26Qk5H9HTYvcNSe1YwFEDoGLBZQjbCPWiaqoaH4agBQTclPvrrSCRaVmhUSO+pBtSXDkmN4t3MDZxfgRkp8ixwkB15Y5f0LTpCyAJsdQDw8+Ea0aDqO30eskh4CErnm9+Fejd9Ew2cwpdwfBXzVSbYilMGueQihZHvJmVRxAwU69aO2Qs8B0tQ60CfWKVlmWPiakrvYYlPp0FBsM61G6LZEN8hH2CKnS8hHv5IWEXZvp0Pk8V3P5h6bWN0Tl+x/V1Prt7Wp8NoiPETE8XyDDxe6dmKxztWBH/mTsJyMGb6ZiUoXdPU9TFUKqHxTRLHaxfsPsCAwEAAaN4MHYwEgYDVR0TAQH/BAgwBgEB/wIBATAdBgNVHQ4EFgQUv96OjgYiIqutQ8jd1E+oq0hBPtUwDgYDVR0PAQH/BAQDAgGGMBEGCWCGSAGG+EIBAQQEAwIABzAeBglghkgBhvhCAQ0EERYPeGNhIGNlcnRpZmljYXRlMA0GCSqGSIb3DQEBCwUAA4ICAQBbHLEVyg4f9uEujroc31UVyDRLMdPgEPLjOenSBCBmH0N81whDmxNI/7JAAB6J14WMX8OLF0HkZnb7G77WvDhy1aFvQFbXHBz3/zUO9Mw9J4L2XEW6ond3Nsh1m2oXeBde3R3ANxuIzHqZDlP96YrRcHjnf4+1/5AKDJAvJD+gFb5YnYUKH2iSvHUvG17xcZx98Rf2eo8LealG4JqHJh4sKRy0VjDQD7jXSCbweTHEb8wz+6OfNGrIo+BhTFP5vPcwE4nlJwYBoaOJ5cVa7gdQJ7WkLSxvwHxuxzvSVK73u3jl3I9SqTrbMLG/jeJyV0P8EvdljOaGnCtQVRwCzM4ptXUvKhKOHy7/nyTF/Bc35ZwwL/2xWvNK1+NibgE/6CFxupwWpdmxQbVVuoQ32tUil9ty0yC6m5GKE8+t1lrZuxyA+b/TBnYNO5xo8UEMbkpxaNYSwmw+f/loxXP/M7sIBcLvy2ugHEBxwd9o/kLXeXT2DaRvxPjp4yk8MpJRpNmz3aB5HJwaUnaRLVo5Z3XWWXmjMGZ6/m0AAoDbDz/pXtOoJZT8BJdD1DuDdszVsQnLVn4B/LtIXL6FbXsFzfv6ERP9a5gpKUZ+4NjgrnlGtdccNZpwyWF0IXcvaq3b8hXIRO4hMjzHeHfzJN4tjX1vlY35Ofonc4+6dRVamBiF9DCCBXAwggNYoAMCAQICCHry008TjKGYMA0GCSqGSIb3DQEBCwUAMCsxFzAVBgNVBAsTDmFjbWUyY2VydGlmaWVyMRAwDgYDVQQDEwdyb290LWNhMB4XDTIwMDUyNzAwMDAwMFoXDTMwMDUyNjIzNTk1OVowKzEXMBUGA1UECxMOYWNtZTJjZXJ0aWZpZXIxEDAOBgNVBAMTB3Jvb3QtY2EwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCcuFGR3WYGLeuJP6xWpqAuu+rWL7Wm3roqlcNXOqFWSPPe3BSxugWMMq9hGo+7Ra6kyQ3jDeL2UrnS7Jiw6upvCsF/64j81EyJXIzOWiDADWa/ayxLNzVrXIr6JQeWkNbJYXVYrVDy7sbBZ2HkE8sRcj+5Z4PTP2eNNyixvKYXZiozNLyZGo+Drijl391LFqlkGMkZf5rNO8VY9NrqtPC5KHjvo7UIrL8lV1EXWgnHmbciv2QUOzRQrGytddnFUdXtmiaJezSQAOlpuogcwHZAANpd5IeNEi6BG2omlTIsSzdr4pSGTjgKA11+Pk+oq/ipw1UidsruXPziTMLl8B64ey4INb7BUeUzXoZJ4Y1LljlDvtE5Cj4NgOyk4O9jmdpjnC2SG8c69T+UUb3Zi0Cz60xdhCb6UDzZm16jd2VVhL3x045JExWP3bDk7xU4Eq4tec2CnIfL6LXFO8/gUIYJjLcDtiYTzJmegAXfbJCO4o1qcDpbQIcbXaATuk+ggQqxNsl3Olfz8sgCnBYJTZiIIbeaF7JxPrm/3bcfH/SHmv8TT3aOWhsvH3WoraJQytepHdym+zhOBzByMDscRdQRAKnq8cYWyzEQa/IUUmSOXLy81i76QEOc7oYw+ld2/QWBXeLowLt85d5m83W2IxaBjl+mgWWhg8ZXODtlux8Z7QIDAQABo4GXMIGUMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYEFAxlb7kxWGbRAJM92n/cESgw7wLMMB8GA1UdIwQYMBaAFAxlb7kxWGbRAJM92n/cESgw7wLMMA4GA1UdDwEB/wQEAwIBhjARBglghkgBhvhCAQEEBAMCAAcwHgYJYIZIAYb4QgENBBEWD3hjYSBjZXJ0aWZpY2F0ZTANBgkqhkiG9w0BAQsFAAOCAgEAI5KO3V/ogoE/ptyMtVYOo+zEXrzwM6tZah0UTTXbdnLed9KSLrexb+VdsWJN+ZGvovxvl8jr012vbwFCogbYkZ1D7F7uFHEuwnKwlxMx8eHjrR56ecA4TtBcefzlGU2j/i+z3dRg/4/ed4m8eWzGvzPUY/kuPOp7Lee7bg0ZhAGrxQ4jHei94x+GEnJI5iB9rkngyGkWvZOmoNO+15lob1WEPbbke0Rm0rrldxXOqanBW21qaC4TUXBXPoW1CjrCpdnD/kBCj/x9rN1jJZaoACimXjvtSjrBqNQg7OO+WyF8ggaRWx9hzj2rZrt8mUUPX4/bWvRFuOp99wxUTS4pxVmuFibOMi1T9Y7oHtkwjsgNbetYsvvkUV0ht3uzRWKxGbgobdRHQwHcyFGPPZvTQj8KA2EuAmoYJ35JDu2EHun+sqiuorMn/GRXKdbhefHhEQ4hfxQ6kvJy5gnVEbiOegw6Dbw6YoOSf/UTVhvLxL7dqe0K43mdxAGrceSbmLcvzSkx2cCVpOdQoMxIMw8MxNiSIqnpXbS+XdEpenlbr9BtOARMrl8RFqYFVcwUuKhBSAUp0yc4LtT15iHd8i74Nja/DSr8MmZjecShadAPQVqNec1tT1w2g/pd/aE2A+2oI15M0wI2CHHzSewEyGBQ9MQLzQDn+9LO2jbqRX75BtmhADEA" + ) + result = [ + "-----BEGIN CERTIFICATE-----\nMIIFTzCCAzegAwIBAgIIAzHyhSyrXfMwDQYJKoZIhvcNAQELBQAwKzEXMBUGA1UE\nCxMOYWNtZTJjZXJ0aWZpZXIxEDAOBgNVBAMTB3Jvb3QtY2EwHhcNMjAwNTI3MTM1\nNDAwWhcNMzAwNTI2MjM1OTAwWjAqMRcwFQYDVQQLEw5hY21lMmNlcnRpZmllcjEP\nMA0GA1UEAxMGc3ViLWNhMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA\nxXHaGZsolXe+PBdUryngHP9VbBC1mehqeTtYI+hqsqGNH7q9a7bSrxMwFuF1kYL8\njqqxkJdtl0L94xcxJg/ZdMx7Nt0vGI+BaAuTpEpUEHeN4tqS6NhB/m/0LGkAELc/\nqkzmoO4B1FDwEEj/3IXtZcupqG80oDt7jWSGXdtF7NTjzcumznMeRXidCdhxRxT/\n/WrsChaytXo0xWZ56oeNwd6x6Dr8/39PBOWtj4fldyDcg+Q+alci2tx9pxmu2bCV\nXcB9ftCLKhDk2WEHE88bgKSp7fV2RCmq9po+Tx8JJ7qecLunUsK/F0XN4kpoQLm9\nhcymqchnMSncSiyin1dQHGHWgXDtBDdq6A2Z6rx26Qk5H9HTYvcNSe1YwFEDoGLB\nZQjbCPWiaqoaH4agBQTclPvrrSCRaVmhUSO+pBtSXDkmN4t3MDZxfgRkp8ixwkB1\n5Y5f0LTpCyAJsdQDw8+Ea0aDqO30eskh4CErnm9+Fejd9Ew2cwpdwfBXzVSbYilM\nGueQihZHvJmVRxAwU69aO2Qs8B0tQ60CfWKVlmWPiakrvYYlPp0FBsM61G6LZEN8\nhH2CKnS8hHv5IWEXZvp0Pk8V3P5h6bWN0Tl+x/V1Prt7Wp8NoiPETE8XyDDxe6dm\nKxztWBH/mTsJyMGb6ZiUoXdPU9TFUKqHxTRLHaxfsPsCAwEAAaN4MHYwEgYDVR0T\nAQH/BAgwBgEB/wIBATAdBgNVHQ4EFgQUv96OjgYiIqutQ8jd1E+oq0hBPtUwDgYD\nVR0PAQH/BAQDAgGGMBEGCWCGSAGG+EIBAQQEAwIABzAeBglghkgBhvhCAQ0EERYP\neGNhIGNlcnRpZmljYXRlMA0GCSqGSIb3DQEBCwUAA4ICAQBbHLEVyg4f9uEujroc\n31UVyDRLMdPgEPLjOenSBCBmH0N81whDmxNI/7JAAB6J14WMX8OLF0HkZnb7G77W\nvDhy1aFvQFbXHBz3/zUO9Mw9J4L2XEW6ond3Nsh1m2oXeBde3R3ANxuIzHqZDlP9\n6YrRcHjnf4+1/5AKDJAvJD+gFb5YnYUKH2iSvHUvG17xcZx98Rf2eo8LealG4JqH\nJh4sKRy0VjDQD7jXSCbweTHEb8wz+6OfNGrIo+BhTFP5vPcwE4nlJwYBoaOJ5cVa\n7gdQJ7WkLSxvwHxuxzvSVK73u3jl3I9SqTrbMLG/jeJyV0P8EvdljOaGnCtQVRwC\nzM4ptXUvKhKOHy7/nyTF/Bc35ZwwL/2xWvNK1+NibgE/6CFxupwWpdmxQbVVuoQ3\n2tUil9ty0yC6m5GKE8+t1lrZuxyA+b/TBnYNO5xo8UEMbkpxaNYSwmw+f/loxXP/\nM7sIBcLvy2ugHEBxwd9o/kLXeXT2DaRvxPjp4yk8MpJRpNmz3aB5HJwaUnaRLVo5\nZ3XWWXmjMGZ6/m0AAoDbDz/pXtOoJZT8BJdD1DuDdszVsQnLVn4B/LtIXL6FbXsF\nzfv6ERP9a5gpKUZ+4NjgrnlGtdccNZpwyWF0IXcvaq3b8hXIRO4hMjzHeHfzJN4t\njX1vlY35Ofonc4+6dRVamBiF9A==\n-----END CERTIFICATE-----\n", + "-----BEGIN CERTIFICATE-----\nMIIFcDCCA1igAwIBAgIIevLTTxOMoZgwDQYJKoZIhvcNAQELBQAwKzEXMBUGA1UE\nCxMOYWNtZTJjZXJ0aWZpZXIxEDAOBgNVBAMTB3Jvb3QtY2EwHhcNMjAwNTI3MDAw\nMDAwWhcNMzAwNTI2MjM1OTU5WjArMRcwFQYDVQQLEw5hY21lMmNlcnRpZmllcjEQ\nMA4GA1UEAxMHcm9vdC1jYTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIB\nAJy4UZHdZgYt64k/rFamoC676tYvtabeuiqVw1c6oVZI897cFLG6BYwyr2Eaj7tF\nrqTJDeMN4vZSudLsmLDq6m8KwX/riPzUTIlcjM5aIMANZr9rLEs3NWtcivolB5aQ\n1slhdVitUPLuxsFnYeQTyxFyP7lng9M/Z403KLG8phdmKjM0vJkaj4OuKOXf3UsW\nqWQYyRl/ms07xVj02uq08LkoeO+jtQisvyVXURdaCceZtyK/ZBQ7NFCsbK112cVR\n1e2aJol7NJAA6Wm6iBzAdkAA2l3kh40SLoEbaiaVMixLN2vilIZOOAoDXX4+T6ir\n+KnDVSJ2yu5c/OJMwuXwHrh7Lgg1vsFR5TNehknhjUuWOUO+0TkKPg2A7KTg72OZ\n2mOcLZIbxzr1P5RRvdmLQLPrTF2EJvpQPNmbXqN3ZVWEvfHTjkkTFY/dsOTvFTgS\nri15zYKch8votcU7z+BQhgmMtwO2JhPMmZ6ABd9skI7ijWpwOltAhxtdoBO6T6CB\nCrE2yXc6V/PyyAKcFglNmIght5oXsnE+ub/dtx8f9Iea/xNPdo5aGy8fdaitolDK\n16kd3Kb7OE4HMHIwOxxF1BEAqerxxhbLMRBr8hRSZI5cvLzWLvpAQ5zuhjD6V3b9\nBYFd4ujAu3zl3mbzdbYjFoGOX6aBZaGDxlc4O2W7HxntAgMBAAGjgZcwgZQwDwYD\nVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUDGVvuTFYZtEAkz3af9wRKDDvAswwHwYD\nVR0jBBgwFoAUDGVvuTFYZtEAkz3af9wRKDDvAswwDgYDVR0PAQH/BAQDAgGGMBEG\nCWCGSAGG+EIBAQQEAwIABzAeBglghkgBhvhCAQ0EERYPeGNhIGNlcnRpZmljYXRl\nMA0GCSqGSIb3DQEBCwUAA4ICAQAjko7dX+iCgT+m3Iy1Vg6j7MRevPAzq1lqHRRN\nNdt2ct530pIut7Fv5V2xYk35ka+i/G+XyOvTXa9vAUKiBtiRnUPsXu4UcS7CcrCX\nEzHx4eOtHnp5wDhO0Fx5/OUZTaP+L7Pd1GD/j953ibx5bMa/M9Rj+S486nst57tu\nDRmEAavFDiMd6L3jH4YSckjmIH2uSeDIaRa9k6ag077XmWhvVYQ9tuR7RGbSuuV3\nFc6pqcFbbWpoLhNRcFc+hbUKOsKl2cP+QEKP/H2s3WMllqgAKKZeO+1KOsGo1CDs\n475bIXyCBpFbH2HOPatmu3yZRQ9fj9ta9EW46n33DFRNLinFWa4WJs4yLVP1juge\n2TCOyA1t61iy++RRXSG3e7NFYrEZuCht1EdDAdzIUY89m9NCPwoDYS4CahgnfkkO\n7YQe6f6yqK6isyf8ZFcp1uF58eERDiF/FDqS8nLmCdURuI56DDoNvDpig5J/9RNW\nG8vEvt2p7QrjeZ3EAatx5JuYty/NKTHZwJWk51CgzEgzDwzE2JIiqeldtL5d0Sl6\neVuv0G04BEyuXxEWpgVVzBS4qEFIBSnTJzgu1PXmId3yLvg2Nr8NKvwyZmN5xKFp\n0A9BWo15zW1PXDaD+l39oTYD7agjXkzTAjYIcfNJ7ATIYFD0xAvNAOf70s7aNupF\nfvkG2Q==\n-----END CERTIFICATE-----\n", + ] + self.assertEqual(result, self.pkcs7_to_pem(self.logger, file_content, "list")) + + def test_520_pkcs7_to_pem_tag_replacement_logs_error(self): + """Test pkcs7_to_pem logs error on tag replacement strategy (line 426)""" + from acme_srv.helpers.certificates import pkcs7_to_pem + + logger = Mock() + # First strategy fails, second succeeds (tag replacement) + with patch( + "acme_srv.helpers.certificates.load_pem_pkcs7_certificates", + side_effect=[Exception("fail1"), [Mock()]], + ): + with patch( + "acme_srv.helpers.certificates.convert_string_to_byte", + side_effect=lambda x: x, + ): + result = pkcs7_to_pem(logger, "dummy", outform="list") + # Should log error for tag replacement + logger.error.assert_any_call( + "PKCS7-TAG not found, updated content successfully" + ) + self.assertIsInstance(result, list) + + def test_521_pkcs7_to_pem_all_strategies_fail(self): + """Test pkcs7_to_pem logs error and raises if all strategies fail (lines 436-437)""" + from acme_srv.helpers.certificates import pkcs7_to_pem + + logger = Mock() + # All strategies fail + with patch( + "acme_srv.helpers.certificates.load_pem_pkcs7_certificates", + side_effect=Exception("fail1"), + ), patch( + "acme_srv.helpers.certificates.convert_string_to_byte", + side_effect=lambda x: x, + ), patch( + "acme_srv.helpers.certificates.load_der_pkcs7_certificates", + side_effect=Exception("fail2"), + ): + with self.assertRaises(Exception) as cm: + pkcs7_to_pem(logger, "dummy", outform="list") + logger.error.assert_any_call( + "All PKCS7 loading strategies failed. Last error: %s", cm.exception + ) + + def test_522_config_dryrun_load_not_set(self): + """Test config_dryrun_load with valid 'true' value.""" + config_dic = configparser.ConfigParser() + config_dic["DEFAULT"] = {"foo": "bar"} + self.assertEqual( + (False, None), self.config_dryrun_load(self.logger, config_dic) + ) + + def test_523_config_dryrun_load_true(self): + """Test config_dryrun_load with valid 'true' value.""" + config_dic = configparser.ConfigParser() + config_dic["DEFAULT"] = {"dryrun": "True"} + true_list = ["true", "True", "TRUE"] + for val in true_list: + config_dic["DEFAULT"]["dryrun"] = val + self.assertEqual( + (True, None), self.config_dryrun_load(self.logger, config_dic) + ) + + def test_524_config_dryrun_load_false(self): + """Test config_dryrun_load with valid 'false' value.""" + config_dic = configparser.ConfigParser() + config_dic["DEFAULT"] = {"dryrun": "False"} + false_list = ["false", "False", "FALSE"] + for val in false_list: + config_dic["DEFAULT"]["dryrun"] = val + self.assertEqual( + (False, None), self.config_dryrun_load(self.logger, config_dic) + ) + + def test_525_config_dryrun_load_profile(self): + """Test config_dryrun_load with invalid value.""" + config_dic = configparser.ConfigParser() + profile_list = ["profile", "Profile", "PROFILE"] + for val in profile_list: + config_dic["DEFAULT"]["dryrun"] = val + config_dic["DEFAULT"]["dryrun_profile"] = "custom_dryrun_profile" + self.assertEqual( + (False, "custom_dryrun_profile"), + self.config_dryrun_load(self.logger, config_dic), + ) + + def test_526_config_dryrun_load_profile_no_profilename(self): + """Test config_dryrun_load with invalid value and no dryrun_profile set.""" + config_dic = configparser.ConfigParser() + config_dic["DEFAULT"]["dryrun"] = "profile" + with self.assertLogs(self.logger, level="WARNING") as lcm: + self.assertEqual( + (False, None), self.config_dryrun_load(self.logger, config_dic) + ) + self.assertIn( + "WARNING:test_a2c:Dryrun profile name not set in configuration, please set dryrun_profile parameter", + lcm.output[0], + ) + if __name__ == "__main__": unittest.main() diff --git a/test/test_wsgi_acme2certifier.py b/test/test_wsgi_acme2certifier.py index 38f07fb7b..b19d409df 100644 --- a/test/test_wsgi_acme2certifier.py +++ b/test/test_wsgi_acme2certifier.py @@ -1144,6 +1144,204 @@ def test_067_renewalinfot( self.assertTrue(mock_header.called) self.assertTrue(mock_body.called) + def test_068_get_path_with_prefix(self): + from examples.acme2certifier_wsgi import get_path_with_prefix + + # No Directory/url_prefix in config + environ = {"PATH_INFO": "/foo/bar"} + config = {} + self.assertEqual(get_path_with_prefix(environ, config), "foo/bar") + + # Directory/url_prefix present, prefix matches + environ = {"PATH_INFO": "/api/v1/resource"} + config = {"Directory": {"url_prefix": "api/v1"}} + self.assertEqual(get_path_with_prefix(environ, config), "resource") + + # Directory/url_prefix present, prefix does not match + environ = {"PATH_INFO": "/other/resource"} + config = {"Directory": {"url_prefix": "api/v1"}} + self.assertEqual(get_path_with_prefix(environ, config), "other/resource") + + # Directory/url_prefix is empty string + environ = {"PATH_INFO": "/foo/bar"} + config = {"Directory": {"url_prefix": ""}} + self.assertEqual(get_path_with_prefix(environ, config), "foo/bar") + + # PATH_INFO is empty + environ = {"PATH_INFO": ""} + config = {"Directory": {"url_prefix": "api/v1"}} + self.assertEqual(get_path_with_prefix(environ, config), "") + + # PATH_INFO does not start with slash + environ = {"PATH_INFO": "foo/bar"} + config = {"Directory": {"url_prefix": "foo"}} + self.assertEqual(get_path_with_prefix(environ, config), "bar") + + # PATH_INFO is just the prefix + environ = {"PATH_INFO": "/api/v1/"} + config = {"Directory": {"url_prefix": "api/v1"}} + self.assertEqual(get_path_with_prefix(environ, config), "") + + # PATH_INFO is None + environ = {} + config = {"Directory": {"url_prefix": "api/v1"}} + self.assertEqual(get_path_with_prefix(environ, config), "") + + # Directory key missing + environ = {"PATH_INFO": "/foo/bar"} + config = {} + self.assertEqual(get_path_with_prefix(environ, config), "foo/bar") + + # url_prefix key missing + environ = {"PATH_INFO": "/foo/bar"} + config = {"Directory": {}} + self.assertEqual(get_path_with_prefix(environ, config), "foo/bar") + + # url_prefix with trailing slash + environ = {"PATH_INFO": "/api/v1/resource"} + config = {"Directory": {"url_prefix": "api/v1/"}} + self.assertEqual(get_path_with_prefix(environ, config), "resource") + + # PATH_INFO with multiple leading slashes + environ = {"PATH_INFO": "///api/v1/resource"} + config = {"Directory": {"url_prefix": "api/v1"}} + self.assertEqual(get_path_with_prefix(environ, config), "resource") + + # PATH_INFO with only slash + environ = {"PATH_INFO": "/"} + config = {"Directory": {"url_prefix": "api/v1"}} + self.assertEqual(get_path_with_prefix(environ, config), "") + + # PATH_INFO to check fix for #313 + environ = {"PATH_INFO": "/directory"} + config = {"Directory": {"url_prefix": "/dfoo"}} + self.assertEqual(get_path_with_prefix(environ, config), "directory") + + # test for cornercase where PATH_INFO starts with url_prefix but is not actually a match + environ = {"PATH_INFO": "api/v10/resource"} + config = {"Directory": {"url_prefix": "api/v1"}} + self.assertEqual(get_path_with_prefix(environ, config), "api/v10/resource") + + # test for cornercase where PATH_INFO is None and url_prefix is present + environ = {"PATH_INFO": None} + config = {"Directory": {"url_prefix": "api/v1"}} + self.assertEqual(get_path_with_prefix(environ, config), "") + + def test_069_application_url_match(self): + """Test application() returns correct callback for matching URL pattern.""" + # Patch URLS and callback + import examples.acme2certifier_wsgi as wsgi_mod + + called = {} + + def fake_callback(environ, start_response): + called["called"] = True + return ["matched"] + + # Save and patch URLS + orig_URLS = wsgi_mod.URLS[:] + wsgi_mod.URLS.clear() + wsgi_mod.URLS.append((r"^foo", fake_callback)) + try: + environ = {"PATH_INFO": "foo"} + start_response = lambda *a, **kw: None + result = wsgi_mod.application(environ, start_response) + self.assertEqual(result, ["matched"]) + self.assertTrue(called.get("called")) + finally: + wsgi_mod.URLS[:] = orig_URLS + + def test_070_application_url_no_match(self): + """Test application() calls not_found if no URL matches.""" + import examples.acme2certifier_wsgi as wsgi_mod + + # Patch URLS to empty and patch not_found + orig_URLS = wsgi_mod.URLS[:] + orig_not_found = wsgi_mod.not_found + wsgi_mod.URLS.clear() + called = {} + + def fake_not_found(environ, start_response): + called["not_found"] = True + return ["notfound"] + + wsgi_mod.not_found = fake_not_found + try: + environ = {"PATH_INFO": "doesnotmatch"} + start_response = lambda *a, **kw: None + result = wsgi_mod.application(environ, start_response) + self.assertEqual(result, ["notfound"]) + self.assertTrue(called.get("not_found")) + finally: + wsgi_mod.not_found = orig_not_found + wsgi_mod.URLS[:] = orig_URLS + + def test_071_application_dynamic_challenge_url(self): + """Test application() dynamically adds challenge URL pattern if config present.""" + import examples.acme2certifier_wsgi as wsgi_mod + + # Patch CONFIG and URLS + orig_CONFIG = dict(wsgi_mod.CONFIG) + orig_URLS = wsgi_mod.URLS[:] + orig_acmechallenge_serve = wsgi_mod.acmechallenge_serve + wsgi_mod.CONFIG["CAhandler"] = {"acme_url": "something"} + wsgi_mod.URLS.clear() + called = {} + + def fake_callback(environ, start_response): + called["challenge"] = True + return ["challenge"] + + wsgi_mod.acmechallenge_serve = fake_callback + try: + environ = {"PATH_INFO": ".well-known/acme-challenge/abc"} + start_response = lambda *a, **kw: None + result = wsgi_mod.application(environ, start_response) + self.assertEqual(result, ["challenge"]) + self.assertTrue(called.get("challenge")) + finally: + wsgi_mod.acmechallenge_serve = orig_acmechallenge_serve + wsgi_mod.URLS[:] = orig_URLS + wsgi_mod.CONFIG.clear() + wsgi_mod.CONFIG.update(orig_CONFIG) + + def test_072_redirect_with_url_prefix(self): + """Test redirect() covers the 'if URL_PREFIX:' branch (line 527).""" + import examples.acme2certifier_wsgi as wsgi_mod + + # Save and patch URL_PREFIX + orig_url_prefix = wsgi_mod.URL_PREFIX + wsgi_mod.URL_PREFIX = "/my-prefix" + called = {} + + def fake_start_response(status, headers): + called["status"] = status + called["headers"] = headers + + try: + result = wsgi_mod.redirect({}, fake_start_response) + self.assertEqual(result, []) + self.assertEqual(called["status"], "302 Found") + self.assertIn(("Location", "/my-prefix/directory"), called["headers"]) + finally: + wsgi_mod.URL_PREFIX = orig_url_prefix + + def test_073_get_handler_cls_address_string(self): + """Test get_handler_cls() returns handler with correct address_string().""" + handler_cls = self.get_handler_cls() + # Create a dummy instance with client_address attribute + class DummyRequest: + pass + + dummy = DummyRequest() + dummy.client_address = ("1.2.3.4", 12345) + # The handler expects self.client_address, so we set it + # The handler may expect more, but for address_string() only client_address is needed + handler = handler_cls.__new__(handler_cls) + handler.client_address = dummy.client_address + result = handler.address_string() + self.assertEqual(result, "1.2.3.4") + if __name__ == "__main__":