-
Notifications
You must be signed in to change notification settings - Fork 1.7k
fix(auth): prefer fido2 for reauth fallback #17391
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
c688dd5
49a4163
fe4cf2c
9ce8d95
46eb634
b477066
f9e0377
672a4ab
5ed5683
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,8 @@ | |
| import abc | ||
| import base64 | ||
| import getpass | ||
| import hashlib | ||
| import json | ||
| import sys | ||
|
|
||
| from google.auth import _helpers | ||
|
|
@@ -29,12 +31,12 @@ | |
| PublicKeyCredentialDescriptor, | ||
| ) | ||
|
|
||
|
|
||
| REAUTH_ORIGIN = "https://accounts.google.com" | ||
| SAML_CHALLENGE_MESSAGE = ( | ||
| "Please run `gcloud auth login` to complete reauthentication with SAML." | ||
| ) | ||
| WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout | ||
| U2F_AUTHENTICATION_TYPE = "navigator.id.getAssertion" | ||
|
|
||
|
|
||
| def get_user_password(text): | ||
|
|
@@ -117,26 +119,43 @@ def is_locally_eligible(self): | |
|
|
||
| @_helpers.copy_docstring(ReauthChallenge) | ||
| def obtain_challenge_input(self, metadata): | ||
| # Check if there is an available Webauthn Handler, if not use pyu2f | ||
| # Check if there is an available Webauthn Handler, if not use fido2. | ||
| try: | ||
| factory = webauthn_handler_factory.WebauthnHandlerFactory() | ||
| webauthn_handler = factory.get_handler() | ||
| if webauthn_handler is not None: | ||
| sys.stderr.write("Please insert and touch your security key\n") | ||
| return self._obtain_challenge_input_webauthn(metadata, webauthn_handler) | ||
| except Exception: | ||
| # Attempt pyu2f if exception in webauthn flow | ||
| # Attempt fido2 if exception in webauthn flow. | ||
| pass | ||
|
|
||
| return self._obtain_challenge_input_fido2(metadata) | ||
|
|
||
| def _get_fido2_classes(self): | ||
|
parthea marked this conversation as resolved.
Outdated
|
||
| try: | ||
| import pyu2f.convenience.authenticator # type: ignore | ||
| import pyu2f.errors # type: ignore | ||
| import pyu2f.model # type: ignore | ||
| except ImportError: | ||
| raise exceptions.ReauthFailError( | ||
| "pyu2f dependency is required to use Security key reauth feature. " | ||
| "It can be installed via `pip install pyu2f` or `pip install google-auth[reauth]`." | ||
| from fido2.ctap import CtapError # type: ignore | ||
| from fido2.ctap1 import ( | ||
| APDU, # type: ignore | ||
| ApduError, # type: ignore | ||
| Ctap1, # type: ignore | ||
| ) | ||
| from fido2.hid import CtapHidDevice # type: ignore | ||
| except ImportError as caught_exc: | ||
| raise exceptions.ReauthFailError( | ||
| "fido2 dependency is required to use Security key reauth feature. " | ||
| "It can be installed via `pip install fido2` or `pip install google-auth[reauth]`." | ||
| ) from caught_exc | ||
| return CtapHidDevice, Ctap1, APDU, ApduError, CtapError | ||
|
|
||
| def _obtain_challenge_input_fido2(self, metadata): | ||
| CtapHidDevice, Ctap1, APDU, ApduError, CtapError = self._get_fido2_classes() | ||
|
|
||
| devices = list(CtapHidDevice.list_devices()) | ||
| if not devices: | ||
| sys.stderr.write("No security key found.\n") | ||
| return None | ||
|
parthea marked this conversation as resolved.
Outdated
parthea marked this conversation as resolved.
Outdated
|
||
|
|
||
| sk = metadata["securityKey"] | ||
| challenges = sk["challenges"] | ||
| # Read both 'applicationId' and 'relyingPartyId', if they are the same, use | ||
|
|
@@ -150,46 +169,57 @@ def obtain_challenge_input(self, metadata): | |
| else: | ||
| application_parameters = [application_id] | ||
|
|
||
| challenge_data = [] | ||
| for c in challenges: | ||
| kh = c["keyHandle"].encode("ascii") | ||
| key = pyu2f.model.RegisteredKey(bytearray(base64.urlsafe_b64decode(kh))) | ||
| challenge = c["challenge"].encode("ascii") | ||
| challenge = base64.urlsafe_b64decode(challenge) | ||
| challenge_data.append({"key": key, "challenge": challenge}) | ||
|
|
||
| # Track number of tries to suppress error message until all application_parameters | ||
| # are tried. | ||
| tries = 0 | ||
| sys.stderr.write("Please touch your security key.\n") | ||
| for app_id in application_parameters: | ||
|
parthea marked this conversation as resolved.
Outdated
|
||
| try: | ||
| tries += 1 | ||
| api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator( | ||
| REAUTH_ORIGIN | ||
| ) | ||
| response = api.Authenticate( | ||
| app_id, challenge_data, print_callback=sys.stderr.write | ||
| ) | ||
| return {"securityKey": response} | ||
| except pyu2f.errors.U2FError as e: | ||
| if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE: | ||
| # Only show error if all app_ids have been tried | ||
| if tries == len(application_parameters): | ||
| sys.stderr.write("Ineligible security key.\n") | ||
| return None | ||
| continue | ||
| if e.code == pyu2f.errors.U2FError.TIMEOUT: | ||
| sys.stderr.write( | ||
| "Timed out while waiting for security key touch.\n" | ||
| ) | ||
| else: | ||
| raise e | ||
| except pyu2f.errors.PluginError as e: | ||
| sys.stderr.write("Plugin error: {}.\n".format(e)) | ||
| continue | ||
| except pyu2f.errors.NoDeviceFoundError: | ||
| sys.stderr.write("No security key found.\n") | ||
| return None | ||
| app_param = hashlib.sha256(app_id.encode("utf-8")).digest() | ||
| for challenge in challenges: | ||
| key_handle = self._urlsafe_b64decode(challenge["keyHandle"]) | ||
| challenge_bytes = self._urlsafe_b64decode(challenge["challenge"]) | ||
| client_data = self._create_u2f_client_data(challenge_bytes) | ||
| client_param = hashlib.sha256(client_data).digest() | ||
| for device in devices: | ||
| try: | ||
| signature = Ctap1(device).authenticate( | ||
| client_param, app_param, key_handle | ||
| ) | ||
| except ApduError as caught_exc: | ||
| if caught_exc.code == APDU.WRONG_DATA: | ||
| continue | ||
| if caught_exc.code == APDU.USE_NOT_SATISFIED: | ||
| sys.stderr.write( | ||
| "Timed out while waiting for security key touch.\n" | ||
| ) | ||
| return None | ||
| raise | ||
| except CtapError as caught_exc: | ||
| if caught_exc.code in ( | ||
| CtapError.ERR.TIMEOUT, | ||
| CtapError.ERR.ACTION_TIMEOUT, | ||
| CtapError.ERR.KEEPALIVE_CANCEL, | ||
| CtapError.ERR.OPERATION_DENIED, | ||
| ): | ||
| sys.stderr.write( | ||
| "Timed out while waiting for security key touch.\n" | ||
| ) | ||
| return None | ||
| raise | ||
|
parthea marked this conversation as resolved.
Outdated
|
||
| else: | ||
| return { | ||
| "securityKey": { | ||
| "clientData": self._unpadded_urlsafe_b64encode( | ||
| client_data | ||
| ), | ||
| "signatureData": self._unpadded_urlsafe_b64encode( | ||
| bytes(signature) | ||
| ), | ||
| "applicationId": app_id, | ||
| "keyHandle": self._unpadded_urlsafe_b64encode( | ||
| key_handle | ||
| ), | ||
| } | ||
| } | ||
| sys.stderr.write("Ineligible security key.\n") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason to use stderr.write here? IIRC, other parts of the library uses
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @daniel-sanche.. I kept stderr here because this challenge flow already uses it for interactive prompts, and the legacy pyu2f path also writes user facing challenge messages there. |
||
| return None | ||
|
parthea marked this conversation as resolved.
Outdated
|
||
|
|
||
| def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler): | ||
| sk = metadata.get("securityKey") | ||
|
|
@@ -248,8 +278,23 @@ def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler): | |
| def _unpadded_urlsafe_b64recode(self, s): | ||
| """Converts standard b64 encoded string to url safe b64 encoded string | ||
| with no padding.""" | ||
| b = base64.urlsafe_b64decode(s) | ||
| return base64.urlsafe_b64encode(b).decode().rstrip("=") | ||
| return self._unpadded_urlsafe_b64encode(self._urlsafe_b64decode(s)) | ||
|
|
||
| def _create_u2f_client_data(self, challenge): | ||
| return json.dumps( | ||
| { | ||
| "challenge": self._unpadded_urlsafe_b64encode(challenge), | ||
| "origin": REAUTH_ORIGIN, | ||
| "typ": U2F_AUTHENTICATION_TYPE, | ||
| }, | ||
| sort_keys=True, | ||
| ).encode() | ||
|
|
||
| def _unpadded_urlsafe_b64encode(self, data): | ||
| return base64.urlsafe_b64encode(data).decode().rstrip("=") | ||
|
|
||
| def _urlsafe_b64decode(self, data): | ||
| return base64.urlsafe_b64decode(data + "=" * (-len(data) % 4)) | ||
|
|
||
|
|
||
| class SamlChallenge(ReauthChallenge): | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.