Skip to content
Open
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- `requests_auth.OAuth2DeviceCode` handling [OAuth 2.0 Device Authorization Grant](https://datatracker.ietf.org/doc/html/rfc8628) (device code flow).
The following parameters allow customisation of this flow:
- `prompt_callback`: A function called with `verification_uri` and `user_code` once the device authorization request has been made. By default a `print` statement prompts the user to navigate to the URI and enter the code.
- `prefer_complete_verification_url`: When `True` and the server provides a `verification_uri_complete`, that URL is used instead so the user does not need to enter the code separately.
- `authorization_pending_status_code` / `slow_down_status_code`: Status codes used to detect the `authorization_pending` and `slow_down` polling responses respectively.
- `timeout`: Maximum number of seconds to wait for the user to complete authentication. Defaults to 3 minutes.
- `requests_auth.Auth0DeviceCode` provides specific support for the [Auth0](https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow/call-your-api-using-the-device-authorization-flow) device code flow.
- `requests_auth.EntraIDDeviceCode` provides specific support for the [Microsoft Entra ID](https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-device-code) device code flow.

## [8.0.0] - 2024-06-18
### Added
- Adding explicit support for Python `3.12`.
Expand Down
4 changes: 4 additions & 0 deletions requests_auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
OAuth2AuthorizationCodePKCE,
OktaAuthorizationCodePKCE,
)
from requests_auth._oauth2.device_code import OAuth2DeviceCode, Auth0DeviceCode, EntraIDDeviceCode
from requests_auth._oauth2.client_credentials import (
OAuth2ClientCredentials,
OktaClientCredentials,
Expand Down Expand Up @@ -52,6 +53,9 @@
"DisplaySettings",
"OAuth2AuthorizationCodePKCE",
"OktaAuthorizationCodePKCE",
"OAuth2DeviceCode",
"Auth0DeviceCode",
"EntraIDDeviceCode",
"OAuth2Implicit",
"OktaImplicit",
"OktaImplicitIdToken",
Expand Down
281 changes: 281 additions & 0 deletions requests_auth/_oauth2/device_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
import time
import warnings

from hashlib import sha512
from typing import cast, Tuple

import requests

from requests_auth._errors import InvalidGrantRequest, TimeoutOccurred, GrantNotProvided
from requests_auth._authentication import SupportMultiAuth
from requests_auth._oauth2.common import (
OAuth2,
request_new_grant_with_post,
_content_from_response,
)

def prompt_user_to_authenticate(verification_uri: str, user_code: str) -> None:
print("Device Code login request:")
print(
f"Navigate to {verification_uri} on any device and enter the device code: {user_code}"
)

class OAuth2DeviceCode(requests.auth.AuthBase, SupportMultiAuth):
"""
Device Code Grant

Describes an OAuth 2 device code flow authentication.
More details can be found in https://datatracker.ietf.org/doc/html/rfc8628
"""

def __init__(
self, authorization_url: str, token_url: str, client_id: str, **kwargs
) -> None:
"""
:param authorization_url: OAuth 2 authorization URL.
:param token_url: OAuth 2 token URL.
:param client_id: Resource owner username.
:param timeout: Maximum amount of seconds to wait for a token to be received once requested.
Wait for 3 minutes by default.
:param prefer_complete_verification_url: If supported, return the complete verification URL to avoid the need
to enter the code. If false or not supported, the device code will be returned.
:param header_name: Name of the header field used to send token.
Token will be sent in Authorization header field by default.
:param header_value: Format used to send the token value.
"{token}" must be present as it will be replaced by the actual token.
Token will be sent as "Bearer {token}" by default.
:param scope: Scope parameter sent to token URL as body. Can also be a list of scopes. Not sent by default.
:param token_field_name: Field name containing the token. access_token by default.
:param authorization_pending_status_code: Status code returned if server is waiting for authorization. Default
value is 400 "Bad Request".
:param slow_down_status_code: Status code returned if the server requests the client wait longer between poll
requests. Default value is 400 "Bad Request".
:param early_expiry: Number of seconds before actual token expiry where token will be considered as expired.
Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request
reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry.
:param session: requests.Session instance that will be used to request the token.
Use it to provide a custom proxying rule for instance.
:param prompt_callback: A function that will be called with the verification_uri and user_code as parameters
once the authorization request has been made. By default, a print statement will be used to prompt the
user to authenticate with their browser.
:param kwargs: all additional authorization parameters that should be put as query parameter in the token URL.
"""
self.authorization_url = authorization_url
if not self.authorization_url:
raise Exception("Authorization URL is mandatory.")
self.token_url = token_url
if not self.token_url:
raise Exception("Token URL is mandatory.")
self.client_id = client_id
if not self.client_id:
raise Exception("Client ID is mandatory.")

self.header_name = kwargs.pop("header_name", None) or "Authorization"
self.header_value = kwargs.pop("header_value", None) or "Bearer {token}"
if "{token}" not in self.header_value:
raise Exception("header_value parameter must contains {token}.")

self.token_field_name = kwargs.pop("token_field_name", None) or "access_token"
self.authorization_pending_status_code = kwargs.pop(
"authorization_pending_status_code", 400
)
self.slow_down_status_code = kwargs.pop("slow_down_status_code", 400)
self.early_expiry = float(kwargs.pop("early_expiry", None) or 30.0)
self.prefer_complete_verification_url = kwargs.pop(
"prefer_complete_verification_url", False
)

self.prompt_callback = kwargs.pop("prompt_callback", None) or prompt_user_to_authenticate

# Time is expressed in seconds
self.timeout = int(kwargs.pop("timeout", None) or 180)

self.session = kwargs.pop("session", None) or requests.Session()
self.session.timeout = self.timeout

# As described in https://datatracker.ietf.org/doc/html/rfc8628#section-3.1
self.authorization_data = {"client_id": self.client_id}
scope = kwargs.pop("scope", None)
if scope:
self.authorization_data["scope"] = (
" ".join(scope) if isinstance(scope, list) else scope
)
self.authorization_data.update(kwargs)
self.state = sha512(
(self.authorization_url + self.token_url + self.client_id).encode(
"unicode_escape"
)
).hexdigest()

# As described in https://tools.ietf.org/html/rfc6749#section-6
self.refresh_data = {"grant_type": "refresh_token"}
self.refresh_data.update(kwargs)

self.additional_data = kwargs

def __call__(self, r: requests.Request) -> requests.Request:
token = OAuth2.token_cache.get_token(
key=self.state,
early_expiry=self.early_expiry,
on_missing_token=self.request_new_token,
on_expired_token=self.refresh_token,
)
r.headers[self.header_name] = self.header_value.format(token=token)
return r

def request_new_token(self) -> Tuple[str, str] | Tuple[str, str, int, str]:
# As described in https://datatracker.ietf.org/doc/html/rfc8628#section-3.1

authorization_response: requests.Response = self.session.post(
self.authorization_url,
data=self.authorization_data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)

if not authorization_response:
raise InvalidGrantRequest(authorization_response)

response_data = authorization_response.json()
device_code = response_data["device_code"]
user_code = response_data["user_code"]
verification_uri = response_data["verification_uri"]
request_expires_in = response_data["expires_in"]

verification_uri_complete = response_data.get("verification_uri_complete", None)
if (
self.prefer_complete_verification_url
and verification_uri_complete is not None
):
verification_uri = verification_uri_complete

interval = response_data.get("interval", 5)
start_time = time.time()

self.prompt_callback(verification_uri, user_code)

token_request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": device_code,
"client_id": self.client_id,
}
token_request_data.update(self.additional_data)
time.sleep(interval)
while time.time() - start_time < request_expires_in:
token_response = self.session.post(
self.token_url,
data=token_request_data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)

if token_response.status_code == 200:
# User has authenticated, a token is returned
content = _content_from_response(token_response)
token = content.get("access_token", None)
if not token:
raise GrantNotProvided("access_token", content)
token_expires_in = content.get("expires_in", None)
return (
(
self.state,
cast(str, content.get("access_token")),
cast(int, token_expires_in),
cast(str, content.get("refresh_token")),
)
if token_expires_in is not None
else (self.state, cast(str, content.get("access_token")))
)
if token_response.status_code == self.authorization_pending_status_code:
error_content = _content_from_response(token_response)
error_type = error_content.get("error", None)
if error_type == "authorization_pending":
time.sleep(interval)
continue
if token_response.status_code == self.slow_down_status_code:
error_content = _content_from_response(token_response)
error_type = error_content.get("error", None)
if error_type == "slow_down":
interval += 5
time.sleep(interval)
continue
raise InvalidGrantRequest(token_response)
raise TimeoutOccurred(request_expires_in)

def refresh_token(self, refresh_token: str) -> Tuple[str, str, int, str]:
# As described in https://tools.ietf.org/html/rfc6749#section-6
self.refresh_data["refresh_token"] = refresh_token
token, expires_in, refresh_token = request_new_grant_with_post(
self.token_url,
self.refresh_data,
self.token_field_name,
self.timeout,
self.session,
)
return self.state, token, expires_in, refresh_token


class Auth0DeviceCode(OAuth2DeviceCode):
"""
Describes an Auth0 (OAuth 2) Device code flow authentication request.
"""

def __init__(self, domain: str, client_id: str, audience: str, **kwargs) -> None:
"""
:param domain: Auth0 domain, (like "https://org.eu.auth0.com")
:param client_id: Client ID
:param audience: API Audience, (like https://org-api-audience")
:param scope: Scope parameter sent in query. Can also be a list of scopes. Request 'openid' by default.
:param timeout: Maximum amount of seconds to wait for a token to be received once requested.
Wait for 3 minutes by default.
:param prefer_complete_verification_url: If supported, return the complete verification URL to avoid the need
to enter the code. If false or not supported, the device code will be returned.
:param early_expiry: Number of seconds before actual token expiry where token will be considered as expired.
Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request
reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry.
:param session: requests.Session instance that will be used to request the token.
Use it to provide a custom proxying rule for instance.
:param kwargs: all additional authorization parameters that should be put as query parameter in the token URL.
"""
stripped_domain = domain.rstrip("/")
scopes = kwargs.pop("scope", "openid")
kwargs["scope"] = " ".join(scopes) if isinstance(scopes, list) else scopes
super().__init__(
authorization_url=f"{stripped_domain}/oauth/device/code",
token_url=f"{stripped_domain}/oauth/token",
client_id=client_id,
audience=audience,
authorization_pending_status_code=403,
slow_down_status_code=403,
**kwargs,
)

class EntraIDDeviceCode(OAuth2DeviceCode):
"""
Describes an Entra ID (OAuth 2) Device code flow authentication request.
"""

def __init__(self, tenant_id: str, client_id: str, **kwargs) -> None:
"""
:param tenant_id: Entra ID tenant ID, (like "00000000-0000-0000-0000-000000000000")
:param client_id: Client ID
:param scope: Scope parameter sent in query. Can also be a list of scopes. Request 'openid' by default.
:param timeout: Maximum amount of seconds to wait for a token to be received once requested.
Wait for 3 minutes by default.
:param early_expiry: Number of seconds before actual token expiry where token will be considered as expired.
Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request
reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry.
:param session: requests.Session instance that will be used to request the token.
Use it to provide a custom proxying rule for instance.
:param kwargs: all additional authorization parameters that should be put as query parameter in the token URL.
"""
if "prefer_complete_verification_url" in kwargs:
warnings.warn("prefer_complete_verification_url parameter is not supported by Microsoft Entra ID and will be ignored.")
scopes = kwargs.pop("scope", "openid")
kwargs["scope"] = " ".join(scopes) if isinstance(scopes, list) else scopes
super().__init__(
authorization_url=f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/devicecode",
token_url=f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
client_id=client_id,
authorization_pending_status_code=400,
slow_down_status_code=400,
**kwargs,
)
Empty file.
Loading