|
1 | 1 | import uuid |
2 | 2 | from datetime import datetime, timedelta |
3 | 3 |
|
| 4 | +import jsonschema |
4 | 5 | import swapper |
5 | 6 | from cryptography import x509 |
6 | 7 | from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm |
|
16 | 17 | from OpenSSL import crypto |
17 | 18 |
|
18 | 19 | from .. import settings as app_settings |
| 20 | +from ..schemas import get_schema_item_options |
19 | 21 |
|
20 | 22 | KEY_LENGTH_CHOICES = ( |
21 | 23 | ("256", "256 (ECDSA)"), |
|
37 | 39 | ("sha512", "SHA512"), |
38 | 40 | ) |
39 | 41 |
|
| 42 | +SUPPORTED_EXTENDED_KEY_USAGE_VALUES = { |
| 43 | + "clientauth", |
| 44 | + "serverauth", |
| 45 | + "codesigning", |
| 46 | + "emailprotection", |
| 47 | +} |
| 48 | + |
| 49 | +SUPPORTED_NS_CERT_TYPE_VALUES = { |
| 50 | + "client", |
| 51 | + "server", |
| 52 | + "email", |
| 53 | + "objsign", |
| 54 | + "sslca", |
| 55 | + "emailca", |
| 56 | + "objca", |
| 57 | +} |
| 58 | + |
40 | 59 |
|
41 | 60 | def default_validity_start(): |
42 | 61 | """ |
@@ -157,19 +176,20 @@ class Meta: |
157 | 176 | def __str__(self): |
158 | 177 | return self.name |
159 | 178 |
|
160 | | - def clean_fields(self, *args, **kwargs): |
| 179 | + def clean_fields(self, exclude=None): |
161 | 180 | # importing existing certificate |
162 | 181 | # must be done here in order to validate imported fields |
163 | 182 | # and fill private and public key before validation fails |
164 | 183 | if self._state.adding and self.certificate and self.private_key: |
165 | 184 | self._validate_pem() |
166 | 185 | self._import() |
167 | | - super().clean_fields(*args, **kwargs) |
| 186 | + super().clean_fields(exclude=exclude) |
| 187 | + if not exclude or "extensions" not in exclude: |
| 188 | + self._validate_extensions() |
168 | 189 |
|
169 | 190 | def clean(self): |
170 | 191 | if self.serial_number: |
171 | 192 | self._validate_serial_number() |
172 | | - self._verify_extension_format() |
173 | 193 | # when importing, both public and private must be present |
174 | 194 | if (self.certificate and not self.private_key) or ( |
175 | 195 | self.private_key and not self.certificate |
@@ -505,19 +525,165 @@ def _verify_ca(self): |
505 | 525 | _("Cryptographic signature verification failed: CA does not match.") |
506 | 526 | ) |
507 | 527 |
|
508 | | - def _verify_extension_format(self): |
509 | | - """ |
510 | | - (internal use only) |
511 | | - verifies the format of ``self.extension`` is correct |
512 | | - """ |
513 | | - msg = "Extension format invalid" |
| 528 | + def _get_extensions_schema(self): |
| 529 | + if hasattr(self, "ca"): |
| 530 | + return app_settings.get_cert_extensions_schema() |
| 531 | + return app_settings.get_ca_extensions_schema() |
| 532 | + |
| 533 | + def _normalize_extensions(self, schema): |
| 534 | + if self.extensions is None: |
| 535 | + self.extensions = [] |
| 536 | + return |
514 | 537 | if not isinstance(self.extensions, list): |
515 | | - raise ValidationError(msg) |
| 538 | + return |
| 539 | + options = get_schema_item_options(schema) |
| 540 | + normalized = [] |
516 | 541 | for ext in self.extensions: |
517 | 542 | if not isinstance(ext, dict): |
518 | | - raise ValidationError(msg) |
519 | | - if not ("name" in ext and "critical" in ext and "value" in ext): |
520 | | - raise ValidationError(msg) |
| 543 | + normalized.append(ext) |
| 544 | + continue |
| 545 | + normalized_ext = ext.copy() |
| 546 | + branch = options.get(normalized_ext.get("name"), {}) |
| 547 | + value_schema = branch.get("properties", {}).get("value", {}) |
| 548 | + if value_schema.get("type") == "array" and isinstance( |
| 549 | + normalized_ext.get("value"), str |
| 550 | + ): |
| 551 | + normalized_ext["value"] = [ |
| 552 | + value.strip() |
| 553 | + for value in normalized_ext["value"].split(",") |
| 554 | + if value.strip() |
| 555 | + ] |
| 556 | + normalized.append(normalized_ext) |
| 557 | + self.extensions = normalized |
| 558 | + |
| 559 | + def _get_best_extensions_error(self, errors): |
| 560 | + error = jsonschema.exceptions.best_match(errors) |
| 561 | + while error.context: |
| 562 | + nested_error = jsonschema.exceptions.best_match(error.context) |
| 563 | + if nested_error is error: |
| 564 | + break |
| 565 | + error = nested_error |
| 566 | + return error |
| 567 | + |
| 568 | + def _format_extensions_error(self, error): |
| 569 | + path = [] |
| 570 | + for segment in error.absolute_path: |
| 571 | + if isinstance(segment, int): |
| 572 | + path.append(f"[{segment}]") |
| 573 | + elif path: |
| 574 | + path.append(f".{segment}") |
| 575 | + else: |
| 576 | + path.append(str(segment)) |
| 577 | + message = error.message |
| 578 | + if path: |
| 579 | + return _("Extensions data at %(path)s: %(message)s") % { |
| 580 | + "path": "".join(path), |
| 581 | + "message": message, |
| 582 | + } |
| 583 | + return _("Extensions data: %(message)s") % {"message": message} |
| 584 | + |
| 585 | + def _raise_extensions_validation_error(self, path, message): |
| 586 | + if path: |
| 587 | + raise ValidationError( |
| 588 | + { |
| 589 | + "extensions": _("Extensions data at %(path)s: %(message)s") |
| 590 | + % {"path": path, "message": message} |
| 591 | + } |
| 592 | + ) |
| 593 | + raise ValidationError( |
| 594 | + {"extensions": _("Extensions data: %(message)s") % {"message": message}} |
| 595 | + ) |
| 596 | + |
| 597 | + def _get_extension_values(self, value): |
| 598 | + if isinstance(value, list): |
| 599 | + return value |
| 600 | + if isinstance(value, str): |
| 601 | + return value.split(",") |
| 602 | + return [] |
| 603 | + |
| 604 | + def _validate_supported_extensions(self): |
| 605 | + for index, ext in enumerate(self.extensions or []): |
| 606 | + if not isinstance(ext, dict): |
| 607 | + continue |
| 608 | + path = f"[{index}]" |
| 609 | + name = ext.get("name") |
| 610 | + critical = ext.get("critical", False) |
| 611 | + value = ext.get("value", "") |
| 612 | + if not isinstance(critical, bool): |
| 613 | + self._raise_extensions_validation_error( |
| 614 | + f"{path}.critical", _("Critical flag must be a boolean value.") |
| 615 | + ) |
| 616 | + if name == "nsComment": |
| 617 | + if not isinstance(value, str): |
| 618 | + self._raise_extensions_validation_error( |
| 619 | + f"{path}.value", |
| 620 | + _("nsComment extension requires a string value."), |
| 621 | + ) |
| 622 | + if not value: |
| 623 | + self._raise_extensions_validation_error( |
| 624 | + f"{path}.value", _("nsComment extension requires a value.") |
| 625 | + ) |
| 626 | + if len(value) > 255: |
| 627 | + self._raise_extensions_validation_error( |
| 628 | + f"{path}.value", |
| 629 | + _("nsComment value exceeds maximum length of 255 bytes"), |
| 630 | + ) |
| 631 | + continue |
| 632 | + if name == "extendedKeyUsage": |
| 633 | + values = self._get_extension_values(value) |
| 634 | + if not values: |
| 635 | + self._raise_extensions_validation_error( |
| 636 | + f"{path}.value", |
| 637 | + _( |
| 638 | + "extendedKeyUsage extension requires at least " |
| 639 | + "one valid value." |
| 640 | + ), |
| 641 | + ) |
| 642 | + for raw_value in values: |
| 643 | + cleaned_value = ( |
| 644 | + raw_value.strip().lower() |
| 645 | + if isinstance(raw_value, str) |
| 646 | + else str(raw_value).strip().lower() |
| 647 | + ) |
| 648 | + if cleaned_value not in SUPPORTED_EXTENDED_KEY_USAGE_VALUES: |
| 649 | + self._raise_extensions_validation_error( |
| 650 | + f"{path}.value", |
| 651 | + _("Unsupported extendedKeyUsage value: %s") % cleaned_value, |
| 652 | + ) |
| 653 | + continue |
| 654 | + if name == "nsCertType": |
| 655 | + values = self._get_extension_values(value) |
| 656 | + if not values: |
| 657 | + self._raise_extensions_validation_error( |
| 658 | + f"{path}.value", |
| 659 | + _("nsCertType extension requires at least one valid type."), |
| 660 | + ) |
| 661 | + for raw_value in values: |
| 662 | + cleaned_value = ( |
| 663 | + raw_value.strip().lower() |
| 664 | + if isinstance(raw_value, str) |
| 665 | + else str(raw_value).strip().lower() |
| 666 | + ) |
| 667 | + if cleaned_value not in SUPPORTED_NS_CERT_TYPE_VALUES: |
| 668 | + self._raise_extensions_validation_error( |
| 669 | + f"{path}.value", |
| 670 | + _("Unsupported nsCertType value: %s") % cleaned_value, |
| 671 | + ) |
| 672 | + continue |
| 673 | + self._raise_extensions_validation_error( |
| 674 | + f"{path}.name", _("Unsupported extension: %s") % name |
| 675 | + ) |
| 676 | + |
| 677 | + def _validate_extensions(self): |
| 678 | + schema = self._get_extensions_schema() |
| 679 | + self._normalize_extensions(schema) |
| 680 | + validator_cls = jsonschema.validators.validator_for(schema) |
| 681 | + validator = validator_cls(schema) |
| 682 | + errors = list(validator.iter_errors(self.extensions)) |
| 683 | + if errors: |
| 684 | + error = self._get_best_extensions_error(errors) |
| 685 | + raise ValidationError({"extensions": self._format_extensions_error(error)}) |
| 686 | + self._validate_supported_extensions() |
521 | 687 |
|
522 | 688 | def _add_extensions(self, builder, public_key): |
523 | 689 | """ |
@@ -594,7 +760,8 @@ def _add_extensions(self, builder, public_key): |
594 | 760 | "emailprotection": ExtendedKeyUsageOID.EMAIL_PROTECTION, |
595 | 761 | } |
596 | 762 | oids = [] |
597 | | - for v in val.split(","): |
| 763 | + values = val if isinstance(val, list) else val.split(",") |
| 764 | + for v in values: |
598 | 765 | v_clean = v.strip().lower() |
599 | 766 | if v_clean not in eku_map: |
600 | 767 | raise ValidationError( |
@@ -634,7 +801,8 @@ def _add_extensions(self, builder, public_key): |
634 | 801 | "objca": 0x01, |
635 | 802 | } |
636 | 803 | bits = 0 |
637 | | - for v in val.split(","): |
| 804 | + values = val if isinstance(val, list) else val.split(",") |
| 805 | + for v in values: |
638 | 806 | v_clean = v.strip().lower() |
639 | 807 | if v_clean not in ns_cert_type_map: |
640 | 808 | raise ValidationError( |
|
0 commit comments