|
1 | 1 | """Implementation of common test steps.""" |
2 | 2 |
|
| 3 | +import base64 |
| 4 | +import json |
| 5 | + |
3 | 6 | import requests |
4 | 7 | from behave import given, when # pyright: ignore[reportAttributeAccessIssue] |
5 | 8 | from behave.runner import Context |
6 | 9 | from tests.e2e.utils.utils import normalize_endpoint |
7 | 10 |
|
8 | 11 |
|
| 12 | +def _encode_rh_identity(identity_data: dict) -> str: |
| 13 | + """Encode identity dict to base64 for x-rh-identity header. |
| 14 | +
|
| 15 | + Args: |
| 16 | + identity_data: JSON-serializable identity payload to encode. |
| 17 | +
|
| 18 | + Returns: |
| 19 | + Base64-encoded UTF-8 string representation of the JSON payload. |
| 20 | + """ |
| 21 | + json_str = json.dumps(identity_data) |
| 22 | + return base64.b64encode(json_str.encode("utf-8")).decode("utf-8") |
| 23 | + |
| 24 | + |
9 | 25 | @given("I set the Authorization header to {header_value}") |
10 | 26 | def set_authorization_header_custom(context: Context, header_value: str) -> None: |
11 | 27 | """Set a custom Authorization header value. |
@@ -73,3 +89,103 @@ def access_rest_api_endpoint_post_without_param( |
73 | 89 | context.response = requests.post( |
74 | 90 | url, json="", headers=context.auth_headers, timeout=10 |
75 | 91 | ) |
| 92 | + |
| 93 | + |
| 94 | +@given('I set the x-rh-identity header to raw value "{header_value}"') |
| 95 | +def set_rh_identity_header_raw(context: Context, header_value: str) -> None: |
| 96 | + """Set x-rh-identity header with a raw string value for testing invalid base64.""" |
| 97 | + if not hasattr(context, "auth_headers"): |
| 98 | + context.auth_headers = {} |
| 99 | + context.auth_headers["x-rh-identity"] = header_value |
| 100 | + print(f"Set x-rh-identity header to raw value: {header_value[:50]}...") |
| 101 | + |
| 102 | + |
| 103 | +@given('I set the x-rh-identity header with base64 encoded value "{raw_value}"') |
| 104 | +def set_rh_identity_header_base64_raw(context: Context, raw_value: str) -> None: |
| 105 | + """Set x-rh-identity header with base64-encoded raw string for testing invalid JSON.""" |
| 106 | + if not hasattr(context, "auth_headers"): |
| 107 | + context.auth_headers = {} |
| 108 | + encoded = base64.b64encode(raw_value.encode("utf-8")).decode("utf-8") |
| 109 | + context.auth_headers["x-rh-identity"] = encoded |
| 110 | + print(f"Set x-rh-identity header with base64-encoded: {raw_value}") |
| 111 | + |
| 112 | + |
| 113 | +@given("I set the x-rh-identity header with JSON") |
| 114 | +def set_rh_identity_header_json(context: Context) -> None: |
| 115 | + """Set x-rh-identity header with base64-encoded JSON from context.text.""" |
| 116 | + if not hasattr(context, "auth_headers"): |
| 117 | + context.auth_headers = {} |
| 118 | + assert context.text is not None, "JSON payload required" |
| 119 | + identity_data = json.loads(context.text) |
| 120 | + context.auth_headers["x-rh-identity"] = _encode_rh_identity(identity_data) |
| 121 | + print(f"Set x-rh-identity header with JSON: {identity_data}") |
| 122 | + |
| 123 | + |
| 124 | +@given("I set the x-rh-identity header with valid User identity") |
| 125 | +def set_rh_identity_user(context: Context) -> None: |
| 126 | + """Set x-rh-identity header with User identity from table.""" |
| 127 | + if not hasattr(context, "auth_headers"): |
| 128 | + context.auth_headers = {} |
| 129 | + |
| 130 | + assert context.table is not None, "Table with identity fields required" |
| 131 | + |
| 132 | + fields = {row["field"]: row["value"] for row in context.table} |
| 133 | + |
| 134 | + entitlements = {} |
| 135 | + if "entitlements" in fields: |
| 136 | + for ent in fields["entitlements"].split(","): |
| 137 | + ent = ent.strip() |
| 138 | + if not ent: |
| 139 | + continue |
| 140 | + entitlements[ent] = {"is_entitled": True, "is_trial": False} |
| 141 | + |
| 142 | + identity_data = { |
| 143 | + "identity": { |
| 144 | + "account_number": fields.get("account_number", "123"), |
| 145 | + "org_id": fields.get("org_id", "321"), |
| 146 | + "type": "User", |
| 147 | + "user": { |
| 148 | + "user_id": fields.get("user_id", "test-user"), |
| 149 | + "username": fields.get("username", "test@redhat.com"), |
| 150 | + "is_org_admin": fields.get("is_org_admin", "false").lower() == "true", |
| 151 | + }, |
| 152 | + }, |
| 153 | + "entitlements": entitlements, |
| 154 | + } |
| 155 | + |
| 156 | + context.auth_headers["x-rh-identity"] = _encode_rh_identity(identity_data) |
| 157 | + print(f"Set x-rh-identity header with User identity: {fields.get('user_id')}") |
| 158 | + |
| 159 | + |
| 160 | +@given("I set the x-rh-identity header with valid System identity") |
| 161 | +def set_rh_identity_system(context: Context) -> None: |
| 162 | + """Set x-rh-identity header with System identity from table.""" |
| 163 | + if not hasattr(context, "auth_headers"): |
| 164 | + context.auth_headers = {} |
| 165 | + |
| 166 | + assert context.table is not None, "Table with identity fields required" |
| 167 | + |
| 168 | + fields = {row["field"]: row["value"] for row in context.table} |
| 169 | + |
| 170 | + entitlements = {} |
| 171 | + if "entitlements" in fields: |
| 172 | + for ent in fields["entitlements"].split(","): |
| 173 | + ent = ent.strip() |
| 174 | + if not ent: |
| 175 | + continue |
| 176 | + entitlements[ent] = {"is_entitled": True, "is_trial": False} |
| 177 | + |
| 178 | + identity_data = { |
| 179 | + "identity": { |
| 180 | + "account_number": fields.get("account_number", "123"), |
| 181 | + "org_id": fields.get("org_id", "321"), |
| 182 | + "type": "System", |
| 183 | + "system": { |
| 184 | + "cn": fields.get("cn", "default-cn-uuid"), |
| 185 | + }, |
| 186 | + }, |
| 187 | + "entitlements": entitlements, |
| 188 | + } |
| 189 | + |
| 190 | + context.auth_headers["x-rh-identity"] = _encode_rh_identity(identity_data) |
| 191 | + print(f"Set x-rh-identity header with System identity: {fields.get('cn')}") |
0 commit comments