|
| 1 | +# Authentication |
| 2 | + |
| 3 | +This page describes GuardPost's authentication API in detail, covering: |
| 4 | + |
| 5 | +- [X] The `AuthenticationHandler` abstract class |
| 6 | +- [X] Synchronous vs asynchronous `authenticate` methods |
| 7 | +- [X] The `scheme` property |
| 8 | +- [X] The `Identity` class and its claims |
| 9 | +- [X] The `AuthenticationStrategy` class |
| 10 | +- [X] Using multiple handlers |
| 11 | +- [X] Grouping handlers by scheme |
| 12 | + |
| 13 | +## The `AuthenticationHandler` abstract class |
| 14 | + |
| 15 | +`AuthenticationHandler` is the base class for all authentication logic. Subclass |
| 16 | +it and implement the `authenticate` method to read credentials from a context |
| 17 | +and, when valid, set `context.identity`. |
| 18 | + |
| 19 | +```python {linenums="1"} |
| 20 | +from guardpost import AuthenticationHandler, Identity |
| 21 | + |
| 22 | + |
| 23 | +class MyHandler(AuthenticationHandler): |
| 24 | + async def authenticate(self, context) -> None: |
| 25 | + # Read credentials from context, validate them, then: |
| 26 | + context.identity = Identity({"sub": "user-1"}, "Bearer") |
| 27 | +``` |
| 28 | + |
| 29 | +The `context` parameter is whatever your application uses to represent a |
| 30 | +request — GuardPost imposes no specific type on it. In |
| 31 | +[BlackSheep](https://www.neoteroi.dev/blacksheep/) this is the `Request` |
| 32 | +object; in other frameworks it could be any object you choose. |
| 33 | + |
| 34 | +## Synchronous vs asynchronous handlers |
| 35 | + |
| 36 | +Both sync and async implementations are supported: |
| 37 | + |
| 38 | +=== "Async" |
| 39 | + |
| 40 | + ```python {linenums="1"} |
| 41 | + from guardpost import AuthenticationHandler, Identity |
| 42 | + |
| 43 | + |
| 44 | + class AsyncBearerHandler(AuthenticationHandler): |
| 45 | + scheme = "Bearer" |
| 46 | + |
| 47 | + async def authenticate(self, context) -> None: |
| 48 | + token = getattr(context, "token", None) |
| 49 | + if token: |
| 50 | + # e.g. validate token against a remote service |
| 51 | + user_info = await fetch_user_info(token) |
| 52 | + if user_info: |
| 53 | + context.identity = Identity( |
| 54 | + user_info, self.scheme |
| 55 | + ) |
| 56 | + ``` |
| 57 | + |
| 58 | +=== "Sync" |
| 59 | + |
| 60 | + ```python {linenums="1"} |
| 61 | + from guardpost import AuthenticationHandler, Identity |
| 62 | + |
| 63 | + |
| 64 | + class SyncApiKeyHandler(AuthenticationHandler): |
| 65 | + scheme = "ApiKey" |
| 66 | + |
| 67 | + _valid_keys = {"key-abc": "service-a", "key-xyz": "service-b"} |
| 68 | + |
| 69 | + def authenticate(self, context) -> None: |
| 70 | + api_key = getattr(context, "api_key", None) |
| 71 | + sub = self._valid_keys.get(api_key) |
| 72 | + if sub: |
| 73 | + context.identity = Identity( |
| 74 | + {"sub": sub}, self.scheme |
| 75 | + ) |
| 76 | + ``` |
| 77 | + |
| 78 | +## The `scheme` property |
| 79 | + |
| 80 | +The optional `scheme` class property names the authentication scheme this |
| 81 | +handler implements (e.g. `"Bearer"`, `"ApiKey"`, `"Cookie"`). Naming |
| 82 | +schemes is useful when multiple handlers are registered and you need to |
| 83 | +identify which one authenticated a request. |
| 84 | + |
| 85 | +```python {linenums="1"} |
| 86 | +from guardpost import AuthenticationHandler, Identity |
| 87 | + |
| 88 | + |
| 89 | +class CookieHandler(AuthenticationHandler): |
| 90 | + scheme = "Cookie" |
| 91 | + |
| 92 | + async def authenticate(self, context) -> None: |
| 93 | + session_id = getattr(context, "session_id", None) |
| 94 | + if session_id: |
| 95 | + context.identity = Identity( |
| 96 | + {"sub": "user-from-cookie"}, self.scheme |
| 97 | + ) |
| 98 | +``` |
| 99 | + |
| 100 | +## The `Identity` class and its claims |
| 101 | + |
| 102 | +`Identity` wraps a `dict` of claims and an `authentication_mode` string. |
| 103 | +`is_authenticated()` returns `True` only when `authentication_mode` is set. |
| 104 | + |
| 105 | +```python {linenums="1"} |
| 106 | +from guardpost import Identity |
| 107 | + |
| 108 | +identity = Identity( |
| 109 | + { |
| 110 | + "sub": "user-42", |
| 111 | + "name": "Bob", |
| 112 | + "email": "bob@example.com", |
| 113 | + "roles": ["editor"], |
| 114 | + "iss": "https://auth.example.com", |
| 115 | + }, |
| 116 | + "Bearer", |
| 117 | +) |
| 118 | + |
| 119 | +# Convenience properties |
| 120 | +print(identity.sub) # "user-42" |
| 121 | +print(identity.name) # "Bob" |
| 122 | +print(identity.access_token) # None — not set |
| 123 | + |
| 124 | +# Dict-style access |
| 125 | +print(identity["email"]) # "bob@example.com" |
| 126 | +print(identity.get("roles")) # ["editor"] |
| 127 | + |
| 128 | +# Authentication mode |
| 129 | +print(identity.authentication_mode) # "Bearer" |
| 130 | + |
| 131 | +# Authentication check |
| 132 | +print(identity.is_authenticated()) # True — authentication_mode is set |
| 133 | + |
| 134 | +# Anonymous identity: claims present, but no authentication_mode |
| 135 | +anon = Identity({"sub": "guest"}) |
| 136 | +print(anon.is_authenticated()) # False |
| 137 | +``` |
| 138 | + |
| 139 | +/// admonition | Anonymous vs no identity |
| 140 | + type: info |
| 141 | + |
| 142 | +An `Identity` created without `authentication_mode` (or `authentication_mode=None`) |
| 143 | +is **anonymous**: it has claims, but `is_authenticated()` returns `False`. This is |
| 144 | +different from `context.identity` being `None`, which means no identity was resolved |
| 145 | +at all. `AuthorizationStrategy` raises `UnauthorizedError` in both cases. |
| 146 | +/// |
| 147 | + |
| 148 | +## The `AuthenticationStrategy` class |
| 149 | + |
| 150 | +`AuthenticationStrategy` manages a list of handlers and calls them in sequence. |
| 151 | +Once a handler sets `context.identity`, the remaining handlers are skipped. |
| 152 | + |
| 153 | +```python {linenums="1"} |
| 154 | +import asyncio |
| 155 | +from guardpost import AuthenticationHandler, AuthenticationStrategy, Identity |
| 156 | + |
| 157 | + |
| 158 | +class MockContext: |
| 159 | + def __init__(self, token=None, api_key=None): |
| 160 | + self.token = token |
| 161 | + self.api_key = api_key |
| 162 | + self.identity = None |
| 163 | + |
| 164 | + |
| 165 | +class BearerHandler(AuthenticationHandler): |
| 166 | + scheme = "Bearer" |
| 167 | + |
| 168 | + async def authenticate(self, context) -> None: |
| 169 | + if context.token == "valid-jwt": |
| 170 | + context.identity = Identity( |
| 171 | + {"sub": "u1", "name": "Alice"}, self.scheme |
| 172 | + ) |
| 173 | + |
| 174 | + |
| 175 | +class ApiKeyHandler(AuthenticationHandler): |
| 176 | + scheme = "ApiKey" |
| 177 | + |
| 178 | + def authenticate(self, context) -> None: |
| 179 | + if context.api_key == "svc-key": |
| 180 | + context.identity = Identity( |
| 181 | + {"sub": "service-a"}, self.scheme |
| 182 | + ) |
| 183 | + |
| 184 | + |
| 185 | +async def main(): |
| 186 | + strategy = AuthenticationStrategy(BearerHandler(), ApiKeyHandler()) |
| 187 | + |
| 188 | + ctx = MockContext(api_key="svc-key") |
| 189 | + await strategy.authenticate(ctx) |
| 190 | + print(ctx.identity.sub) # "service-a" |
| 191 | + print(ctx.identity.authentication_mode) # "ApiKey" |
| 192 | + |
| 193 | + |
| 194 | +asyncio.run(main()) |
| 195 | +``` |
| 196 | + |
| 197 | +## Using multiple handlers |
| 198 | + |
| 199 | +When multiple handlers are registered, they are tried in the order they are |
| 200 | +passed to `AuthenticationStrategy`. The first handler to set `context.identity` |
| 201 | +wins; subsequent handlers are not called. |
| 202 | + |
| 203 | +```python {linenums="1", hl_lines="3-4"} |
| 204 | +strategy = AuthenticationStrategy( |
| 205 | + JWTHandler(), # tried first |
| 206 | + ApiKeyHandler(), # tried second, only if JWT handler didn't set identity |
| 207 | + CookieHandler(), # tried third, only if both above didn't set identity |
| 208 | +) |
| 209 | +``` |
| 210 | + |
| 211 | +This is useful for APIs that support multiple credential types simultaneously. |
| 212 | + |
| 213 | +## Grouping handlers by scheme |
| 214 | + |
| 215 | +You can inspect `context.identity.authentication_mode` after authentication to know which |
| 216 | +handler authenticated the request, and apply different logic accordingly. |
| 217 | + |
| 218 | +```python {linenums="1"} |
| 219 | +import asyncio |
| 220 | +from guardpost import AuthenticationHandler, AuthenticationStrategy, Identity |
| 221 | + |
| 222 | + |
| 223 | +class MockContext: |
| 224 | + def __init__(self, token=None, api_key=None): |
| 225 | + self.token = token |
| 226 | + self.api_key = api_key |
| 227 | + self.identity = None |
| 228 | + |
| 229 | + |
| 230 | +class BearerHandler(AuthenticationHandler): |
| 231 | + scheme = "Bearer" |
| 232 | + |
| 233 | + async def authenticate(self, context) -> None: |
| 234 | + if context.token: |
| 235 | + context.identity = Identity( |
| 236 | + {"sub": "user-1"}, self.scheme |
| 237 | + ) |
| 238 | + |
| 239 | + |
| 240 | +class ApiKeyHandler(AuthenticationHandler): |
| 241 | + scheme = "ApiKey" |
| 242 | + |
| 243 | + def authenticate(self, context) -> None: |
| 244 | + if context.api_key: |
| 245 | + context.identity = Identity( |
| 246 | + {"sub": "svc-1"}, self.scheme |
| 247 | + ) |
| 248 | + |
| 249 | + |
| 250 | +async def handle_request(context): |
| 251 | + strategy = AuthenticationStrategy(BearerHandler(), ApiKeyHandler()) |
| 252 | + await strategy.authenticate(context) |
| 253 | + |
| 254 | + if context.identity is None: |
| 255 | + print("Anonymous request") |
| 256 | + elif context.identity.authentication_mode == "Bearer": |
| 257 | + print(f"Human user: {context.identity.sub}") |
| 258 | + elif context.identity.authentication_mode == "ApiKey": |
| 259 | + print(f"Service call: {context.identity.sub}") |
| 260 | + |
| 261 | + |
| 262 | +asyncio.run(handle_request(MockContext(api_key="any-key"))) |
| 263 | +# Service call: svc-1 |
| 264 | +``` |
0 commit comments