Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f25ec12
adds signature verification implementations
sufyankhanrao Aug 21, 2025
77b4e2f
adds gaurd clause for the hmac verification implementation
sufyankhanrao Aug 21, 2025
7ad9240
updates the verify contract in SignatureVerifier
sufyankhanrao Aug 25, 2025
87ee3d8
makes the hmac signature verification more generic so that could be r…
sufyankhanrao Aug 26, 2025
426171d
reverts the Hmac implementation from generic configuration perspective
sufyankhanrao Aug 26, 2025
8fd9245
Revert "reverts the Hmac implementation from generic configuration pe…
sufyankhanrao Aug 27, 2025
f2f464c
fixes the request package
sufyankhanrao Aug 27, 2025
e4f2e9d
adds verification_result type for verify method
sufyankhanrao Aug 27, 2025
4179e49
adds the verification result integration
sufyankhanrao Aug 27, 2025
fc99e6b
override str method for custom exception
sufyankhanrao Aug 27, 2025
4344518
contains hmac generic implementation using json pointers
sufyankhanrao Sep 1, 2025
7345493
adds test for the hmac implementation
sufyankhanrao Sep 1, 2025
5ce2013
updates readme file
sufyankhanrao Sep 1, 2025
444fe65
fixed typing for python 3.7+ compatibility
sufyankhanrao Sep 1, 2025
c34851a
Merge branch 'main' into webhooks-support
sufyankhanrao Sep 1, 2025
149ff0c
adds typing extensions module dependency
sufyankhanrao Sep 1, 2025
edc2248
make the version to 4.0 for typing extensions
sufyankhanrao Sep 1, 2025
3908d61
fixes the hmac signature verifier implementation
sufyankhanrao Sep 2, 2025
ac188bb
makes the resolver injectable to the hmac implementation
sufyankhanrao Sep 4, 2025
e50b650
renamed the resolver parameter
sufyankhanrao Sep 4, 2025
44d63e3
removed the templating from the init file
sufyankhanrao Sep 4, 2025
e803faf
incorporated the feedback from the 1st review
sufyankhanrao Sep 5, 2025
98621d9
revision: adds structural typing and adapter for requests
sufyankhanrao Sep 8, 2025
65ed8a7
fixed failing test case for python 3.8
sufyankhanrao Sep 10, 2025
cbe3786
updates readme for the adapters
sufyankhanrao Sep 10, 2025
f38df42
fix sonarcloud complain in the adapter script
sufyankhanrao Sep 10, 2025
06efbf1
incorporate the errors property from verification result and renamed …
sufyankhanrao Sep 12, 2025
51434f2
renamed the adapter methods
sufyankhanrao Oct 3, 2025
d990d8e
renamed method in ReadMe
sufyankhanrao Oct 3, 2025
5ae14e4
updates core interfaces dependency version
sufyankhanrao Oct 3, 2025
6778b0c
Update setup.py
asadali214 Oct 3, 2025
a6b8da9
Update requirements.txt
asadali214 Oct 3, 2025
0925b8a
Update setup.py
asadali214 Oct 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,4 @@ cython_debug/
# Visual Studio Code
.vscode/
.qodo
*~
220 changes: 142 additions & 78 deletions README.md

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion apimatic_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@
'logger',
'exceptions',
'constants',
'pagination'
'pagination',
'security',
'templating'
]
14 changes: 14 additions & 0 deletions apimatic_core/exceptions/signature_verification_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
This is an exception class which will be raised when verifying the Webhooks & Callbacks signature.
"""


class SignatureVerificationError(Exception):
"""Raised when a request cannot be verified (missing or invalid signature)."""

def __init__(self, message):
self.message = message
super().__init__(self.message)

def __str__(self):
return self.message
4 changes: 4 additions & 0 deletions apimatic_core/security/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
__all__=[
'hmac_signature_verifier',
'encoders'
]
28 changes: 28 additions & 0 deletions apimatic_core/security/encoders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# ======================================================================
# Digest encoders
# ======================================================================
import base64

from typing_extensions import Protocol

class DigestEncoder(Protocol):
"""Protocol for digest encoders. Implementations must return a text form of the raw HMAC bytes."""
def encode(self, digest: bytes) -> str: ...


class HexEncoder:
"""Lowercase hexadecimal encoding (e.g., 'a1b2c3...')."""
def encode(self, digest: bytes) -> str:
return digest.hex()


class Base64Encoder:
"""Standard Base64 encoding."""
def encode(self, digest: bytes) -> str:
return base64.b64encode(digest).decode("utf-8")


class Base64UrlEncoder:
"""URL-safe Base64 encoding without padding (= stripped)."""
def encode(self, digest: bytes) -> str:
return base64.urlsafe_b64encode(digest).decode("utf-8").rstrip("=")
104 changes: 104 additions & 0 deletions apimatic_core/security/hmac_signature_verifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import hashlib
import hmac
from typing import Optional, List, Callable

from apimatic_core_interfaces.http.request import Request
from apimatic_core_interfaces.security.verification_result import VerificationResult

from apimatic_core.exceptions.signature_verification_error import SignatureVerificationError
from apimatic_core.security.encoders import DigestEncoder, HexEncoder
from apimatic_core.templating.template_engine import TemplateEngine


class HmacSignatureVerifier:
"""
Template-driven HMAC signature verifier.

- Builds the message to sign using a template with placeholders:
{raw_body}
{$method} | {$url} | {$request.path}
{$request.header.<HeaderName>}
{$request.query.<ParamName>}
{$request.body#/json/pointer}
- Computes HMAC(message, key, hash_alg) and encodes with the chosen encoder.
- Optionally wraps the encoded digest via `signature_value_template`, e.g., "sha256={digest}".

Parameters
----------
key : str
Shared secret (non-empty).
signature_header : str
Name of the header carrying the provided signature (case-insensitive lookup).
message_template : str
Template to construct the signed message (see placeholders above).
hash_alg : callable, optional
Hash function from hashlib (default: hashlib.sha256).
encoder : DigestEncoder, optional
Encoder for the raw HMAC bytes (default: HexEncoder()).
signature_value_template : str, optional
Optional template for the expected signature value, where `{digest}` is replaced
by the encoded digest. If omitted, the expected signature is the encoded digest itself.
"""

def __init__(
self,
*,
key: str,
signature_header: str,
message_template: str,
hash_alg=hashlib.sha256,
encoder: DigestEncoder = HexEncoder(),
signature_value_template: Optional[str] = None,
) -> None:
# Basic config validation
if not isinstance(key, str) or not key:
raise ValueError("key must be a non-empty string.")
if not isinstance(signature_header, str) or not signature_header.strip():
raise ValueError("signature_header must be a non-empty string.")
if not isinstance(message_template, str) or not message_template:
raise ValueError("message_template must be a non-empty string.")

self._key_bytes: bytes = key.encode("utf-8")
self._signature_header_lc = signature_header.lower().strip()
self._hash_alg = hash_alg
self._encoder = encoder
self._sig_value_template = signature_value_template

# Compile template once for performance
self._engine = TemplateEngine()
self._plan: List[Callable[[Request], bytes]] = self._engine.compile(message_template)

def verify(self, request: Request) -> VerificationResult:
"""Verify the signature in the request headers."""
try:
provided = self._read_signature_header(request)
if provided is None:
return VerificationResult.failed(
ValueError(f"Signature header '{self._signature_header_lc}' is missing.")
)

message = self._engine.render(self._plan, request)
digest = hmac.new(self._key_bytes, message, self._hash_alg).digest()
encoded = self._encoder.encode(digest)
expected = (
self._sig_value_template.replace("{digest}", encoded)
if self._sig_value_template else
encoded
)

ok = hmac.compare_digest(provided, expected)
return VerificationResult.passed() if ok else VerificationResult.failed(
SignatureVerificationError("Signature mismatch.")
)
except Exception as exc:
return VerificationResult.failed(
SignatureVerificationError(f"Signature Verification Failed: {exc}")
)

# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
def _read_signature_header(self, request: Request) -> Optional[str]:
headers = {str(k).lower(): str(v) for k, v in (getattr(request, "headers", {}) or {}).items()}
value = headers.get(self._signature_header_lc)
return None if value is None or str(value).strip() == "" else str(value)
5 changes: 5 additions & 0 deletions apimatic_core/templating/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
__all__=[
'template_engine',
'template_resolver',
'json_pointer_resolver'
]
79 changes: 79 additions & 0 deletions apimatic_core/templating/json_pointer_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import json
from typing import Any, Optional
from jsonpointer import resolve_pointer, JsonPointerException


class JsonPointerResolver:
"""
Resolve RFC 6901 JSON Pointer values from a request body string.

Features:
- Lazy JSON parsing (parses only once on init)
- Canonical encoding for dict/list
- Consistent bytes conversion for scalar values

Usage:
resolver = JsonPointerResolver(body_text='{"event":{"id":"123"}}')
value_bytes = resolver.resolve_as_bytes("/event/id")
print(value_bytes) # b'123'
"""

__slots__ = ("_json",)

def __init__(self, *, body_text: Optional[str]) -> None:
"""
Initialize a new JSON Pointer resolver.

Args:
body_text: JSON text body (string). None or invalid JSON results in a no-op resolver.
"""
self._json = self._parse_json_maybe(body_text)

def resolve(self, pointer: str) -> Any:
"""
Resolve the value at the provided pointer.

Args:
pointer: JSON pointer path (e.g., "/event/id").

Returns:
The Python value (str, int, dict, list, etc.), or None if not found.
"""
if self._json is None:
return None
try:
return resolve_pointer(self._json, pointer)
except JsonPointerException:
return None

def resolve_as_bytes(self, pointer: str) -> bytes:
"""
Resolve the value and return as a canonical byte sequence.

Conversion rules:
- None: b""
- dict/list: canonical JSON with sorted keys
- str/int/float/bool: string representation encoded in UTF-8
- bytes/bytearray: returned as-is
"""
val = self.resolve(pointer)
return self._to_bytes(val)

@staticmethod
def _parse_json_maybe(body: Optional[str]) -> Any:
if not isinstance(body, str) or not body.strip():
return None
try:
return json.loads(body)
except Exception:
return None

@staticmethod
def _to_bytes(val: Any) -> bytes:
if val is None:
return b""
if isinstance(val, (bytes, bytearray)):
return bytes(val)
if isinstance(val, (dict, list)):
return json.dumps(val, separators=(",", ":"), sort_keys=True).encode("utf-8")
return str(val).encode("utf-8")
Loading
Loading