Skip to content

Commit be56d50

Browse files
authored
Merge pull request lightspeed-core#483 from omertuc/regex
Regex operator for JWT role extraction
2 parents 0af7072 + 17d0419 commit be56d50

3 files changed

Lines changed: 160 additions & 12 deletions

File tree

src/authorization/resolvers.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,8 @@ def evaluate_role_rules(rule: JwtRoleRule, jwt_claims: dict[str, Any]) -> UserRo
7272
return (
7373
set(rule.roles)
7474
if JwtRolesResolver._evaluate_operator(
75-
rule.negate,
75+
rule,
7676
[match.value for match in parse(rule.jsonpath).find(jwt_claims)],
77-
rule.operator,
78-
rule.value,
7977
)
8078
else set()
8179
)
@@ -99,19 +97,26 @@ def _get_claims(auth: AuthTuple) -> dict[str, Any]:
9997

10098
@staticmethod
10199
def _evaluate_operator(
102-
negate: bool, match: Any, operator: JsonPathOperator, value: Any
100+
rule: JwtRoleRule, match: Any
103101
) -> bool: # pylint: disable=too-many-branches
104-
"""Evaluate an operator against a match and value."""
102+
"""Evaluate an operator against a match and rule."""
105103
result = False
106-
match operator:
104+
match rule.operator:
107105
case JsonPathOperator.EQUALS:
108-
result = match == value
106+
result = match == rule.value
109107
case JsonPathOperator.CONTAINS:
110-
result = value in match
108+
result = rule.value in match
111109
case JsonPathOperator.IN:
112-
result = match in value
113-
114-
if negate:
110+
result = match in rule.value
111+
case JsonPathOperator.MATCH:
112+
# Use the pre-compiled regex pattern for better performance
113+
if rule.compiled_regex is not None:
114+
result = any(
115+
isinstance(item, str) and bool(rule.compiled_regex.search(item))
116+
for item in match
117+
)
118+
119+
if rule.negate:
115120
result = not result
116121

117122
return result

src/models/config.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
"""Model with service configuration."""
22

33
from pathlib import Path
4-
from typing import Optional, Any
4+
from typing import Optional, Any, Pattern
55
from enum import Enum
6+
from functools import cached_property
7+
import re
68

79
import jsonpath_ng
810
from jsonpath_ng.exceptions import JSONPathError
@@ -233,6 +235,7 @@ class JsonPathOperator(str, Enum):
233235
EQUALS = "equals"
234236
CONTAINS = "contains"
235237
IN = "in"
238+
MATCH = "match"
236239

237240

238241
class JwtRoleRule(ConfigurationBase):
@@ -272,6 +275,29 @@ def check_roles(self) -> Self:
272275

273276
return self
274277

278+
@model_validator(mode="after")
279+
def check_regex_pattern(self) -> Self:
280+
"""Verify that regex patterns are valid for MATCH operator."""
281+
if self.operator == JsonPathOperator.MATCH:
282+
if not isinstance(self.value, str):
283+
raise ValueError(
284+
f"MATCH operator requires a string pattern, {type(self.value).__name__}"
285+
)
286+
try:
287+
re.compile(self.value)
288+
except re.error as e:
289+
raise ValueError(
290+
f"Invalid regex pattern for MATCH operator: {self.value}: {e}"
291+
) from e
292+
return self
293+
294+
@cached_property
295+
def compiled_regex(self) -> Optional[Pattern[str]]:
296+
"""Return compiled regex pattern for MATCH operator, None otherwise."""
297+
if self.operator == JsonPathOperator.MATCH and isinstance(self.value, str):
298+
return re.compile(self.value)
299+
return None
300+
275301

276302
class Action(str, Enum):
277303
"""Available actions in the system."""

tests/unit/authorization/test_resolvers.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
import json
44
import base64
5+
import re
6+
7+
import pytest
58

69
from authorization.resolvers import JwtRolesResolver, GenericAccessResolver
710
from models.config import JwtRoleRule, AccessRule, JsonPathOperator, Action
@@ -75,6 +78,120 @@ async def test_resolve_roles_no_match(self):
7578
roles = await jwt_resolver.resolve_roles(auth)
7679
assert len(roles) == 0
7780

81+
async def test_resolve_roles_match_operator_email_domain(self):
82+
"""Test role extraction using MATCH operator with email domain regex."""
83+
role_rules = [
84+
JwtRoleRule(
85+
jsonpath="$.email",
86+
operator=JsonPathOperator.MATCH,
87+
value=r"@redhat\.com$",
88+
roles=["redhat_employee"],
89+
)
90+
]
91+
jwt_resolver = JwtRolesResolver(role_rules)
92+
93+
jwt_claims = {
94+
"exp": 1754489339,
95+
"iat": 1754488439,
96+
"sub": "f:123:employee@redhat.com",
97+
"email": "employee@redhat.com",
98+
}
99+
100+
auth = ("user", "token", claims_to_token(jwt_claims))
101+
roles = await jwt_resolver.resolve_roles(auth)
102+
assert "redhat_employee" in roles
103+
104+
async def test_resolve_roles_match_operator_no_match(self):
105+
"""Test role extraction using MATCH operator with no match."""
106+
role_rules = [
107+
JwtRoleRule(
108+
jsonpath="$.email",
109+
operator=JsonPathOperator.MATCH,
110+
value=r"@redhat\.com$",
111+
roles=["redhat_employee"],
112+
)
113+
]
114+
jwt_resolver = JwtRolesResolver(role_rules)
115+
116+
jwt_claims = {
117+
"exp": 1754489339,
118+
"iat": 1754488439,
119+
"sub": "f:123:user@example.com",
120+
"email": "user@example.com",
121+
}
122+
123+
auth = ("user", "token", claims_to_token(jwt_claims))
124+
roles = await jwt_resolver.resolve_roles(auth)
125+
assert len(roles) == 0
126+
127+
async def test_resolve_roles_match_operator_invalid_regex(self):
128+
"""Test that invalid regex patterns are rejected at rule creation time."""
129+
with pytest.raises(
130+
ValueError, match="Invalid regex pattern for MATCH operator"
131+
):
132+
JwtRoleRule(
133+
jsonpath="$.email",
134+
operator=JsonPathOperator.MATCH,
135+
value="[invalid regex(", # Invalid regex pattern
136+
roles=["test_role"],
137+
)
138+
139+
async def test_resolve_roles_match_operator_non_string_pattern(self):
140+
"""Test that non-string regex patterns are rejected at rule creation time."""
141+
with pytest.raises(
142+
ValueError, match="MATCH operator requires a string pattern"
143+
):
144+
JwtRoleRule(
145+
jsonpath="$.user_id",
146+
operator=JsonPathOperator.MATCH,
147+
value=123, # Non-string pattern
148+
roles=["test_role"],
149+
)
150+
151+
async def test_resolve_roles_match_operator_non_string_value(self):
152+
"""Test role extraction using MATCH operator with non-string match value."""
153+
role_rules = [
154+
JwtRoleRule(
155+
jsonpath="$.user_id",
156+
operator=JsonPathOperator.MATCH,
157+
value=r"\d+", # Number pattern
158+
roles=["numeric_user"],
159+
)
160+
]
161+
jwt_resolver = JwtRolesResolver(role_rules)
162+
163+
jwt_claims = {
164+
"exp": 1754489339,
165+
"iat": 1754488439,
166+
"user_id": 12345, # Non-string value
167+
}
168+
169+
auth = ("user", "token", claims_to_token(jwt_claims))
170+
roles = await jwt_resolver.resolve_roles(auth)
171+
assert len(roles) == 0 # Non-string values don't match regex
172+
173+
async def test_compiled_regex_property(self):
174+
"""Test that compiled regex pattern is properly created for MATCH operator."""
175+
# Test MATCH operator creates compiled regex
176+
match_rule = JwtRoleRule(
177+
jsonpath="$.email",
178+
operator=JsonPathOperator.MATCH,
179+
value=r"@example\.com$",
180+
roles=["example_user"],
181+
)
182+
assert match_rule.compiled_regex is not None
183+
assert isinstance(match_rule.compiled_regex, re.Pattern)
184+
assert match_rule.compiled_regex.pattern == r"@example\.com$"
185+
186+
# Test non-MATCH operator returns None
187+
equals_rule = JwtRoleRule(
188+
jsonpath="$.email",
189+
operator=JsonPathOperator.EQUALS,
190+
value="test@example.com",
191+
roles=["example_user"],
192+
)
193+
assert equals_rule.compiled_regex is None
194+
78195

79196
class TestGenericAccessResolver:
80197
"""Test cases for GenericAccessResolver."""

0 commit comments

Comments
 (0)