This repository was archived by the owner on Sep 21, 2025. It is now read-only.
forked from Colin-b/httpx_auth
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient_credentials.py
More file actions
138 lines (121 loc) · 6.38 KB
/
client_credentials.py
File metadata and controls
138 lines (121 loc) · 6.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from hashlib import sha512
from typing import Union, Iterable
import httpx
from httpx_auth._authentication import SupportMultiAuth
from httpx_auth._oauth2.common import (
OAuth2BaseAuth,
request_new_grant_with_post,
_add_parameters,
)
class OAuth2ClientCredentials(OAuth2BaseAuth, SupportMultiAuth):
    """
    Client Credentials Grant

    Describes an OAuth 2 client credentials (also called application) flow requests authentication.
    More details can be found in https://tools.ietf.org/html/rfc6749#section-4.4
    """

    def __init__(self, token_url: str, client_id: str, client_secret: str, **kwargs):
        """
        :param token_url: OAuth 2 token URL.
        :param client_id: OAuth 2 client identifier.
        :param client_secret: OAuth 2 client secret.
        :param timeout: Maximum amount of seconds to wait for a token to be received once requested.
        Wait for 1 minute by default.
        :param header_name: Name of the header field used to send token.
        Token will be sent in Authorization header field by default.
        :param header_value: Format used to send the token value.
        "{token}" must be present as it will be replaced by the actual token.
        Token will be sent as "Bearer {token}" by default.
        :param scope: Scope parameter sent to token URL as body.
        Can also be any iterable of scopes (list, tuple, set...), joined with a single space.
        Not sent by default.
        :param token_field_name: Field name containing the token. access_token by default.
        :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired.
        Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request
        reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry.
        :param headers: Additional headers to set when requesting or refreshing token.
        :param kwargs: all additional authorization parameters. They are merged into the token request data
        and are part of the value hashed to compute the state.
        :raises Exception: If token_url, client_id or client_secret is empty.
        """
        self.token_url = token_url
        if not self.token_url:
            raise Exception("Token URL is mandatory.")

        self.client_id = client_id
        if not self.client_id:
            raise Exception("client_id is mandatory.")

        self.client_secret = client_secret
        if not self.client_secret:
            raise Exception("client_secret is mandatory.")
        # NOTE(review): client_id and client_secret are stored on the instance but are not
        # forwarded to request_new_grant_with_post by this class - confirm where they are
        # attached to the token request.

        header_name = kwargs.pop("header_name", None) or "Authorization"
        header_value = kwargs.pop("header_value", None) or "Bearer {token}"
        self.token_field_name = kwargs.pop("token_field_name", None) or "access_token"

        # Only fall back to the default when nothing was provided: 0 is a meaningful value
        # (it deactivates early expiry) and must not be replaced by 30 seconds.
        early_expiry = kwargs.pop("early_expiry", None)
        early_expiry = 30.0 if early_expiry is None else float(early_expiry)

        # Time is expressed in seconds
        self.timeout = int(kwargs.pop("timeout", None) or 60)
        self.token_headers = kwargs.pop("headers", {})

        # As described in https://tools.ietf.org/html/rfc6749#section-4.4.2
        self.data = {"grant_type": "client_credentials"}
        scope = kwargs.pop("scope", None)
        if scope:
            # A single string is sent as is, any other iterable is a collection of scopes.
            self.data["scope"] = scope if isinstance(scope, str) else " ".join(scope)
        # Whatever remains in kwargs is considered as an additional token request parameter.
        self.data.update(kwargs)

        # The state identifies this token URL and parameters combination.
        all_parameters_in_url = _add_parameters(self.token_url, self.data)
        state = sha512(all_parameters_in_url.encode("unicode_escape")).hexdigest()

        super().__init__(
            state,
            early_expiry,
            header_name,
            header_value,
        )

    def request_new_token(self) -> tuple:
        """
        Request a new token using the token URL, data and headers prepared at creation.

        This is a generator: it delegates to request_new_grant_with_post using "yield from"
        and the tuple described below is the generator return value.

        :return: (state, token, expires_in) if an expiry was received, (state, token) otherwise.
        """
        # As described in https://tools.ietf.org/html/rfc6749#section-4.4.3
        token, expires_in, _ = yield from request_new_grant_with_post(
            self.token_url, self.data, self.token_field_name, self.token_headers
        )
        # Handle both Access and Bearer tokens
        # self.state is expected to be set by the parent constructor - TODO confirm
        return (self.state, token, expires_in) if expires_in else (self.state, token)
class OktaClientCredentials(OAuth2ClientCredentials):
    """
    Describes an Okta (OAuth 2) client credentials (also called application) flow requests authentication.
    More details can be found in https://developer.okta.com/docs/guides/implement-grant-type/clientcreds/main/
    """

    def __init__(
        self,
        instance: str,
        client_id: str,
        client_secret: str,
        *,
        scope: Union[str, Iterable[str]],
        **kwargs,
    ):
        """
        :param instance: Okta instance (like "testserver.okta-emea.com")
        :param client_id: Okta Application Identifier (formatted as a Universal Unique Identifier)
        :param client_secret: Okta Application secret.
        :param scope: Scope parameter sent to token URL as body.
        Can also be any iterable of scopes (list, tuple, set...), joined with a single space.
        :param authorization_server: Okta authorization server
        default by default.
        :param timeout: Maximum amount of seconds to wait for a token to be received once requested.
        Wait for 1 minute by default.
        :param header_name: Name of the header field used to send token.
        Token will be sent in Authorization header field by default.
        :param header_value: Format used to send the token value.
        "{token}" must be present as it will be replaced by the actual token.
        Token will be sent as "Bearer {token}" by default.
        :param token_field_name: Field name containing the token. access_token by default.
        :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired.
        Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request
        reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry.
        :param headers: Additional headers to set when requesting or refreshing token.
        Use it to provide a custom proxying rule for instance.
        :param kwargs: all additional authorization parameters, forwarded to OAuth2ClientCredentials.
        :raises Exception: If scope or instance is empty.
        """
        # Scope is documented as a string or any iterable of strings: normalize it here
        # so that tuples, sets or generators are sent as a space separated string too.
        if scope and not isinstance(scope, str):
            scope = " ".join(scope)
        # Checked after normalization so that an empty iterable is rejected as well.
        if not scope:
            raise Exception("scope is mandatory.")
        if not instance:
            raise Exception("Okta instance is mandatory.")

        authorization_server = kwargs.pop("authorization_server", None) or "default"
        OAuth2ClientCredentials.__init__(
            self,
            f"https://{instance}/oauth2/{authorization_server}/v1/token",
            client_id=client_id,
            client_secret=client_secret,
            scope=scope,
            **kwargs,
        )