This repository was archived by the owner on Jun 12, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathrefresh_access_token.py
More file actions
53 lines (41 loc) · 1.97 KB
/
refresh_access_token.py
File metadata and controls
53 lines (41 loc) · 1.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
"""The service that talks to the OAuth2 refresh access token endpoint."""
import logging
from oidcmsg import oauth2
from oidcmsg.oauth2 import ResponseMessage
from oidcmsg.time_util import time_sans_frac
from oidcservice.oauth2.utils import get_state_parameter
from oidcservice.service import Service
LOGGER = logging.getLogger(__name__)
class RefreshAccessToken(Service):
"""The service that talks to the OAuth2 refresh access token endpoint."""
msg_type = oauth2.RefreshAccessTokenRequest
response_cls = oauth2.AccessTokenResponse
error_msg = ResponseMessage
endpoint_name = 'token_endpoint'
synchronous = True
service_name = 'refresh_token'
default_authn_method = 'bearer_header'
http_method = 'POST'
def __init__(self, service_context, client_authn_factory=None, conf=None):
Service.__init__(self, service_context, client_authn_factory=client_authn_factory,
conf=conf)
self.pre_construct.append(self.oauth_pre_construct)
def update_service_context(self, resp, key='', **kwargs):
if 'expires_in' in resp:
resp['__expires_at'] = time_sans_frac() + int(resp['expires_in'])
self.store_item(resp, 'token_response', key)
def oauth_pre_construct(self, request_args=None, **kwargs):
"""Preconstructor of request arguments"""
_state = get_state_parameter(request_args, kwargs)
parameters = list(self.msg_type.c_param.keys())
_args = self.extend_request_args({}, oauth2.AccessTokenResponse,
'token_response', _state, parameters)
_args = self.extend_request_args(_args, oauth2.AccessTokenResponse,
'refresh_token_response', _state,
parameters)
if request_args is None:
request_args = _args
else:
_args.update(request_args)
request_args = _args
return request_args, {}