|
5 | 5 | It optionally opens a browser window to guide a human user to manually login. |
6 | 6 | After obtaining an auth code, the web server will automatically shut down. |
7 | 7 | """ |
| 8 | +from collections import defaultdict |
8 | 9 | import logging |
9 | 10 | import os |
10 | 11 | import socket |
@@ -109,70 +110,47 @@ def _printify(text): |
109 | 110 |
|
110 | 111 | class _AuthCodeHandler(BaseHTTPRequestHandler): |
111 | 112 | def do_GET(self): |
112 | | - # For flexibility, we choose to not check self.path matching redirect_uri |
113 | | - #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP') |
114 | | - |
115 | 113 | qs = parse_qs(urlparse(self.path).query) |
116 | | - if qs.get('code') or qs.get('error'): |
| 114 | + if qs: |
117 | 115 | # GET request with auth code or error - reject for security (form_post only) |
118 | 116 | self._send_full_response( |
119 | | - "GET method is not supported for authentication responses. " |
120 | | - "This application requires form_post response mode.", |
| 117 | + "response_mode=query is not supported for authentication responses. " |
| 118 | + "This application operates in response_mode=form_post mode only.", |
121 | 119 | is_ok=False) |
122 | | - elif not qs: |
123 | | - # Blank redirect from eSTS error - show generic error and mark done |
124 | | - self._send_full_response( |
125 | | - "Authentication could not be completed. " |
126 | | - "You can close this window and return to the application.") |
127 | | - self.server.done = True |
128 | 120 | else: |
129 | 121 | # Other GET requests - show welcome page |
130 | 122 | self._send_full_response(self.server.welcome_page) |
131 | 123 | # NOTE: Don't do self.server.shutdown() here. It'll halt the server. |
132 | 124 |
|
133 | | - def do_POST(self): |
134 | | - # Handle form_post response mode where auth code is sent via POST body |
| 125 | + def do_POST(self): # Handle form_post response where auth code is in body |
| 126 | + # For flexibility, we choose to not check self.path matching redirect_uri |
| 127 | + #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP') |
135 | 128 | content_length = int(self.headers.get('Content-Length', 0)) |
136 | 129 | post_data = self.rfile.read(content_length).decode('utf-8') |
137 | | - |
138 | 130 | qs = parse_qs(post_data) |
139 | 131 | if qs.get('code') or qs.get('error'): # So, it is an auth response |
140 | | - auth_response = _qs2kv(qs) |
141 | | - logger.debug("Got auth response via POST: %s", auth_response) |
142 | | - self._process_auth_response(auth_response) |
| 132 | + self._process_auth_response(_qs2kv(qs)) |
143 | 133 | else: |
144 | 134 | self._send_full_response("Invalid POST request", is_ok=False) |
145 | 135 | # NOTE: Don't do self.server.shutdown() here. It'll halt the server. |
146 | 136 |
|
147 | 137 | def _process_auth_response(self, auth_response): |
148 | 138 | """Process the auth response from either GET or POST request.""" |
| 139 | + logger.debug("Got auth response: %s", auth_response) |
149 | 140 | if self.server.auth_state and self.server.auth_state != auth_response.get("state"): |
150 | 141 | # OAuth2 successful and error responses contain state when it was used |
151 | 142 | # https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1 |
152 | | - self._send_full_response("State mismatch") # Possibly an attack |
153 | | - # Don't set auth_response for security, but mark as done to avoid hanging |
154 | | - self.server.done = True |
| 143 | + self._send_full_response( # Possibly an attack |
| 144 | + "State mismatch. Waiting for next response... or you may abort.", is_ok=False) |
155 | 145 | else: |
156 | 146 | template = (self.server.success_template |
157 | 147 | if "code" in auth_response else self.server.error_template) |
158 | 148 | if _is_html(template.template): |
159 | 149 | safe_data = _escape(auth_response) # Foiling an XSS attack |
160 | 150 | else: |
161 | | - safe_data = dict(auth_response) # Make a copy to avoid mutating original |
162 | | - # Provide default values for common OAuth2 response fields |
163 | | - # to avoid showing literal placeholder text like "$error_description" |
164 | | - safe_data.setdefault("error", "") |
165 | | - safe_data.setdefault("error_description", "") |
166 | | - # Format error message nicely: include ": description." only if description exists |
167 | | - if "code" not in auth_response: # This is an error response |
168 | | - error_desc = auth_response.get("error_description", "").strip() |
169 | | - if error_desc: |
170 | | - safe_data["error_message"] = f"{safe_data['error']}: {error_desc}." |
171 | | - else: |
172 | | - safe_data["error_message"] = safe_data["error"] |
173 | | - else: |
174 | | - safe_data["error_message"] = "" |
175 | | - self._send_full_response(template.safe_substitute(**safe_data)) |
| 151 | + safe_data = auth_response |
| 152 | + filled_data = defaultdict(str, safe_data) # So that missing keys will be empty string |
| 153 | + self._send_full_response(template.safe_substitute(**filled_data)) |
176 | 154 | self.server.auth_response = auth_response # Set it now, after the response is likely sent |
177 | 155 |
|
178 | 156 | def _send_full_response(self, body, is_ok=True): |
@@ -258,6 +236,7 @@ def get_auth_response(self, timeout=None, **kwargs): |
258 | 236 |
|
259 | 237 | :param str auth_uri: |
260 | 238 | If provided, this function will try to open a local browser. |
| 239 | + Starting from 2026, the built-in http server will require response_mode=form_post. |
261 | 240 | :param int timeout: In seconds. None means wait indefinitely. |
262 | 241 | :param str state: |
263 | 242 | You may provide the state you used in auth_uri, |
@@ -330,17 +309,20 @@ def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None, |
330 | 309 | welcome_uri = "http://localhost:{p}".format(p=self.get_port()) |
331 | 310 | abort_uri = "{loc}?error=abort".format(loc=welcome_uri) |
332 | 311 | logger.debug("Abort by visit %s", abort_uri) |
333 | | - |
334 | | - # Enforce response_mode=form_post for security |
| 312 | + |
335 | 313 | if auth_uri: |
336 | | - parsed = urlparse(auth_uri) |
337 | | - params = parse_qs(parsed.query) |
338 | | - params['response_mode'] = ['form_post'] # Enforce form_post |
339 | | - new_query = urlencode(params, doseq=True) |
340 | | - auth_uri = parsed._replace(query=new_query).geturl() |
341 | | - |
342 | | - self._server.welcome_page = Template(welcome_template or "").safe_substitute( |
343 | | - auth_uri=auth_uri, abort_uri=abort_uri) |
| 314 | + # Note to maintainers: |
| 315 | + # Do not enforce response_mode=form_post by secretly hardcoding it here. |
| 316 | + # Just validate it here, so we won't surprise caller by changing their auth_uri behind the scene. |
| 317 | + params = parse_qs(urlparse(auth_uri).query) |
| 318 | + assert params.get('response_mode', [None])[0] == 'form_post', ( |
| 319 | + "The built-in http server supports HTTP POST only. " |
| 320 | + "The auth_uri must be built with response_mode=form_post") |
| 321 | + |
| 322 | + self._server.welcome_page = Template( |
| 323 | + welcome_template or |
| 324 | + "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a>" |
| 325 | + ).safe_substitute(auth_uri=auth_uri, abort_uri=abort_uri) |
344 | 326 | if auth_uri: # Now attempt to open a local browser to visit it |
345 | 327 | _uri = welcome_uri if welcome_template else auth_uri |
346 | 328 | logger.info("Open a browser on this device to visit: %s" % _uri) |
@@ -369,22 +351,22 @@ def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None, |
369 | 351 | auth_uri_callback(_uri) |
370 | 352 |
|
371 | 353 | self._server.success_template = Template(success_template or |
372 | | - "Authentication complete. You can return to the application. Please close this browser tab.\n\n" |
373 | | - "For your security: Do not share the contents of this page, the address bar, or take screenshots.") |
| 354 | + "Authentication complete. You can return to the application. Please close this browser tab.") |
374 | 355 | self._server.error_template = Template(error_template or |
375 | | - "Authentication failed. $error_message\n\n" |
376 | | - "For your security: Do not share the contents of this page, the address bar, or take screenshots.") |
| 356 | + # Do NOT invent new placeholders in this template. Just use standard keys defined in OAuth2 RFC. |
| 357 | + # Otherwise there is no obvious canonical way for caller to know what placeholders are supported. |
| 358 | + # Besides, we have been using these standard keys for years. Changing now would break backward compatibility. |
| 359 | + "Authentication failed. $error: $error_description. ($error_uri)") |
377 | 360 |
|
378 | 361 | self._server.timeout = timeout # Otherwise its handle_timeout() won't work |
379 | 362 | self._server.auth_response = {} # Shared with _AuthCodeHandler |
380 | 363 | self._server.auth_state = state # So handler will check it before sending response |
381 | | - self._server.done = False # Flag to indicate completion without setting auth_response |
382 | 364 | while not self._closing: # Otherwise, the handle_request() attempt |
383 | 365 | # would yield noisy ValueError trace |
384 | 366 | # Derived from |
385 | 367 | # https://docs.python.org/2/library/basehttpserver.html#more-examples |
386 | 368 | self._server.handle_request() |
387 | | - if self._server.auth_response or self._server.done: |
| 369 | + if self._server.auth_response: |
388 | 370 | break |
389 | 371 | result.update(self._server.auth_response) # Return via writable result param |
390 | 372 |
|
@@ -425,8 +407,6 @@ def __exit__(self, exc_type, exc_val, exc_tb): |
425 | 407 | ) |
426 | 408 | print(json.dumps(receiver.get_auth_response( |
427 | 409 | auth_uri=flow["auth_uri"], |
428 | | - welcome_template= |
429 | | - "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a", |
430 | 410 | error_template="<html>Oh no. $error</html>", |
431 | 411 | success_template="Oh yeah. Got $code", |
432 | 412 | timeout=args.timeout, |
|
0 commit comments