Skip to content

Commit 5a52503

Browse files
authored
Merge pull request #1353 from major/rspeed-2652/rh-identity-field-validation
RSPEED-2652: validate type and format of rh-identity fields
2 parents f814439 + 3f58309 commit 5a52503

2 files changed

Lines changed: 238 additions & 35 deletions

File tree

src/authentication/rh_identity.py

Lines changed: 102 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import base64
88
import json
9-
from typing import Optional
9+
from typing import Any, Optional
1010

1111
from fastapi import HTTPException, Request
1212

@@ -52,7 +52,7 @@ def _validate_structure(self) -> None:
5252
"""Validate the identity data structure.
5353
5454
Raises:
55-
HTTPException: 400 if required fields are missing
55+
HTTPException: 400 if required fields are missing or malformed
5656
"""
5757
if (
5858
"identity" not in self.identity_data
@@ -68,44 +68,111 @@ def _validate_structure(self) -> None:
6868

6969
identity_type = identity["type"]
7070
if identity_type == "User":
71-
if "user" not in identity:
72-
logger.warning(
73-
"Identity validation failed: missing 'user' field for User type"
74-
)
75-
raise HTTPException(status_code=400, detail="Invalid identity data")
76-
user = identity["user"]
77-
if "user_id" not in user:
78-
logger.warning(
79-
"Identity validation failed: missing 'user_id' in user data"
80-
)
81-
raise HTTPException(status_code=400, detail="Invalid identity data")
82-
if "username" not in user:
83-
logger.warning(
84-
"Identity validation failed: missing 'username' in user data"
85-
)
86-
raise HTTPException(status_code=400, detail="Invalid identity data")
71+
self._validate_user_fields(identity)
8772
elif identity_type == "System":
88-
if "system" not in identity:
89-
logger.warning(
90-
"Identity validation failed: missing 'system' field for System type"
91-
)
92-
raise HTTPException(status_code=400, detail="Invalid identity data")
93-
system = identity["system"]
94-
if "cn" not in system:
95-
logger.warning(
96-
"Identity validation failed: missing 'cn' in system data"
97-
)
98-
raise HTTPException(status_code=400, detail="Invalid identity data")
99-
if "account_number" not in identity:
100-
logger.warning(
101-
"Identity validation failed: "
102-
"missing 'account_number' for System type"
103-
)
104-
raise HTTPException(status_code=400, detail="Invalid identity data")
73+
self._validate_system_fields(identity)
10574
else:
10675
logger.warning("Identity validation failed: unsupported identity type")
10776
raise HTTPException(status_code=400, detail="Invalid identity data")
10877

78+
# Validate org_id if present and non-empty
79+
org_id = identity.get("org_id")
80+
if org_id is not None and org_id != "":
81+
self._validate_string_field("org_id", org_id)
82+
83+
def _validate_user_fields(self, identity: dict) -> None:
84+
"""Validate required fields for User identity type.
85+
86+
Args:
87+
identity: The identity dict containing user data
88+
89+
Raises:
90+
HTTPException: 400 if required User fields are missing or malformed
91+
"""
92+
if "user" not in identity:
93+
logger.warning(
94+
"Identity validation failed: missing 'user' field for User type"
95+
)
96+
raise HTTPException(status_code=400, detail="Invalid identity data")
97+
user = identity["user"]
98+
if "user_id" not in user:
99+
logger.warning("Identity validation failed: missing 'user_id' in user data")
100+
raise HTTPException(status_code=400, detail="Invalid identity data")
101+
if "username" not in user:
102+
logger.warning(
103+
"Identity validation failed: missing 'username' in user data"
104+
)
105+
raise HTTPException(status_code=400, detail="Invalid identity data")
106+
self._validate_string_field("user_id", user["user_id"])
107+
self._validate_string_field("username", user["username"])
108+
109+
def _validate_system_fields(self, identity: dict) -> None:
110+
"""Validate required fields for System identity type.
111+
112+
Args:
113+
identity: The identity dict containing system data
114+
115+
Raises:
116+
HTTPException: 400 if required System fields are missing or malformed
117+
"""
118+
if "system" not in identity:
119+
logger.warning(
120+
"Identity validation failed: missing 'system' field for System type"
121+
)
122+
raise HTTPException(status_code=400, detail="Invalid identity data")
123+
system = identity["system"]
124+
if "cn" not in system:
125+
logger.warning("Identity validation failed: missing 'cn' in system data")
126+
raise HTTPException(status_code=400, detail="Invalid identity data")
127+
if "account_number" not in identity:
128+
logger.warning(
129+
"Identity validation failed: "
130+
"missing 'account_number' for System type"
131+
)
132+
raise HTTPException(status_code=400, detail="Invalid identity data")
133+
self._validate_string_field("cn", system["cn"])
134+
self._validate_string_field("account_number", identity["account_number"])
135+
136+
def _validate_string_field(
137+
self, field_name: str, value: Any, max_length: int = 256
138+
) -> None:
139+
"""Validate that a field value is a well-formed string.
140+
141+
Args:
142+
field_name: Name of the field being validated (for error messages)
143+
value: The value to validate
144+
max_length: Maximum allowed string length
145+
146+
Raises:
147+
HTTPException: 400 if validation fails
148+
"""
149+
if not isinstance(value, str):
150+
logger.warning(
151+
"Identity validation failed: %s must be a string, got %s",
152+
field_name,
153+
type(value).__name__,
154+
)
155+
raise HTTPException(status_code=400, detail="Invalid identity data")
156+
if not value.strip():
157+
logger.warning(
158+
"Identity validation failed: %s must not be empty",
159+
field_name,
160+
)
161+
raise HTTPException(status_code=400, detail="Invalid identity data")
162+
if len(value) > max_length:
163+
logger.warning(
164+
"Identity validation failed: %s exceeds maximum length of %d",
165+
field_name,
166+
max_length,
167+
)
168+
raise HTTPException(status_code=400, detail="Invalid identity data")
169+
if any(ord(c) < 32 or ord(c) == 127 for c in value):
170+
logger.warning(
171+
"Identity validation failed: %s contains control characters",
172+
field_name,
173+
)
174+
raise HTTPException(status_code=400, detail="Invalid identity data")
175+
109176
def _get_identity_type(self) -> str:
110177
"""Get the identity type (User or System).
111178

tests/unit/authentication/test_rh_identity.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,3 +594,139 @@ async def test_header_exceeding_limit_rejected(
594594
mock_warning.assert_called_once()
595595
assert mock_warning.call_args.args[1] == header_size
596596
assert mock_warning.call_args.args[2] == max_size
597+
598+
599+
class TestRHIdentityFieldValidation:
600+
"""Test suite for RHIdentityData string field validation."""
601+
602+
@pytest.mark.parametrize(
603+
"field_path,bad_value",
604+
[
605+
(("user", "user_id"), None),
606+
(("user", "user_id"), 12345),
607+
(("user", "user_id"), True),
608+
(("user", "user_id"), []),
609+
(("user", "user_id"), {}),
610+
(("user", "user_id"), 3.14),
611+
(("user", "username"), None),
612+
(("user", "username"), 12345),
613+
],
614+
)
615+
def test_user_non_string_types_rejected(
616+
self, user_identity_data: dict, field_path: tuple[str, str], bad_value: object
617+
) -> None:
618+
"""Reject non-string values in User identity string fields."""
619+
user_identity_data["identity"][field_path[0]][field_path[1]] = bad_value
620+
with pytest.raises(HTTPException) as exc_info:
621+
RHIdentityData(user_identity_data)
622+
assert exc_info.value.status_code == 400
623+
624+
@pytest.mark.parametrize(
625+
"field_path,bad_value",
626+
[
627+
(("system", "cn"), None),
628+
(("system", "cn"), 12345),
629+
(("system", "cn"), True),
630+
(("system", "cn"), []),
631+
(("system", "cn"), {}),
632+
(("account_number",), None),
633+
(("account_number",), 12345),
634+
(("account_number",), False),
635+
(("account_number",), []),
636+
(("account_number",), {}),
637+
],
638+
)
639+
def test_system_non_string_types_rejected(
640+
self,
641+
system_identity_data: dict,
642+
field_path: tuple[str, ...],
643+
bad_value: object,
644+
) -> None:
645+
"""Reject non-string values in System identity string fields."""
646+
identity = system_identity_data["identity"]
647+
if len(field_path) == 1:
648+
identity[field_path[0]] = bad_value
649+
else:
650+
identity[field_path[0]][field_path[1]] = bad_value
651+
with pytest.raises(HTTPException) as exc_info:
652+
RHIdentityData(system_identity_data)
653+
assert exc_info.value.status_code == 400
654+
655+
@pytest.mark.parametrize("bad_value", ["", " ", "\t", "\n"])
656+
def test_empty_whitespace_rejected(
657+
self, user_identity_data: dict, bad_value: str
658+
) -> None:
659+
"""Reject empty and whitespace-only strings."""
660+
user_identity_data["identity"]["user"]["user_id"] = bad_value
661+
with pytest.raises(HTTPException) as exc_info:
662+
RHIdentityData(user_identity_data)
663+
assert exc_info.value.status_code == 400
664+
665+
@pytest.mark.parametrize(
666+
"bad_value",
667+
["user\x00id", "user\nid", "user\rid", "a\x1fb", "a\x7fb"],
668+
)
669+
def test_control_characters_rejected(
670+
self, user_identity_data: dict, bad_value: str
671+
) -> None:
672+
"""Reject strings containing control characters."""
673+
user_identity_data["identity"]["user"]["user_id"] = bad_value
674+
with pytest.raises(HTTPException) as exc_info:
675+
RHIdentityData(user_identity_data)
676+
assert exc_info.value.status_code == 400
677+
678+
def test_oversized_value_rejected(self, user_identity_data: dict) -> None:
679+
"""Reject values longer than 256 characters."""
680+
user_identity_data["identity"]["user"]["user_id"] = "a" * 257
681+
with pytest.raises(HTTPException) as exc_info:
682+
RHIdentityData(user_identity_data)
683+
assert exc_info.value.status_code == 400
684+
685+
def test_max_length_boundary_accepted(self, user_identity_data: dict) -> None:
686+
"""Accept values exactly 256 characters long."""
687+
user_identity_data["identity"]["user"]["user_id"] = "a" * 256
688+
RHIdentityData(user_identity_data)
689+
690+
def test_org_id_missing_accepted(self, user_identity_data: dict) -> None:
691+
"""Allow missing org_id."""
692+
user_identity_data["identity"].pop("org_id", None)
693+
RHIdentityData(user_identity_data)
694+
695+
def test_org_id_empty_accepted(self, user_identity_data: dict) -> None:
696+
"""Allow empty org_id."""
697+
user_identity_data["identity"]["org_id"] = ""
698+
RHIdentityData(user_identity_data)
699+
700+
def test_org_id_non_string_rejected(self, user_identity_data: dict) -> None:
701+
"""Reject non-string org_id when provided and non-empty."""
702+
user_identity_data["identity"]["org_id"] = 12345
703+
with pytest.raises(HTTPException) as exc_info:
704+
RHIdentityData(user_identity_data)
705+
assert exc_info.value.status_code == 400
706+
707+
def test_org_id_valid_accepted(self, user_identity_data: dict) -> None:
708+
"""Accept valid string org_id."""
709+
user_identity_data["identity"]["org_id"] = "valid-org-id"
710+
RHIdentityData(user_identity_data)
711+
712+
def test_org_id_oversized_rejected(self, user_identity_data: dict) -> None:
713+
"""Reject oversized org_id."""
714+
user_identity_data["identity"]["org_id"] = "a" * 257
715+
with pytest.raises(HTTPException) as exc_info:
716+
RHIdentityData(user_identity_data)
717+
assert exc_info.value.status_code == 400
718+
719+
def test_org_id_control_chars_rejected(self, user_identity_data: dict) -> None:
720+
"""Reject org_id containing control characters."""
721+
user_identity_data["identity"]["org_id"] = "org\x00id"
722+
with pytest.raises(HTTPException) as exc_info:
723+
RHIdentityData(user_identity_data)
724+
assert exc_info.value.status_code == 400
725+
726+
def test_valid_user_data_still_passes(self, user_identity_data: dict) -> None:
727+
"""Regression: valid User identity data passes validation."""
728+
RHIdentityData(user_identity_data)
729+
730+
def test_valid_system_data_still_passes(self, system_identity_data: dict) -> None:
731+
"""Regression: valid System identity data passes validation."""
732+
RHIdentityData(system_identity_data)

0 commit comments

Comments
 (0)