1+ # -*- coding: utf-8 -*-
12import base64
23import hashlib
34import hmac
78from apimatic_core_interfaces .security .signature_verifier import SignatureVerifier
89from apimatic_core_interfaces .http .request import Request
910
10- from apimatic_core .exceptions .signature_verification_error import SignatureVerificationError
11-
1211
1312class DigestEncoder (Protocol ):
14- """Encodes raw HMAC digest bytes into the on-wire string."""
1513 def encode (self , digest : bytes ) -> str : ...
1614
1715
@@ -27,23 +25,18 @@ def encode(self, digest: bytes) -> str:
2725
2826class Base64UrlEncoder :
2927 def encode (self , digest : bytes ) -> str :
30- # Trim '=' padding to match many providers’ behavior
3128 return base64 .urlsafe_b64encode (digest ).decode ("utf-8" ).rstrip ("=" )
3229
30+
3331class HmacOrder (Enum ):
34- """Defines how additional headers are combined with the body in the HMAC message."""
3532 PREPEND = "prepend"
3633 APPEND = "append"
3734
35+
3836class HmacSignatureVerifier (SignatureVerifier ):
3937 """
4038 HMAC-SHA256 verifier with pluggable digest encoder and flexible message shape.
41-
42- Behavior:
43- - Returns True on valid signature.
44- - Raises SignatureVerificationError when the signature header is missing
45- or the signature does not match.
46- - Raises ValueError for invalid inputs (misuse).
39+ verify() never raises for verification outcomes; it returns VerificationResult.
4740 """
4841
4942 def __init__ (
@@ -67,51 +60,54 @@ def __init__(
6760 raise ValueError ("order must be HmacOrder.PREPEND or HmacOrder.APPEND." )
6861
6962 self ._key_bytes = key .encode ("utf-8" )
70- self ._signature_header = signature_header .lower ().strip ()
71- self ._additional_headers = [h .lower ().strip () for h in (additional_headers or ())]
63+ self ._sig_header = signature_header .lower ().strip ()
64+ self ._headers = [h .lower ().strip () for h in (additional_headers or ())]
7265 self ._order = order
7366 self ._delimiter = delimiter
7467 self ._encoder = encoder
7568 self ._hash_alg = hash_alg
7669
77- def verify (self , request : Request ) -> bool :
78- if request is None or not isinstance (request , Request ):
79- raise ValueError ("request must be an EventRequest." )
80-
81- if not isinstance (request .body , str ):
82- raise ValueError ("request.body must be a str (raw JSON)." )
83-
84- # normalize headers
85- normalized : Mapping [str , str ] = {
86- str (k ).lower (): str (v ) for k , v in request .headers .items ()
87- }
88- provided = normalized .get (self ._signature_header )
89- if provided is None or not str (provided ).strip ():
90- raise SignatureVerificationError (
91- f"Signature header '{ self ._signature_header } ' is missing from the request."
92- )
93- provided = str (provided ).strip ()
94-
95- # Build canonical message
96- parts : list [str ] = []
97- if self ._order is HmacOrder .PREPEND :
98- for h in self ._additional_headers :
99- val = normalized .get (h )
100- if val is not None :
101- parts .append (str (val ).strip ())
102- parts .append (request .body )
103- else :
104- parts .append (request .body )
105- for h in self ._additional_headers :
106- val = normalized .get (h )
107- if val is not None :
108- parts .append (str (val ).strip ())
109-
110- message = self ._delimiter .join (parts ).encode ("utf-8" )
111-
70+ def verify (self , request : Request ) -> VerificationResult :
11271 try :
72+ # Basic request shape checks
73+ if request is None or not hasattr (request , "headers" ) or not hasattr (request , "body" ):
74+ return VerificationResult .failed (ValueError ("Invalid request object." ))
75+
76+ normalized : Mapping [str , str ] = {str (k ).lower (): str (v ) for k , v in request .headers .items ()}
77+ provided = normalized .get (self ._sig_header )
78+
79+ if provided is None or not str (provided ).strip ():
80+ return VerificationResult .failed (ValueError (f"Signature header '{ self ._sig_header } ' is missing." ))
81+
82+ # Select body
83+ if not isinstance (request .body , str ):
84+ return VerificationResult .failed (ValueError ("request.body must be a str (raw JSON/text)." ))
85+
86+ # Build canonical message
87+ parts : list [str ] = []
88+ if self ._order is HmacOrder .PREPEND :
89+ for h in self ._headers :
90+ val = normalized .get (h )
91+ if val is not None :
92+ parts .append (str (val ).strip ())
93+ parts .append (request .body )
94+ else :
95+ parts .append (request .body )
96+ for h in self ._headers :
97+ val = normalized .get (h )
98+ if val is not None :
99+ parts .append (str (val ).strip ())
100+
101+ message = self ._delimiter .join (parts ).encode ("utf-8" )
102+
103+ # Compute digest
113104 digest = hmac .new (self ._key_bytes , message , self ._hash_alg ).digest ()
114105 expected = self ._encoder .encode (digest )
115- return hmac .compare_digest (provided , expected )
106+
107+ # Constant-time compare
108+ ok = hmac .compare_digest (str (provided ).strip (), expected )
109+ return VerificationResult .passed () if ok else VerificationResult .failed (ValueError ("Signature mismatch." ))
110+
116111 except Exception as e :
117- raise SignatureVerificationError ("HMAC digest computation failed." ) from e
112+ # Convert any unexpected error into a failure result
113+ return VerificationResult .failed (e )
0 commit comments