|
| 1 | +# hmac_signature_verifier.py |
| 2 | +import hmac |
| 3 | +import hashlib |
| 4 | +from typing import Callable, Optional, Union |
| 5 | + |
| 6 | +from apimatic_core_interfaces.http.request import Request |
| 7 | +from apimatic_core_interfaces.security.signature_verifier import SignatureVerifier |
| 8 | +from apimatic_core_interfaces.types.signature_verification_result import SignatureVerificationResult |
| 9 | +from apimatic_core.exceptions.signature_verification_error import SignatureVerificationError |
| 10 | + |
| 11 | + |
| 12 | +class DigestEncoder: |
| 13 | + """Minimal encoder interface for HMAC digests.""" |
| 14 | + def encode(self, digest: bytes) -> str: # pragma: no cover - interface |
| 15 | + raise NotImplementedError |
| 16 | + |
| 17 | + |
| 18 | +class HexEncoder(DigestEncoder): |
| 19 | + """Lowercase hexadecimal encoding.""" |
| 20 | + def encode(self, digest: bytes) -> str: |
| 21 | + return digest.hex() |
| 22 | + |
| 23 | + |
| 24 | +class Base64Encoder(DigestEncoder): |
| 25 | + """Standard Base64 encoding.""" |
| 26 | + def encode(self, digest: bytes) -> str: |
| 27 | + import base64 |
| 28 | + return base64.b64encode(digest).decode("utf-8") |
| 29 | + |
| 30 | + |
| 31 | +class Base64UrlEncoder(DigestEncoder): |
| 32 | + """URL-safe Base64 without '=' padding.""" |
| 33 | + def encode(self, digest: bytes) -> str: |
| 34 | + import base64 |
| 35 | + return base64.urlsafe_b64encode(digest).decode("utf-8").rstrip("=") |
| 36 | + |
| 37 | + |
| 38 | +class HmacSignatureVerifier(SignatureVerifier): |
| 39 | + """ |
| 40 | + HMAC signature verifier that delegates message construction to a user-supplied callable. |
| 41 | +
|
| 42 | + Parameters |
| 43 | + ---------- |
| 44 | + key : str |
| 45 | + Shared secret used for HMAC. |
| 46 | + signature_header : str |
| 47 | + Header name containing the provided signature (case-insensitive lookup). |
| 48 | + message_resolver : Optional[Callable[[Request], Union[bytes, str, None]]] |
| 49 | + Function that produces the exact message to sign. If omitted (None) or returns None, |
| 50 | + the verifier will use request.raw_body (bytes) if present, otherwise request.body (text, UTF-8). |
| 51 | + hash_alg : Callable (defaults to hashlib.sha256) |
| 52 | + Hash algorithm for HMAC. |
| 53 | + encoder : DigestEncoder (defaults to HexEncoder()) |
| 54 | + Encoder to transform HMAC digest bytes into a string for comparison. |
| 55 | + signature_value_template : Optional[str] |
| 56 | + If provided, wraps/defines the expected signature. If it contains '{digest}', the |
| 57 | + placeholder is replaced with the encoded digest; otherwise it is treated as a constant. |
| 58 | + """ |
| 59 | + |
| 60 | + def __init__( |
| 61 | + self, |
| 62 | + *, |
| 63 | + key: str, |
| 64 | + signature_header: str, |
| 65 | + message_resolver: Optional[Callable[[Request], Union[bytes, str, None]]] = None, |
| 66 | + hash_alg=hashlib.sha256, |
| 67 | + encoder: Optional[DigestEncoder] = None, |
| 68 | + signature_value_template: Optional[str] = None, |
| 69 | + ) -> None: |
| 70 | + if not isinstance(key, str) or not key: |
| 71 | + raise ValueError("key must be a non-empty string.") |
| 72 | + if not isinstance(signature_header, str) or not signature_header.strip(): |
| 73 | + raise ValueError("signature_header must be a non-empty string.") |
| 74 | + |
| 75 | + self._key_bytes = key.encode("utf-8") |
| 76 | + self._signature_header_lc = signature_header.lower().strip() |
| 77 | + self._message_resolver = message_resolver |
| 78 | + self._hash_alg = hash_alg |
| 79 | + self._encoder = encoder or HexEncoder() |
| 80 | + self._signature_value_template = signature_value_template |
| 81 | + |
| 82 | + def verify(self, request: Request) -> SignatureVerificationResult: |
| 83 | + try: |
| 84 | + provided = self._read_signature_header(request) |
| 85 | + if provided is None: |
| 86 | + return SignatureVerificationResult.failed( |
| 87 | + ValueError(f"Signature header '{self._signature_header_lc}' is missing.") |
| 88 | + ) |
| 89 | + |
| 90 | + message_bytes = self._resolve_message_bytes(request) |
| 91 | + digest = hmac.new(self._key_bytes, message_bytes, self._hash_alg).digest() |
| 92 | + encoded_digest = self._encoder.encode(digest) |
| 93 | + expected = self._wrap_expected_signature(encoded_digest) |
| 94 | + |
| 95 | + is_match = hmac.compare_digest(provided, expected) |
| 96 | + return SignatureVerificationResult.passed() if is_match else SignatureVerificationResult.failed( |
| 97 | + SignatureVerificationError("Signature mismatch.") |
| 98 | + ) |
| 99 | + except Exception as exc: |
| 100 | + return SignatureVerificationResult.failed( |
| 101 | + SignatureVerificationError(f"Signature Verification Failed: {exc}") |
| 102 | + ) |
| 103 | + |
| 104 | + # ------------- internal helpers ------------- |
| 105 | + |
| 106 | + def _read_signature_header(self, request: Request) -> Optional[str]: |
| 107 | + headers = {str(k).lower(): str(v) for k, v in (getattr(request, "headers", {}) or {}).items()} |
| 108 | + value = headers.get(self._signature_header_lc) |
| 109 | + return None if value is None or value.strip() == "" else value |
| 110 | + |
| 111 | + def _resolve_message_bytes(self, request: Request) -> bytes: |
| 112 | + """ |
| 113 | + Resolve the message to be signed: |
| 114 | + - If message_resolver is provided and returns bytes/str -> use it. |
| 115 | + If it returns None -> fall back to raw/text body. |
| 116 | + - If no resolver provided -> fall back to raw/text body. |
| 117 | + """ |
| 118 | + if callable(self._message_resolver): |
| 119 | + resolved = self._message_resolver(request) |
| 120 | + if isinstance(resolved, bytes): |
| 121 | + return resolved |
| 122 | + if isinstance(resolved, str): |
| 123 | + return resolved.encode("utf-8") |
| 124 | + # resolved is None -> fall through to default |
| 125 | + # default fallback: prefer raw_body, else textual body |
| 126 | + raw = getattr(request, "raw_body", None) |
| 127 | + if isinstance(raw, (bytes, bytearray)): |
| 128 | + return bytes(raw) |
| 129 | + body = getattr(request, "body", None) |
| 130 | + return (body or "").encode("utf-8") |
| 131 | + |
| 132 | + def _wrap_expected_signature(self, encoded_digest: str) -> str: |
| 133 | + template = self._signature_value_template |
| 134 | + if not template: |
| 135 | + return encoded_digest |
| 136 | + return template.replace("{digest}", encoded_digest) if "{digest}" in template else template |
0 commit comments