This repository was archived by the owner on Jun 12, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathauthorization.py
More file actions
78 lines (64 loc) · 2.9 KB
/
authorization.py
File metadata and controls
78 lines (64 loc) · 2.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""The service that talks to the OAuth2 Authorization endpoint."""
import logging
from oidcmsg import oauth2
from oidcmsg.exception import MissingParameter
from oidcmsg.oauth2 import ResponseMessage
from oidcmsg.time_util import time_sans_frac
from oidcservice.oauth2.utils import get_state_parameter
from oidcservice.oauth2.utils import pick_redirect_uris
from oidcservice.oauth2.utils import set_state_parameter
from oidcservice.service import Service
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
class Authorization(Service):
    """The service that talks to the OAuth2 Authorization endpoint.

    On top of what :class:`Service` provides, this class:

    * registers ``pick_redirect_uris`` and ``set_state_parameter`` in
      ``pre_construct`` and :meth:`store_auth_request` in ``post_construct``;
    * stores authorization requests and responses in
      ``service_context.state``;
    * copies ``scope`` from the stored request into a response that lacks it.
    """

    # Class-level configuration read by the Service base class (which is not
    # visible from this module). NOTE(review): the meaning of each attribute
    # is inferred from its name -- confirm against Service.
    msg_type = oauth2.AuthorizationRequest
    response_cls = oauth2.AuthorizationResponse
    error_msg = ResponseMessage
    endpoint_name = 'authorization_endpoint'
    synchronous = False
    service_name = 'authorization'
    response_body_type = 'urlencoded'

    def __init__(self, service_context, client_authn_factory=None, conf=None):
        """Initialise the base service and register the construct hooks.

        :param service_context: Passed on to :class:`Service`; later read
            through ``self.service_context``.
        :param client_authn_factory: Passed on unchanged to :class:`Service`.
        :param conf: Passed on unchanged to :class:`Service`.
        """
        Service.__init__(self, service_context,
                         client_authn_factory=client_authn_factory,
                         conf=conf)
        # Presumably run by Service before / after a request is constructed
        # -- confirm against Service.
        self.pre_construct.extend([pick_redirect_uris, set_state_parameter])
        self.post_construct.append(self.store_auth_request)

    def update_service_context(self, resp, key='', **kwargs):
        """Store an authorization response in the state store.

        When the response carries ``expires_in``, an ``__expires_at`` entry
        is added to it first, computed as ``time_sans_frac()`` plus
        ``expires_in`` converted to ``int``.

        :param resp: The authorization response; may be modified in place.
        :param key: The key under which the response is stored.
        :param kwargs: Extra keyword arguments (unused here).
        """
        if 'expires_in' in resp:
            resp['__expires_at'] = time_sans_frac() + int(resp['expires_in'])
        self.service_context.state.store_item(resp, 'auth_response', key)

    def store_auth_request(self, request_args=None, **kwargs):
        """Store the authorization request in the state DB.

        :param request_args: The request arguments to store.
        :param kwargs: Extra keyword arguments; handed, together with
            ``request_args``, to ``get_state_parameter`` to find the key.
        :return: ``request_args``, unchanged.
        """
        _key = get_state_parameter(request_args, kwargs)
        self.service_context.state.store_item(request_args, 'auth_request',
                                              _key)
        return request_args

    def gather_request_args(self, **kwargs):
        """Collect the request arguments and make sure redirect_uri is set.

        If the base class did not produce a ``redirect_uri``, the first entry
        of ``service_context.redirect_uris`` is used.

        :param kwargs: Passed on to ``Service.gather_request_args``.
        :return: The request arguments, including ``redirect_uri``.
        :raises MissingParameter: If no ``redirect_uri`` was gathered and no
            fallback can be read from the service context.
        """
        ar_args = Service.gather_request_args(self, **kwargs)

        if 'redirect_uri' not in ar_args:
            try:
                ar_args['redirect_uri'] = self.service_context.redirect_uris[0]
            except (KeyError, AttributeError, IndexError, TypeError):
                # IndexError: redirect_uris is an empty sequence.
                # TypeError: redirect_uris is None / not subscriptable.
                # Both mean "no redirect_uri available", same as a missing
                # attribute, so report them the same way.
                raise MissingParameter('redirect_uri')

        return ar_args

    def post_parse_response(self, response, **kwargs):
        """
        Add scope claim to response, from the request, if not present in the
        response

        :param response: The response
        :param kwargs: Extra Keyword arguments; ``state`` (when present and
            non-empty) is the key used to look up the stored request
        :return: A possibly augmented response
        """
        if "scope" in response:
            return response

        _key = kwargs.get('state')
        if not _key:
            # No state given, so there is no stored request to consult.
            return response

        # Deliberately outside the try block: errors raised by the state
        # store lookup itself propagate to the caller.
        item = self.service_context.state.get_item(oauth2.AuthorizationRequest,
                                                   'auth_request', _key)
        try:
            response["scope"] = item["scope"]
        except KeyError:
            # The stored request has no scope either; leave the response as is.
            pass

        return response