This repository was archived by the owner on Jun 12, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathpkce.py
More file actions
108 lines (86 loc) · 3.35 KB
/
pkce.py
File metadata and controls
108 lines (86 loc) · 3.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import logging
from cryptojwt.utils import b64e
from oidcmsg.message import Message
from oidcservice import CC_METHOD, unreserved
from oidcservice.exception import Unsupported
from oidcservice.oauth2.utils import get_state_parameter
logger = logging.getLogger(__name__)
def add_code_challenge(request_args, service, **kwargs):
"""
PKCE RFC 7636 support
To be added as a post_construct method to an
:py:class:`oidcservice.oidc.service.Authorization` instance
:param service: The service that uses this function
:param request_args: Set of request arguments
:param kwargs: Extra set of keyword arguments
:return: Updated set of request arguments
"""
_kwargs = service.service_context.add_on["pkce"]
try:
cv_len = _kwargs['code_challenge_length']
except KeyError:
cv_len = 64 # Use default
# code_verifier: string of length cv_len
code_verifier = unreserved(cv_len)
_cv = code_verifier.encode()
try:
_method = _kwargs['code_challenge_method']
except KeyError:
_method = 'S256'
try:
# Pick hash method
_hash_method = CC_METHOD[_method]
# Use it on the code_verifier
_hv = _hash_method(_cv).digest()
# base64 encode the hash value
code_challenge = b64e(_hv).decode('ascii')
except KeyError:
raise Unsupported(
'PKCE Transformation method:{}'.format(_method))
_item = Message(code_verifier=code_verifier, code_challenge_method=_method)
service.store_item(_item, 'pkce', request_args['state'])
request_args.update(
{
"code_challenge": code_challenge,
"code_challenge_method": _method
})
return request_args, {}
def add_code_verifier(request_args, service, **kwargs):
"""
PKCE RFC 7636 support
To be added as a post_construct method to an
:py:class:`oidcservice.oidc.service.AccessToken` instance
:param service: The service that uses this function
:param request_args: Set of request arguments
:return: updated set of request arguments
"""
_state = request_args.get('state')
if _state is None:
_state = kwargs.get('state')
_item = service.get_item(Message, 'pkce', _state)
request_args.update({'code_verifier': _item['code_verifier']})
return request_args
def put_state_in_post_args(request_args, **kwargs):
state = get_state_parameter(request_args, kwargs)
return request_args, {'state': state}
def add_pkce_support(service, code_challenge_length, code_challenge_method):
"""
PKCE support can only be considered if this client can access authorization and
access token services.
:param service: Dictionary of services
:param code_challenge_length:
:param code_challenge_method:
:return:
"""
if "authorization" in service and "accesstoken" in service:
_service = service["authorization"]
_service.service_context.add_on['pkce'] = {
"code_challenge_length": code_challenge_length,
"code_challenge_method": code_challenge_method
}
_service.pre_construct.append(add_code_challenge)
token_service = service['accesstoken']
token_service.pre_construct.append(put_state_in_post_args)
token_service.post_construct.append(add_code_verifier)
else:
logger.warning("PKCE support could NOT be added")