|
8 | 8 | import pytest |
9 | 9 | import responses |
10 | 10 | import respx |
| 11 | +from fastapi import HTTPException |
11 | 12 | from pydantic import SecretStr |
12 | 13 | from starlette.status import HTTP_200_OK, HTTP_403_FORBIDDEN |
13 | 14 |
|
14 | 15 | from blueapi.config import OIDCConfig, ServiceAccount |
15 | | -from blueapi.service import main |
| 16 | +from blueapi.service import authentication |
16 | 17 | from blueapi.service.authentication import ( |
17 | 18 | SessionCacheManager, |
18 | 19 | SessionManager, |
19 | 20 | TiledAuth, |
| 21 | + access_token, |
| 22 | + build_access_token_check, |
| 23 | + unchecked_bearer_token, |
20 | 24 | ) |
21 | 25 |
|
22 | 26 |
|
@@ -124,18 +128,18 @@ def test_poll_for_token_timeout( |
124 | 128 | def test_server_raises_exception_for_invalid_token( |
125 | 129 | oidc_config: OIDCConfig, mock_authn_server: responses.RequestsMock |
126 | 130 | ): |
127 | | - inner = main.decode_access_token(oidc_config) |
| 131 | + inner = authentication.build_access_token_check(oidc_config) |
128 | 132 | with pytest.raises(jwt.PyJWTError): |
129 | | - inner(Mock(), access_token="Invalid Token") |
| 133 | + inner(Mock(), token="Invalid Token") |
130 | 134 |
|
131 | 135 |
|
132 | 136 | def test_processes_valid_token( |
133 | 137 | oidc_config: OIDCConfig, |
134 | 138 | mock_authn_server: responses.RequestsMock, |
135 | 139 | valid_token_with_jwt, |
136 | 140 | ): |
137 | | - inner = main.decode_access_token(oidc_config) |
138 | | - inner(Mock(), access_token=valid_token_with_jwt["access_token"]) |
| 141 | + inner = authentication.build_access_token_check(oidc_config) |
| 142 | + inner(Mock(), token=valid_token_with_jwt["access_token"]) |
139 | 143 |
|
140 | 144 |
|
141 | 145 | def test_session_cache_manager_returns_writable_file_path(tmp_path): |
@@ -182,3 +186,49 @@ def test_tiled_auth_sync_auth_flow(): |
182 | 186 | result = next(flow) |
183 | 187 |
|
184 | 188 | assert result.headers["Authorization"] == f"Bearer {access_token}" |
| 189 | + |
| 190 | + |
| 191 | +@pytest.mark.parametrize( |
| 192 | + "header,token", |
| 193 | + [ |
| 194 | + (None, None), |
| 195 | + ("ApiKey foobar", None), |
| 196 | + ("Bearer foobar", "foobar"), |
| 197 | + ("Bearer with_whitespace ", "with_whitespace"), |
| 198 | + ("Bearerfoobar", None), |
| 199 | + ], |
| 200 | +) |
| 201 | +def test_unchecked_bearer_token(header: str | None, token: str | None): |
| 202 | + req = Mock() |
| 203 | + req.headers.get.side_effect = lambda key: header if key == "Authorization" else None |
| 204 | + |
| 205 | + assert unchecked_bearer_token(req) == token |
| 206 | + |
| 207 | + |
| 208 | +def test_access_token(): |
| 209 | + req = Mock() |
| 210 | + req.state.decoded_access_token = {"foo": "bar"} |
| 211 | + |
| 212 | + assert access_token(req) == {"foo": "bar"} |
| 213 | + |
| 214 | + |
| 215 | +def test_access_token_without_token(): |
| 216 | + req = Mock() |
| 217 | + del req.state.decoded_access_token |
| 218 | + |
| 219 | + assert access_token(req) is None |
| 220 | + |
| 221 | + |
| 222 | +@patch("blueapi.service.authentication.jwt") |
| 223 | +def test_build_access_token(mock_jwt: Mock): |
| 224 | + # Return None when building client to ensure no field/method access |
| 225 | + mock_jwt.PyJWKClient.return_value = None |
| 226 | + oidc_config = Mock() |
| 227 | + req = Mock() |
| 228 | + |
| 229 | + validate_fn = build_access_token_check(oidc_config) |
| 230 | + |
| 231 | + with pytest.raises(HTTPException, match="401"): |
| 232 | + validate_fn(req, token=None) |
| 233 | + |
| 234 | + mock_jwt.decode.assert_not_called() |
0 commit comments