|
3 | 3 |
|
4 | 4 | from pycose.keys import CoseKey |
5 | 5 |
|
| 6 | +from typing import Any, Union |
6 | 7 | from cryptography import x509 |
7 | 8 | from cryptography.x509.oid import NameOID |
8 | 9 | from cryptography.x509 import Certificate |
9 | 10 | from cryptography.hazmat.primitives import hashes, serialization |
10 | 11 |
|
11 | | -from pymdoccbor import settings |
12 | | - |
13 | | -class MsoX509FabricInteface: |
| 12 | +def selfsigned_x509cert(cert_info: dict[str, Any], private_key: CoseKey, encoding: str = "DER") -> Union[Certificate, bytes]: |
14 | 13 | """ |
15 | | - MsoX509Fabric helper class to create a new mso |
| 14 | + Returns an X.509 certificate derived from the private key of the MSO Issuer |
| 15 | +
|
| 16 | + :param cert_info: dict[str, Any]: the certificate information, should contain at least one of the following: |
| 17 | + - country_name |
| 18 | + - state_or_province_name |
| 19 | + - locality_name |
| 20 | + - organization_name |
| 21 | + - common_name |
| 22 | + - not_valid_before |
| 23 | + - not_valid_after |
| 24 | + - san_url |
| 25 | + |
| 26 | + :param private_key: CoseKey: the private key to use for signing the certificate |
| 27 | + :param encoding: str: the encoding to use, default is DER |
| 28 | +
|
| 29 | + :return: Union[Certificate, bytes]: the X.509 certificate |
16 | 30 | """ |
17 | 31 |
|
18 | | - def __init__(self, private_key: CoseKey | None) -> None: |
19 | | - """ |
20 | | - Initialize the MsoX509Fabric object |
| 32 | + if not private_key: |
| 33 | + raise ValueError("private_key must be set") |
21 | 34 |
|
22 | | - :param private_key: str: the private key in COSE format |
23 | | - """ |
24 | | - self.private_key = private_key |
25 | | - |
26 | | - def selfsigned_x509cert(self, encoding: str = "DER") -> Union[Certificate, bytes]: |
27 | | - """ |
28 | | - Returns an X.509 certificate derived from the private key of the MSO Issuer |
| 35 | + ckey = COSEKey.from_bytes(private_key.encode()) |
| 36 | + |
| 37 | + name_attributes = [] |
| 38 | + if "country_name" in cert_info: |
| 39 | + name_attributes.append(x509.NameAttribute(NameOID.COUNTRY_NAME, cert_info["country_name"])) |
| 40 | + if "state_or_province_name" in cert_info: |
| 41 | + name_attributes.append(x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, cert_info["state_or_province_name"])) |
| 42 | + if "locality_name" in cert_info: |
| 43 | + name_attributes.append(x509.NameAttribute(NameOID.LOCALITY_NAME, cert_info["locality_name"])) |
| 44 | + if "organization_name" in cert_info: |
| 45 | + name_attributes.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, cert_info["organization_name"])) |
| 46 | + if "common_name" in cert_info: |
| 47 | + name_attributes.append(x509.NameAttribute(NameOID.COMMON_NAME, cert_info["common_name"])) |
29 | 48 |
|
30 | | - :param encoding: str: the encoding to use, default is DER |
| 49 | + subject = issuer = x509.Name(name_attributes) |
31 | 50 |
|
32 | | - :return: Union[Certificate, bytes]: the X.509 certificate |
33 | | - """ |
| 51 | + cert_builder = x509.CertificateBuilder().subject_name( |
| 52 | + subject |
| 53 | + ).issuer_name( |
| 54 | + issuer |
| 55 | + ).public_key( |
| 56 | + ckey.key.public_key() |
| 57 | + ).serial_number( |
| 58 | + x509.random_serial_number() |
| 59 | + ) |
| 60 | + |
| 61 | + if "not_valid_before" in cert_info: |
| 62 | + cert_builder = cert_builder.not_valid_before( |
| 63 | + cert_info["not_valid_before"] |
| 64 | + ) |
34 | 65 |
|
35 | | - if not self.private_key: |
36 | | - raise ValueError("private_key must be set") |
| 66 | + if "not_valid_after" in cert_info: |
| 67 | + cert_builder = cert_builder.not_valid_after( |
| 68 | + cert_info["not_valid_after"] |
| 69 | + ) |
37 | 70 |
|
38 | | - ckey = COSEKey.from_bytes(self.private_key.encode()) |
39 | 71 |
|
40 | | - subject = issuer = x509.Name([ |
41 | | - x509.NameAttribute(NameOID.COUNTRY_NAME, settings.X509_COUNTRY_NAME), |
42 | | - x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, settings.X509_STATE_OR_PROVINCE_NAME), |
43 | | - x509.NameAttribute(NameOID.LOCALITY_NAME, settings.X509_LOCALITY_NAME), |
44 | | - x509.NameAttribute(NameOID.ORGANIZATION_NAME, settings.X509_ORGANIZATION_NAME), |
45 | | - x509.NameAttribute(NameOID.COMMON_NAME, settings.X509_COMMON_NAME), |
46 | | - ]) |
47 | | - cert = x509.CertificateBuilder().subject_name( |
48 | | - subject |
49 | | - ).issuer_name( |
50 | | - issuer |
51 | | - ).public_key( |
52 | | - ckey.key.public_key() |
53 | | - ).serial_number( |
54 | | - x509.random_serial_number() |
55 | | - ).not_valid_before( |
56 | | - settings.X509_NOT_VALID_BEFORE |
57 | | - ).not_valid_after( |
58 | | - settings.X509_NOT_VALID_AFTER |
59 | | - ).add_extension( |
| 72 | + if "san_url" in cert_info: |
| 73 | + cert_builder = cert_builder.add_extension( |
60 | 74 | x509.SubjectAlternativeName( |
61 | 75 | [ |
62 | 76 | x509.UniformResourceIdentifier( |
63 | | - settings.X509_SAN_URL |
| 77 | + cert_info["san_url"] |
64 | 78 | ) |
65 | 79 | ] |
66 | 80 | ), |
67 | 81 | critical=False, |
68 | 82 | # Sign our certificate with our private key |
69 | | - ).sign(ckey.key, hashes.SHA256()) |
| 83 | + ) |
| 84 | + |
| 85 | + cert = cert_builder.sign(ckey.key, hashes.SHA256()) |
70 | 86 |
|
71 | | - if not encoding: |
72 | | - return cert |
73 | | - else: |
74 | | - return cert.public_bytes( |
75 | | - getattr(serialization.Encoding, encoding) |
76 | | - ) |
| 87 | + if not encoding: |
| 88 | + return cert |
| 89 | + else: |
| 90 | + return cert.public_bytes( |
| 91 | + getattr(serialization.Encoding, encoding) |
| 92 | + ) |
0 commit comments