Skip to content

Commit 6320d90

Browse files
authored
feat: Allow dcvMode to be passed to the Certificates::enroll method (#280)
chore: Update unit tests to account for the new enroll field
1 parent 04ecf8a commit 6320d90

4 files changed

Lines changed: 61 additions & 21 deletions

File tree

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[bumpversion]
22
commit = True
3-
current_version = 3.0.0
3+
current_version = 4.0.0
44
tag = True
55
tag_name = {new_version}
66

cert_manager/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@
33
__title__ = "cert_manager"
44
__description__ = "Python interface to the Sectigo Certificate Manager REST API"
55
__url__ = "https://github.com/broadinstitute/python-cert_manager"
6-
__version__ = "3.0.0"
6+
__version__ = "4.0.0"

cert_manager/_certificates.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ def enroll(self, **kwargs):
156156
external_requester: One or more e-mail addresses
157157
custom_fields: zero or more objects representing custom fields and their values
158158
Note: each object must have a 'name' key and a 'value' key
159+
dcv_mode: The DCV method to use for the certificate. Allowed values are "EMAIL",
160+
"CNAME", and "HTTP" and "HTTPS". If not provided, "CNAME is the default.
161+
159162
Returns:
160163
The certificate_id and the normal status messages for errors
161164
"""
@@ -167,6 +170,7 @@ def enroll(self, **kwargs):
167170
subject_alt_names = kwargs.get("subject_alt_names", None)
168171
external_requester = kwargs.get("external_requester", None)
169172
custom_fields = kwargs.get("custom_fields", [])
173+
dcv_mode = kwargs.get("dcv_mode", "CNAME")
170174

171175
# Make sure a valid certificate type name was provided
172176
if cert_type_name not in self.types:
@@ -193,7 +197,7 @@ def enroll(self, **kwargs):
193197
data = {
194198
"orgId": org_id, "csr": csr.rstrip(), "subjAltNames": final_san, "certType": type_id,
195199
"numberServers": 1, "serverType": -1, "term": term, "comments": f"Enrolled by {self._client.user_agent}",
196-
"externalRequester": external_requester
200+
"externalRequester": external_requester, "dcvMode": dcv_mode,
197201
}
198202
if custom_fields:
199203
data['customFields'] = custom_fields

tests/test_certificates.py

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -359,17 +359,18 @@ def test_success(self):
359359
post_data = {
360360
"orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": None, "certType": 224,
361361
"numberServers": 1, "serverType": -1, "term": self.test_term,
362-
"comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester
362+
"comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester,
363+
"dcvMode": "CNAME",
363364
}
364-
post_json = json.dumps(post_data)
365365

366366
# Verify all the query information
367367
self.assertEqual(resp, self.test_result)
368368
self.assertEqual(len(responses.calls), 3)
369369
self.assertEqual(responses.calls[0].request.url, self.test_types_url)
370370
self.assertEqual(responses.calls[1].request.url, self.test_customfields_url)
371371
self.assertEqual(responses.calls[2].request.url, self.test_url)
372-
self.assertEqual(responses.calls[2].request.body, post_json.encode("utf-8"))
372+
test_json = json.loads(responses.calls[2].request.body.decode("utf-8"))
373+
self.assertEqual(test_json, post_data)
373374

374375
@responses.activate
375376
def test_san_list(self):
@@ -393,17 +394,50 @@ def test_san_list(self):
393394
post_data = {
394395
"orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": self.test_san, "certType": 224,
395396
"numberServers": 1, "serverType": -1, "term": self.test_term,
396-
"comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester
397+
"comments": f"Enrolled by {self.client.user_agent}",
398+
"externalRequester": self.test_external_requester, "dcvMode": "CNAME",
397399
}
398-
post_json = json.dumps(post_data)
399400

400401
# Verify all the query information
401402
self.assertEqual(resp, self.test_result)
402403
self.assertEqual(len(responses.calls), 3)
403404
self.assertEqual(responses.calls[0].request.url, self.test_types_url)
404405
self.assertEqual(responses.calls[1].request.url, self.test_customfields_url)
405406
self.assertEqual(responses.calls[2].request.url, self.test_url)
406-
self.assertEqual(responses.calls[2].request.body, post_json.encode("utf-8"))
407+
test_json = json.loads(responses.calls[2].request.body.decode("utf-8"))
408+
self.assertEqual(test_json, post_data)
409+
410+
@responses.activate
411+
def test_dcv_mode(self):
412+
"""Handle a custom dcvMode correctly."""
413+
# Setup the mocked responses
414+
# We need to mock the /types and /customFields URLs as well
415+
# since Certificates.types and Certificate.custom_fields are called from enroll
416+
responses.add(responses.GET, self.test_types_url, json=self.types_data, status=200)
417+
responses.add(responses.GET, self.test_customfields_url, json=self.cf_data, status=200)
418+
responses.add(responses.POST, self.test_url, json=self.test_result, status=200)
419+
420+
# Call the function
421+
resp = self.certobj.enroll(cert_type_name=self.test_ct_name, csr=self.test_csr, term=self.test_term,
422+
org_id=self.test_org, external_requester=self.test_external_requester,
423+
dcv_mode="HTTP")
424+
425+
# Mock up the data that should be sent with the post
426+
post_data = {
427+
"orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": None, "certType": 224,
428+
"numberServers": 1, "serverType": -1, "term": self.test_term,
429+
"comments": f"Enrolled by {self.client.user_agent}",
430+
"externalRequester": self.test_external_requester, "dcvMode": "HTTP",
431+
}
432+
433+
# Verify all the query information
434+
self.assertEqual(resp, self.test_result)
435+
self.assertEqual(len(responses.calls), 3)
436+
self.assertEqual(responses.calls[0].request.url, self.test_types_url)
437+
self.assertEqual(responses.calls[1].request.url, self.test_customfields_url)
438+
self.assertEqual(responses.calls[2].request.url, self.test_url)
439+
test_json = json.loads(responses.calls[2].request.body.decode("utf-8"))
440+
self.assertEqual(test_json, post_data)
407441

408442
@responses.activate
409443
def test_bad_cert_name(self):
@@ -465,17 +499,17 @@ def test_mandatory_custom_fields_success(self):
465499
"orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": None, "certType": 224,
466500
"numberServers": 1, "serverType": -1, "term": self.test_term,
467501
"comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester,
468-
"customFields": self.test_cf
502+
"dcvMode": "CNAME", "customFields": self.test_cf
469503
}
470-
post_json = json.dumps(post_data)
471504

472505
# Verify all the query information
473506
self.assertEqual(resp, self.test_result)
474507
self.assertEqual(len(responses.calls), 3)
475508
self.assertEqual(responses.calls[0].request.url, self.test_types_url)
476509
self.assertEqual(responses.calls[1].request.url, self.test_customfields_url)
477510
self.assertEqual(responses.calls[2].request.url, self.test_url)
478-
self.assertEqual(responses.calls[2].request.body, post_json.encode("utf-8"))
511+
test_json = json.loads(responses.calls[2].request.body.decode("utf-8"))
512+
self.assertEqual(test_json, post_data)
479513

480514
@responses.activate
481515
def test_mandatory_custom_fields_missing(self):
@@ -618,13 +652,14 @@ def test_success(self):
618652
# Call the function
619653
resp = self.certobj.revoke(cert_id=self.test_id, reason="Because")
620654

621-
post_json = json.dumps({"reason": "Because"})
655+
post_json = {"reason": "Because"}
622656

623657
# Verify all the query information
624658
self.assertEqual(resp, {})
625659
self.assertEqual(len(responses.calls), 1)
626660
self.assertEqual(responses.calls[0].request.url, self.test_url)
627-
self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8"))
661+
test_json = json.loads(responses.calls[0].request.body.decode("utf-8"))
662+
self.assertEqual(test_json, post_json)
628663

629664
@responses.activate
630665
def test_no_reason(self):
@@ -646,7 +681,8 @@ def test_failure(self):
646681
# Verify all the query information
647682
self.assertEqual(len(responses.calls), 1)
648683
self.assertEqual(responses.calls[0].request.url, self.test_url)
649-
self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8"))
684+
test_json = responses.calls[0].request.body.decode("utf-8")
685+
self.assertEqual(test_json, post_json)
650686

651687

652688
class TestReplace(TestCertificates):
@@ -677,13 +713,13 @@ def test_success(self):
677713
# Mock up the data that should be sent with the post
678714
post_data = {"csr": self.test_csr, "commonName": self.test_cn, "subjectAlternativeNames": None,
679715
"reason": self.test_reason}
680-
post_json = json.dumps(post_data)
681716

682717
# Verify all the query information
683718
self.assertEqual(resp, {})
684719
self.assertEqual(len(responses.calls), 1)
685720
self.assertEqual(responses.calls[0].request.url, self.test_url)
686-
self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8"))
721+
test_json = json.loads(responses.calls[0].request.body.decode("utf-8"))
722+
self.assertEqual(test_json, post_data)
687723

688724
@responses.activate
689725
def test_san_string(self):
@@ -700,13 +736,13 @@ def test_san_string(self):
700736
# Mock up the data that should be sent with the post
701737
post_data = {"csr": self.test_csr, "commonName": self.test_cn, "subjectAlternativeNames": san_list,
702738
"reason": self.test_reason}
703-
post_json = json.dumps(post_data)
704739

705740
# Verify all the query information
706741
self.assertEqual(resp, {})
707742
self.assertEqual(len(responses.calls), 1)
708743
self.assertEqual(responses.calls[0].request.url, self.test_url)
709-
self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8"))
744+
test_json = json.loads(responses.calls[0].request.body.decode("utf-8"))
745+
self.assertEqual(test_json, post_data)
710746

711747
@responses.activate
712748
def test_failure(self):
@@ -721,9 +757,9 @@ def test_failure(self):
721757
# Mock up the data that should be sent with the post
722758
post_data = {"csr": self.test_csr, "commonName": self.test_cn, "subjectAlternativeNames": None,
723759
"reason": self.test_reason}
724-
post_json = json.dumps(post_data)
725760

726761
# Verify all the query information
727762
self.assertEqual(len(responses.calls), 1)
728763
self.assertEqual(responses.calls[0].request.url, self.test_url)
729-
self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8"))
764+
test_json = json.loads(responses.calls[0].request.body.decode("utf-8"))
765+
self.assertEqual(test_json, post_data)

0 commit comments

Comments
 (0)