1- import base64
1+ # ======================================================================
2+ # HMAC verifier
3+ # ======================================================================
24import hashlib
35import hmac
4- from enum import Enum
5- from typing import Mapping , Optional , Sequence , Protocol
6+ from typing import Optional , List , Callable
67
7- from apimatic_core_interfaces .security .signature_verifier import SignatureVerifier
88from apimatic_core_interfaces .http .request import Request
99from apimatic_core_interfaces .security .verification_result import VerificationResult
1010
1111from apimatic_core .exceptions .signature_verification_error import SignatureVerificationError
12+ from apimatic_core .security .encoders import DigestEncoder , HexEncoder
13+ from apimatic_core .templating .template_engine import TemplateEngine
1214
1315
14- class DigestEncoder (Protocol ):
15- def encode (self , digest : bytes ) -> str : ...
16-
17-
18- class HexEncoder :
19- def encode (self , digest : bytes ) -> str :
20- return digest .hex ()
21-
22-
23- class Base64Encoder :
24- def encode (self , digest : bytes ) -> str :
25- return base64 .b64encode (digest ).decode ("utf-8" )
26-
27-
28- class Base64UrlEncoder :
29- def encode (self , digest : bytes ) -> str :
30- return base64 .urlsafe_b64encode (digest ).decode ("utf-8" ).rstrip ("=" )
31-
32-
33- class HmacOrder (Enum ):
34- PREPEND = "prepend"
35- APPEND = "append"
36-
37-
38- class HmacSignatureVerifier (SignatureVerifier ):
16+ class HmacSignatureVerifier :
3917 """
40- HMAC-SHA256 verifier with pluggable digest encoder and flexible message shape.
41- verify() never raises for verification outcomes; it returns VerificationResult.
18+ Template-driven HMAC signature verifier.
19+
20+ - Builds the message to sign using a template with placeholders:
21+ {raw_body}
22+ {$method} | {$url} | {$request.path}
23+ {$request.header.<HeaderName>}
24+ {$request.query.<ParamName>}
25+ {$request.body#/json/pointer}
26+ - Computes HMAC(message, key, hash_alg) and encodes with the chosen encoder.
27+ - Optionally wraps the encoded digest via `signature_value_template`, e.g., "sha256={digest}".
28+
29+ Parameters
30+ ----------
31+ key : str
32+ Shared secret (non-empty).
33+ signature_header : str
34+ Name of the header carrying the provided signature (case-insensitive lookup).
35+ message_template : str
36+ Template to construct the signed message (see placeholders above).
37+ hash_alg : callable, optional
38+ Hash function from hashlib (default: hashlib.sha256).
39+ encoder : DigestEncoder, optional
40+ Encoder for the raw HMAC bytes (default: HexEncoder()).
41+ signature_value_template : str, optional
42+ Optional template for the expected signature value, where `{digest}` is replaced
43+ by the encoded digest. If omitted, the expected signature is the encoded digest itself.
4244 """
4345
4446 def __init__ (
4547 self ,
4648 * ,
4749 key : str ,
48- signature_header : str = "X-Signature" ,
49- additional_headers : Optional [Sequence [str ]] = None ,
50- order : HmacOrder = HmacOrder .PREPEND ,
51- delimiter : str = "|" ,
52- encoder : DigestEncoder = HexEncoder (),
50+ signature_header : str ,
51+ message_template : str ,
5352 hash_alg = hashlib .sha256 ,
54- ):
53+ encoder : DigestEncoder = HexEncoder (),
54+ signature_value_template : Optional [str ] = None ,
55+ ) -> None :
56+ # Basic config validation
5557 if not isinstance (key , str ) or not key :
56- raise ValueError ("HMAC key must be a non-empty string." )
58+ raise ValueError ("key must be a non-empty string." )
5759 if not isinstance (signature_header , str ) or not signature_header .strip ():
5860 raise ValueError ("signature_header must be a non-empty string." )
59- if not isinstance (delimiter , str ) or not delimiter :
60- raise ValueError ("delimiter must be a non-empty string." )
61- if order not in (HmacOrder .PREPEND , HmacOrder .APPEND ):
62- raise ValueError ("order must be HmacOrder.PREPEND or HmacOrder.APPEND." )
63-
64- self ._key = key
65- self ._sig_header = signature_header .lower ().strip ()
66- self ._headers = [h .lower ().strip () for h in (additional_headers or ())]
67- self ._order = order
68- self ._delimiter = delimiter
69- self ._encoder = encoder
61+ if not isinstance (message_template , str ) or not message_template :
62+ raise ValueError ("message_template must be a non-empty string." )
63+
64+ self ._key_bytes : bytes = key .encode ("utf-8" )
65+ self ._signature_header_lc = signature_header .lower ().strip ()
7066 self ._hash_alg = hash_alg
67+ self ._encoder = encoder
68+ self ._sig_value_template = signature_value_template
69+
70+ # Compile template once for performance
71+ self ._engine = TemplateEngine ()
72+ self ._plan : List [Callable [[Request ], bytes ]] = self ._engine .compile (message_template )
7173
74+ # ------------------------------------------------------------------
75+ # Public API
76+ # ------------------------------------------------------------------
7277 def verify (self , request : Request ) -> VerificationResult :
78+ """
79+ Verify the signature on the given request.
80+
81+ Returns
82+ -------
83+ VerificationResult
84+ - passed() if signature matches
85+ - failed(error) otherwise
86+ """
7387 try :
74- # Basic request shape checks
75- if request is None or not hasattr (request , "headers" ) or not hasattr (request , "body" ):
76- return VerificationResult .failed (ValueError ("Invalid request object." ))
77-
78- normalized : Mapping [str , str ] = {str (k ).lower (): str (v ) for k , v in request .headers .items ()}
79- provided = normalized .get (self ._sig_header )
80-
81- if provided is None or not str (provided ).strip ():
82- return VerificationResult .failed (ValueError (f"Signature header '{ self ._sig_header } ' is missing." ))
83-
84- # Select body
85- if not isinstance (request .body , str ):
86- return VerificationResult .failed (ValueError ("request.body must be a str (raw JSON/text)." ))
87-
88- # Build canonical message
89- parts : list [str ] = []
90- if self ._order is HmacOrder .PREPEND :
91- for h in self ._headers :
92- val = normalized .get (h )
93- if val is not None :
94- parts .append (str (val ).strip ())
95- parts .append (request .body )
96- else :
97- parts .append (request .body )
98- for h in self ._headers :
99- val = normalized .get (h )
100- if val is not None :
101- parts .append (str (val ).strip ())
102-
103- message = self ._delimiter .join (parts ).encode ("utf-8" )
104-
105- # Compute digest
106- digest = hmac .new (self ._key .encode ("utf-8" ), message , self ._hash_alg ).digest ()
107- expected = self ._encoder .encode (digest )
108-
109- # Constant-time compare
110- ok = hmac .compare_digest (str (provided ).strip (), expected )
111- return VerificationResult .passed () if ok else VerificationResult .failed (SignatureVerificationError ("Signature mismatch." ))
112-
113- except Exception as e :
114- # Convert any unexpected error into a failure result
115- return VerificationResult .failed (SignatureVerificationError (f"Signature Verification Failed: { e } " ))
88+ provided = self ._read_signature_header (request )
89+ if provided is None :
90+ return VerificationResult .failed (
91+ ValueError (f"Signature header '{ self ._signature_header_lc } ' is missing." )
92+ )
93+
94+ expected = self ._compute_expected_signature_text (request )
95+ ok = hmac .compare_digest (provided , expected )
96+ return VerificationResult .passed () if ok else VerificationResult .failed (
97+ SignatureVerificationError ("Signature mismatch." )
98+ )
99+ except Exception as exc :
100+ return VerificationResult .failed (
101+ SignatureVerificationError (f"Signature Verification Failed: { exc } " )
102+ )
103+
104+ def compute_expected_signature (self , request : Request ) -> str :
105+ """
106+ Compute the *exact* expected signature text for this request (useful for tests or debugging).
107+
108+ Returns
109+ -------
110+ str
111+ The fully formatted signature string (after digest encoding and optional value templating).
112+ """
113+ return self ._compute_expected_signature_text (request )
114+
115+ # ------------------------------------------------------------------
116+ # Internals
117+ # ------------------------------------------------------------------
118+ def _read_signature_header (self , request : Request ) -> Optional [str ]:
119+ headers = {str (k ).lower (): str (v ) for k , v in (getattr (request , "headers" , {}) or {}).items ()}
120+ value = headers .get (self ._signature_header_lc )
121+ return None if value is None or str (value ).strip () == "" else str (value )
122+
123+ def _compute_expected_signature_text (self , request : Request ) -> str :
124+ message = self ._engine .render (self ._plan , request )
125+ digest = hmac .new (self ._key_bytes , message , self ._hash_alg ).digest ()
126+ encoded = self ._encoder .encode (digest )
127+ return self ._sig_value_template .replace ("{digest}" , encoded ) if self ._sig_value_template else encoded
0 commit comments