This repository was archived by the owner on Jun 12, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathaccess_token.py
More file actions
86 lines (67 loc) · 2.64 KB
/
access_token.py
File metadata and controls
86 lines (67 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import logging
from oidcmsg import oidc
from oidcmsg.oidc import verified_claim_name
from oidcmsg.time_util import time_sans_frac
from oidcservice.exception import ParameterError
from oidcservice.oauth2 import access_token
from oidcservice.oidc import IDT2REG
# Module author metadata.
__author__ = 'Roland Hedberg'
# Module-level logger, named after this module via ``__name__``.
LOGGER = logging.getLogger(__name__)
class AccessToken(access_token.AccessToken):
    """
    OpenID Connect variant of the OAuth2 access token service.

    Uses the OIDC request/response message classes and, when a token
    response carries a verified ID Token, checks its nonce against the
    state key and links the token's subject to that key.
    """
    msg_type = oidc.AccessTokenRequest
    response_cls = oidc.AccessTokenResponse
    error_msg = oidc.ResponseMessage

    def __init__(self, service_context, client_authn_factory=None,
                 conf=None):
        """
        Initialise the service by delegating to the OAuth2 base class.

        :param service_context: handed on to the base class initialiser
        :param client_authn_factory: handed on to the base class initialiser
        :param conf: handed on to the base class initialiser
        """
        access_token.AccessToken.__init__(
            self,
            service_context,
            client_authn_factory=client_authn_factory,
            conf=conf
        )

    def gather_verify_arguments(self):
        """
        Collect the keyword arguments to use when running verify().

        The base set comes from the service context (client id, issuer,
        key jar, clock skew).  On top of that come, in this order:
        registration response values mapped through IDT2REG, the
        'missing_kid' allowance and any 'verify_args' found under
        'behaviour'.

        :return: dictionary with arguments to the verify call
        """
        ctx = self.service_context

        # Default is RS256
        verify_kwargs = {
            'client_id': ctx.get('client_id'),
            'iss': ctx.get('issuer'),
            'keyjar': ctx.keyjar,
            'verify': True,
            'skew': ctx.clock_skew,
        }

        if 'registration_response' in ctx:
            registration = ctx.get('registration_response')
            for verify_name, reg_name in IDT2REG.items():
                # Parameters absent from the registration response are
                # simply left out of the verify arguments.
                try:
                    reg_value = registration[reg_name]
                except KeyError:
                    continue
                verify_kwargs[verify_name] = reg_value

        # Only forwarded when the context's 'allow' mapping mentions it.
        try:
            missing_kid = ctx.allow['missing_kid']
        except KeyError:
            pass
        else:
            verify_kwargs['allow_missing_kid'] = missing_kid

        if 'behaviour' in ctx:
            extra_args = ctx.get('behaviour').get("verify_args")
            if extra_args:
                # Explicitly configured arguments override the ones above.
                verify_kwargs.update(extra_args)

        return verify_kwargs

    def update_service_context(self, resp, key='', **kwargs):
        """
        Store a token response under the given state key.

        When the response holds a verified ID Token, the state found via
        the token's nonce must equal *key*; the token's 'sub' is then
        linked to *key*.  If 'expires_in' is present, '__expires_at' is
        added to the response as time_sans_frac() plus that value.
        Finally the response is stored as 'token_response'.

        :param resp: the token response; gets an '__expires_at' entry
            when it contains 'expires_in'
        :param key: the state key the response is stored under
        :param kwargs: not used by this method
        :raises ParameterError: the nonce belongs to another state
        :raises ValueError: a KeyError occurred during the nonce lookup
        """
        id_token_claim = verified_claim_name('id_token')
        try:
            id_token = resp[id_token_claim]
        except KeyError:
            # No verified ID Token in the response, nothing to bind.
            pass
        else:
            try:
                nonce_state = self.get_state_by_nonce(id_token['nonce'])
                if nonce_state != key:
                    raise ParameterError('Someone has messed with "nonce"')
            except KeyError:
                raise ValueError('Invalid nonce value')

            self.store_sub2state(id_token['sub'], key)

        if 'expires_in' in resp:
            now = time_sans_frac()
            resp['__expires_at'] = now + int(resp['expires_in'])

        self.store_item(resp, 'token_response', key)

    def get_authn_method(self):
        """
        Pick the client authentication method for the token endpoint.

        :return: the 'token_endpoint_auth_method' found under 'behaviour'
            in the service context, or ``self.default_authn_method`` when
            that lookup raises KeyError
        """
        try:
            behaviour = self.service_context.get('behaviour')
            authn_method = behaviour['token_endpoint_auth_method']
        except KeyError:
            authn_method = self.default_authn_method
        return authn_method