Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions airbyte_cdk/sources/declarative/interpolation/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

import base64
import hashlib
import hmac as hmac_lib
import json
import re
from typing import Any, Optional
from typing import Any, Dict, Optional


def hash(value: Any, hash_type: str = "md5", salt: Optional[str] = None) -> str:
Expand Down Expand Up @@ -135,5 +136,51 @@ def regex_search(value: str, regex: str) -> str:
return ""


_filters_list = [hash, base64encode, base64decode, base64binascii_decode, string, regex_search]
def hmac(value: Any, key: str, hash_type: str = "sha256") -> str:
"""
Implementation of a custom Jinja2 hmac filter with SHA-256 support.

This filter creates a Hash-based Message Authentication Code (HMAC) using a cryptographic
hash function and a secret key. Currently only supports SHA-256, and returns hexdigest of the signature.

Example usage in a low code connector:

auth_headers:
$ref: "#/definitions/base_auth"
$parameters:
signature: "{{ 'message_to_sign' | hmac('my_secret_key') }}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah gotcha, so to use this in many cases its up to the caller to create the list of query params by hand to sign.

Think thats a fine work around.

But it seems like the next win is

  1. a jinja function to get all query params
  2. a jinja function to transform it into a delimited string (after you optionally sort / filter / map the list)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that would indeed make things easier.


:param value: The message to be authenticated
:param key: The secret key for the HMAC
:param hash_type: Hash algorithm to use (default: sha256)
:return: HMAC digest as a hexadecimal string
"""
# Define allowed hash functions
ALLOWED_HASH_TYPES: Dict[str, Any] = {
"sha256": hashlib.sha256,
}

if hash_type not in ALLOWED_HASH_TYPES:
raise ValueError(
f"Hash type '{hash_type}' is not allowed. Allowed types: {', '.join(ALLOWED_HASH_TYPES.keys())}"
)

hmac_obj = hmac_lib.new(
key=str(key).encode("utf-8"),
msg=str(value).encode("utf-8"),
digestmod=ALLOWED_HASH_TYPES[hash_type],
)

return hmac_obj.hexdigest()


_filters_list = [
hash,
base64encode,
base64decode,
base64binascii_decode,
string,
regex_search,
hmac,
]
filters = {f.__name__: f for f in _filters_list}
59 changes: 59 additions & 0 deletions unit_tests/sources/declarative/interpolation/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#
import base64
import hashlib
import hmac as hmac_lib

import pytest

Expand Down Expand Up @@ -105,3 +106,61 @@ def test_regex_search_no_match() -> None:
val = interpolation.eval(expression_with_regex, {})

assert val is None


def test_hmac_sha256_default() -> None:
message = "test_message"
secret_key = "test_secret_key"

s = "{{ '%s' | hmac('%s') }}" % (message, secret_key)
filter_hmac = interpolation.eval(s, config={})

# compute expected hmac using the hmac library directly
hmac_obj = hmac_lib.new(
key=secret_key.encode("utf-8"), msg=message.encode("utf-8"), digestmod=hashlib.sha256
)
expected_hmac = hmac_obj.hexdigest()

assert filter_hmac == expected_hmac


def test_hmac_sha256_explicit() -> None:
message = "test_message"
secret_key = "test_secret_key"

s = "{{ '%s' | hmac('%s', 'sha256') }}" % (message, secret_key)
filter_hmac = interpolation.eval(s, config={})

# compute expected hmac using the hmac library directly
hmac_obj = hmac_lib.new(
key=secret_key.encode("utf-8"), msg=message.encode("utf-8"), digestmod=hashlib.sha256
)
expected_hmac = hmac_obj.hexdigest()

assert filter_hmac == expected_hmac


def test_hmac_with_numeric_value() -> None:
message = 12345
secret_key = "test_secret_key"

s = "{{ %d | hmac('%s') }}" % (message, secret_key)
filter_hmac = interpolation.eval(s, config={})

# compute expected hmac using the hmac library directly
hmac_obj = hmac_lib.new(
key=secret_key.encode("utf-8"), msg=str(message).encode("utf-8"), digestmod=hashlib.sha256
)
expected_hmac = hmac_obj.hexdigest()

assert filter_hmac == expected_hmac


def test_hmac_with_invalid_hash_type() -> None:
message = "test_message"
secret_key = "test_secret_key"

s = "{{ '%s' | hmac('%s', 'md5') }}" % (message, secret_key)

with pytest.raises(ValueError):
interpolation.eval(s, config={})
Loading