Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions acme_srv/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
certid_asn1_get,
certid_hex_get,
certid_check,
pkcs7_to_pem,
)

# CSR operations
Expand Down Expand Up @@ -125,6 +126,7 @@
config_allowed_domainlist_load,
config_async_mode_load,
config_proxy_load,
config_dryrun_load,
load_config,
header_info_jsonify,
header_info_lookup,
Expand Down
58 changes: 58 additions & 0 deletions acme_srv/helpers/certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.serialization.pkcs7 import (
load_pem_pkcs7_certificates,
load_der_pkcs7_certificates,
)
from cryptography.x509 import load_pem_x509_certificate, ocsp
from OpenSSL import crypto
from .encoding import (
Expand Down Expand Up @@ -394,3 +398,57 @@ def certid_check(

logger.debug("Helper.certid_check() ended with: %s", result)
return result


def pkcs7_to_pem(logger, pkcs7_content: str, outform: str = "string") -> List[str]:
"""convert pkcs7 to pem"""
logger.debug("CAhandler._pkcs7_to_pem()")

# Define loading strategies in order of preference
loading_strategies = [
# Strategy 1: Load as PEM directly
lambda content: load_pem_pkcs7_certificates(convert_string_to_byte(content)),
# Strategy 2: Replace CERTIFICATE with PKCS7 tag and load as PEM
lambda content: load_pem_pkcs7_certificates(
convert_string_to_byte(content.replace("CERTIFICATE", "PKCS7"))
),
# Strategy 3: Load as DER
lambda content: load_der_pkcs7_certificates(content),
]

pkcs7_obj = None
last_error = None

for i, strategy in enumerate(loading_strategies):
try:
pkcs7_obj = strategy(pkcs7_content)
if i == 1: # Log only for the tag replacement strategy
logger.error("PKCS7-TAG not found, updated content successfully")
break
except Exception as err:
last_error = err
if i == 0:
logger.error("PKCS7-TAG not found updating content...")
elif i == 1:
logger.debug("CAhandler._pkcs7_to_pem(): load pem failed. Try der...")

if pkcs7_obj is None:
logger.error("All PKCS7 loading strategies failed. Last error: %s", last_error)
raise last_error

# Convert certificates to PEM format
cert_pem_list = [
convert_byte_to_string(cert.public_bytes(serialization.Encoding.PEM))
for cert in pkcs7_obj
]

# Define output format
output_formats = {
"string": lambda certs: "".join(certs),
"list": lambda certs: certs,
}

result = output_formats.get(outform, lambda _: None)(cert_pem_list)

logger.debug("Certificate._pkcs7_to_pem() ended")
return result
24 changes: 24 additions & 0 deletions acme_srv/helpers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,27 @@ def client_parameter_validate(
error,
)
return value_to_set, error


def config_dryrun_load(logger: logging.Logger, config_dic: Dict[str, str]):
"""load dryrun configuration"""
logger.debug("Helper.config_dryrun_load()")

dryrun = False
dryrun_profilename = None

if "DEFAULT" in config_dic and "dryrun" in config_dic["DEFAULT"]:
if config_dic["DEFAULT"]["dryrun"].lower() in ["true", "false"]:
dryrun = config_dic.getboolean("DEFAULT", "dryrun", fallback=False)
elif config_dic["DEFAULT"]["dryrun"].lower() == "profile":
if config_dic.get("DEFAULT", "dryrun_profile", fallback=None):
dryrun_profilename = config_dic["DEFAULT"]["dryrun_profile"]
else:
logger.warning(
"Dryrun profile name not set in configuration, please set dryrun_profile parameter"
)

logger.debug(
"Helper.config_dryrun_load() ended with: %s/%s", dryrun, dryrun_profilename
)
return dryrun, dryrun_profilename
25 changes: 21 additions & 4 deletions examples/acme2certifier_wsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,17 +552,34 @@ def redirect(environ, start_response):
]


# Helper to extract path with prefix
def get_path_with_prefix(environ, config):
path = environ.get("PATH_INFO") or ""
# Collapse multiple leading slashes to one
while path.startswith("//"):
path = path[1:]
prefix = ""
if "Directory" in config and "url_prefix" in config["Directory"]:
prefix = str(config["Directory"]["url_prefix"]).strip("/")
if prefix:
path_ = path.lstrip("/")
if path_ == prefix:
return ""
if path_.startswith(prefix + "/"):
# Remove the prefix and any leading slashes after
return path_[len(prefix) :].lstrip("/")
return path_
return path.lstrip("/")


def application(environ, start_response):
"""The main WSGI application if nothing matches call the not_found function."""

# check if we need to activate the url pattern for challenge verification
if "CAhandler" in CONFIG and "acme_url" in CONFIG["CAhandler"]:
URLS.append((r"^.well-known/acme-challenge/", acmechallenge_serve))

prefix = "/"
if "Directory" in CONFIG and "url_prefix" in CONFIG["Directory"]:
prefix = CONFIG["Directory"]["url_prefix"] + "/"
path = environ.get("PATH_INFO", "").lstrip(prefix)
path = get_path_with_prefix(environ, CONFIG)

for regex, callback in URLS:
match = re.search(regex, path)
Expand Down
Loading
Loading