This repository was archived by the owner on Jun 12, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathpushed_authorization.py
More file actions
73 lines (55 loc) · 2.39 KB
/
pushed_authorization.py
File metadata and controls
73 lines (55 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import logging
from cryptojwt import JWT
from oidcmsg.message import Message
from oidcmsg.oauth2 import JWTSecuredAuthorizationRequest
import requests
logger = logging.getLogger(__name__)
def push_authorization(request_args, service, **kwargs):
"""
:param request_args: All the request arguments as a AuthorizationRequest instance
:param service: The service to which this post construct method is applied.
:param kwargs: Extra keyword arguments.
"""
method_args = service.service_context.add_on["pushed_authorization"]
# construct the message body
if method_args["body_format"] == "urlencoded":
_body = request_args.to_urlencoded()
else:
_jwt = JWT(key_jar=service.service_context.keyjar,
iss=service.service_context.base_url)
_jws = _jwt.pack(request_args.to_dict())
_msg = Message(request=_jws)
if method_args["merge_rule"] == "lax":
for param in request_args.required_parameters():
_msg[param] = request_args.get(param)
_body = _msg.to_urlencoded()
# Send it to the Pushed Authorization Request Endpoint
resp = method_args["http_client"].get(
service.service_context.get('provider_info')["pushed_authorization_request_endpoint"],
data=_body
)
if resp.status_code == 200:
_resp = Message().from_json(resp.text)
_req = JWTSecuredAuthorizationRequest(request_uri=_resp["request_uri"])
if method_args["merge_rule"] == "lax":
for param in request_args.required_parameters():
_req[param] = request_args.get(param)
request_args = _req
return request_args
def add_pushed_authorization_support(services, body_format="jws", signing_algorthm="RS256",
http_client=None, merge_rule="strict"):
"""
Add the necessary pieces to make pushed authorization happen.
:param services: A dictionary with all the services the client has access to.
:param body_format: jws or urlencoded
"""
if http_client is None:
http_client = requests
_service = services["authorization"]
_service.service_context.add_on['pushed_authorization'] = {
"body_format": body_format,
"signing_algorithm": signing_algorthm,
"http_client": http_client,
"merge_rule": merge_rule
}
_service.post_construct.append(push_authorization)