Skip to content

Commit ef2c484

Browse files
committed
Update test_certauthority_root
1 parent 30f3478 commit ef2c484

File tree

1 file changed

+176
-1
lines changed

1 file changed

+176
-1
lines changed

test/integration/smoke/test_certauthority_root.py

Lines changed: 176 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
import re
19+
from datetime import datetime, timedelta
20+
1821
from nose.plugins.attrib import attr
1922
from marvin.cloudstackTestCase import cloudstackTestCase
20-
from marvin.lib.utils import cleanup_resources
23+
from marvin.lib.utils import cleanup_resources, wait_until
2124
from marvin.lib.base import *
2225
from marvin.lib.common import list_hosts
2326

@@ -60,6 +63,29 @@ def verifySignature(self, caCert, cert):
6063
except Exception as e:
6164
print(f"Certificate verification failed: {e}")
6265

66+
67+
def parseCertificateChain(self, pem):
68+
"""Split a PEM blob containing one or more certificates into a list of x509 objects."""
69+
certs = []
70+
matches = re.findall(
71+
r'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----',
72+
pem,
73+
re.DOTALL
74+
)
75+
for match in matches:
76+
certs.append(x509.load_pem_x509_certificate(match.encode(), default_backend()))
77+
return certs
78+
79+
80+
def assertSignatureValid(self, issuerCert, cert):
81+
"""Verify cert is signed by issuerCert; raise on failure."""
82+
issuerCert.public_key().verify(
83+
cert.signature,
84+
cert.tbs_certificate_bytes,
85+
padding.PKCS1v15(),
86+
cert.signature_hash_algorithm,
87+
)
88+
6389
def setUp(self):
6490
self.apiclient = self.testClient.getApiClient()
6591
self.dbclient = self.testClient.getDbConnection()
@@ -224,3 +250,152 @@ def checkHostIsUp(hostId):
224250
self.assertTrue(len(hosts) == 1)
225251
else:
226252
self.fail("Failed to have systemvm host in Up state after cert provisioning")
253+
254+
255+
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
256+
def test_ca_certificate_chain_validity(self):
257+
"""
258+
Tests that listCaCertificate returns a valid certificate chain.
259+
When an intermediate CA is configured, the response is a PEM blob
260+
containing multiple certificates. Each non-root cert must be signed
261+
by the next cert in the chain, and the final cert must be self-signed.
262+
"""
263+
pem = self.getCaCertificate()
264+
self.assertTrue(len(pem) > 0)
265+
266+
chain = self.parseCertificateChain(pem)
267+
self.assertTrue(len(chain) >= 1, "Expected at least one certificate in CA chain")
268+
269+
# Each non-root cert must be signed by the next cert in the chain
270+
for i in range(len(chain) - 1):
271+
child = chain[i]
272+
parent = chain[i + 1]
273+
self.assertEqual(
274+
child.issuer, parent.subject,
275+
f"Chain break: cert[{i}] issuer does not match cert[{i + 1}] subject"
276+
)
277+
try:
278+
self.assertSignatureValid(parent, child)
279+
except Exception as e:
280+
self.fail(f"Signature verification failed for chain link {i} -> {i + 1}: {e}")
281+
282+
# The last cert in the chain must be self-signed (root CA)
283+
root = chain[-1]
284+
self.assertEqual(
285+
root.issuer, root.subject,
286+
"Final cert in CA chain is not self-signed"
287+
)
288+
try:
289+
self.assertSignatureValid(root, root)
290+
except Exception as e:
291+
self.fail(f"Root CA self-signature verification failed: {e}")
292+
293+
294+
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
295+
def test_issue_certificate_issuer_matches_ca(self):
296+
"""
297+
Tests that an issued certificate's issuer DN matches the subject DN
298+
of the first cert in the returned CA chain, and that the signature
299+
verifies against that cert's public key.
300+
"""
301+
cmd = issueCertificate.issueCertificateCmd()
302+
cmd.domain = 'apache.org'
303+
cmd.ipaddress = '10.1.1.1'
304+
cmd.provider = 'root'
305+
306+
response = self.apiclient.issueCertificate(cmd)
307+
self.assertTrue(len(response.certificate) > 0)
308+
self.assertTrue(len(response.cacertificates) > 0)
309+
310+
leaf = x509.load_pem_x509_certificate(response.certificate.encode(), default_backend())
311+
caChain = self.parseCertificateChain(response.cacertificates)
312+
self.assertTrue(len(caChain) >= 1, "Expected at least one CA certificate in response")
313+
314+
# The issuing CA is the first cert in the returned chain (intermediate
315+
# if an intermediate CA is configured, otherwise the root).
316+
issuingCa = caChain[0]
317+
self.assertEqual(
318+
leaf.issuer, issuingCa.subject,
319+
"Leaf certificate issuer does not match issuing CA subject"
320+
)
321+
try:
322+
self.assertSignatureValid(issuingCa, leaf)
323+
except Exception as e:
324+
self.fail(f"Leaf certificate signature does not verify against issuing CA: {e}")
325+
326+
327+
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
328+
def test_certificate_validity_period(self):
329+
"""
330+
Tests that an issued certificate has sensible validity bounds:
331+
not_valid_before <= now <= not_valid_after, and validity duration
332+
is at least 300 days (CloudStack default is 1 year).
333+
"""
334+
cmd = issueCertificate.issueCertificateCmd()
335+
cmd.domain = 'apache.org'
336+
cmd.provider = 'root'
337+
338+
response = self.apiclient.issueCertificate(cmd)
339+
self.assertTrue(len(response.certificate) > 0)
340+
341+
cert = x509.load_pem_x509_certificate(response.certificate.encode(), default_backend())
342+
343+
# cryptography >= 42 prefers the *_utc variants; fall back for older versions.
344+
notBefore = getattr(cert, 'not_valid_before_utc', None) or cert.not_valid_before
345+
notAfter = getattr(cert, 'not_valid_after_utc', None) or cert.not_valid_after
346+
347+
now = datetime.now(notBefore.tzinfo) if notBefore.tzinfo else datetime.utcnow()
348+
self.assertTrue(notBefore <= now, f"Certificate not_valid_before {notBefore} is in the future")
349+
self.assertTrue(now <= notAfter, f"Certificate not_valid_after {notAfter} is in the past")
350+
351+
duration = notAfter - notBefore
352+
self.assertTrue(
353+
duration >= timedelta(days=300),
354+
f"Certificate validity duration {duration} is less than expected minimum of 300 days"
355+
)
356+
357+
358+
def getUpKVMHosts(self, hostId=None):
359+
hosts = list_hosts(
360+
self.apiclient,
361+
type='Routing',
362+
hypervisor='KVM',
363+
state='Up',
364+
resourcestate='Enabled',
365+
id=hostId
366+
)
367+
return hosts
368+
369+
370+
@attr(tags=['advanced'], required_hardware=True)
371+
def test_provision_certificate_kvm(self):
372+
"""
373+
Tests certificate provisioning on a KVM host.
374+
Exercises the keystore-cert-import + cloud.jks provisioning flow
375+
against a real agent. Skipped when no KVM hosts are available.
376+
"""
377+
if self.hypervisor.lower() != 'kvm':
378+
raise self.skipTest("Hypervisor is not KVM, skipping test")
379+
380+
hosts = self.getUpKVMHosts()
381+
if not hosts or len(hosts) < 1:
382+
raise self.skipTest("No Up KVM hosts found, skipping test")
383+
384+
host = hosts[0]
385+
386+
cmd = provisionCertificate.provisionCertificateCmd()
387+
cmd.hostid = host.id
388+
cmd.reconnect = True
389+
cmd.provider = 'root'
390+
391+
response = self.apiclient.provisionCertificate(cmd)
392+
self.assertTrue(response.success)
393+
394+
def checkHostIsUp(hostId):
395+
hosts = self.getUpKVMHosts(hostId)
396+
return (hosts is not None and len(hosts) > 0), hosts
397+
398+
result, hosts = wait_until(2, 30, checkHostIsUp, host.id)
399+
if not result:
400+
self.fail("KVM host did not return to Up state after certificate provisioning")
401+
self.assertEqual(len(hosts), 1)

0 commit comments

Comments
 (0)