import copy
import inspect
import logging
import sys
from typing import Optional
from typing import Union
from oidcmsg.message import Message
from oidcendpoint.session.grant import Grant
# Module-level logger; nothing in the visible code of this module logs through it.
logger = logging.getLogger(__name__)
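class AuthzHandling(object):
    """Class that allow an entity to manage authorization.

    Turns an authorization request into settings on the session's grant:
    scope, requested claims, expiry, per-client token usage rules, the
    resources (audience) the grant is bound to, and the claims released per
    interface.
    """

    def __init__(self, endpoint_context, grant_config=None, **kwargs):
        """
        :param endpoint_context: Context object. The code here reads its
            ``cookie_dealer``, ``cdb`` (client database), ``session_manager``
            and ``claims_interface`` attributes.
        :param grant_config: Default grant attributes applied on every call
            (for instance ``expires_in`` or ``usage_rules``). Copied per call.
        :param kwargs: Extra keyword arguments; stored, not interpreted here.
        """
        self.endpoint_context = endpoint_context
        self.cookie_dealer = endpoint_context.cookie_dealer
        self.grant_config = grant_config or {}
        self.kwargs = kwargs

    def usage_rules(self, client_id):
        """Return the token usage rules that apply to *client_id*.

        Starts from the global ``usage_rules`` in ``grant_config`` and overlays
        the client's ``token_usage_rules`` entry from the client database:
        for a token type present in both, the client's values override the
        global ones; token types only configured for the client are added.

        The result is always a deep copy. The previous implementation handed
        out references into the client database (and, when no global rules
        were configured, the database entry itself), so any later in-place
        change to a grant's rules would have silently altered the client
        configuration for every subsequent grant.

        :param client_id: Client identifier; a falsy value means
            "global rules only".
        :return: ``{token_type: {rule_name: value}}``
        """
        _usage_rules = copy.deepcopy(self.grant_config.get("usage_rules", {}))

        if not client_id:
            return _usage_rules

        try:
            _per_client = self.endpoint_context.cdb[client_id]["token_usage_rules"]
        except KeyError:
            # Unknown client or no client-specific rules: global rules apply.
            return _usage_rules

        # Detach from the client database before anything is merged or returned.
        _per_client = copy.deepcopy(_per_client)

        if not _usage_rules:
            return _per_client

        for _token_type, _rule in _usage_rules.items():
            _pc = _per_client.get(_token_type)
            if _pc:
                _rule.update(_pc)
        for _token_type, _rule in _per_client.items():
            _usage_rules.setdefault(_token_type, _rule)
        return _usage_rules

    def usage_rules_for(self, client_id, token_type):
        """Return the usage rules for one *token_type* of *client_id*.

        :param client_id: Client identifier (see :meth:`usage_rules`).
        :param token_type: Key into the rules dict, e.g. a token type name.
        :return: The rule dict for that token type, or ``{}`` if none is set.
        """
        _token_usage = self.usage_rules(client_id=client_id)
        try:
            return _token_usage[token_type]
        except KeyError:
            return {}

    def __call__(
        self,
        session_id: str,
        request: Union[dict, Message],
        resources: Optional[list] = None,
    ) -> Grant:
        """Configure the grant of *session_id* from an authorization request.

        :param session_id: Session identifier known to the session manager.
        :param request: The authorization request (dict or Message).
        :param resources: Resource identifiers the grant is restricted to.
            When None, the grant is restricted to the requesting client.
        :return: The updated Grant object from the session.
        """
        args = self.grant_config.copy()

        scope = request.get("scope")
        if scope:
            args["scope"] = scope

        claims = request.get("claims")
        if claims:
            if isinstance(request, Message):
                claims = claims.to_dict()
            args["claims"] = claims

        session_info = self.endpoint_context.session_manager.get_session_info(
            session_id=session_id, grant=True
        )
        grant = session_info["grant"]

        for key, val in args.items():
            if key == "expires_in":
                grant.set_expires_at(val)
            # Deliberately a plain `if`, not `elif`: for "expires_in" the raw
            # value is also stored as a grant attribute, as it always was.
            if key == "usage_rules":
                # The configured value only triggers the assignment; the
                # effective rules are the global/per-client merge for the
                # requesting client.
                setattr(grant, key, self.usage_rules(request.get("client_id")))
            else:
                setattr(grant, key, val)

        if resources is None:
            grant.resources = [session_info["client_id"]]
        else:
            # Stored as given; this method does no validation of the list.
            grant.resources = resources

        # This is where user consent should be handled
        # NOTE(review): this used request["scope"], which raised KeyError for a
        # request without scope while the scope handling above tolerated it.
        # A scope-less request now yields claims for no scopes — confirm the
        # endpoint validates scope before reaching this point.
        _scopes = request.get("scope", [])
        for interface in ["userinfo", "introspection", "id_token", "access_token"]:
            grant.claims[interface] = self.endpoint_context.claims_interface.get_claims(
                session_id=session_id, scopes=_scopes, usage=interface
            )
        return grant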
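class Implicit(AuthzHandling):
    """Authorization handling for the implicit flow.

    Copies ``grant_config`` onto the session's grant verbatim; unlike
    :class:`AuthzHandling`, the request is not consulted and values such as
    ``expires_in`` or ``usage_rules`` receive no special treatment.
    """

    def __call__(
        self,
        session_id: str,
        request: Union[dict, Message],
        resources: Optional[list] = None,
    ) -> Grant:
        """Apply ``grant_config`` to the grant of *session_id*.

        :param session_id: Session identifier known to the session manager.
        :param request: Unused; kept for interface compatibility with the
            base class.
        :param resources: Unused; kept for interface compatibility with the
            base class.
        :return: The grant returned by ``session_manager.get_grant``.
        """
        args = self.grant_config.copy()
        grant = self.endpoint_context.session_manager.get_grant(session_id=session_id)
        # The original iterated the dict itself (`for arg, val in args`), which
        # unpacks the key strings instead of (key, value) pairs and failed for
        # any non-empty grant_config; iterate the items.
        for arg, val in args.items():
            setattr(grant, arg, val)
        return grant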
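def factory(msgtype, endpoint_context, **kwargs):
    """
    Factory method that can be used to easily instantiate a class instance

    Only classes defined in (or imported into) this module that subclass
    ``AuthzHandling`` are eligible, so *msgtype* selects from a fixed set
    rather than naming arbitrary code.

    :param msgtype: The name of the class
    :param endpoint_context: Passed as the first argument to the class
    :param kwargs: Keyword arguments passed on to the class
    :return: An instance of the class or None if the name doesn't match any
        known class.
    """
    for name, obj in inspect.getmembers(sys.modules[__name__]):
        if not (inspect.isclass(obj) and issubclass(obj, AuthzHandling)):
            continue
        if obj.__name__ == msgtype:
            # The previous try/except AttributeError around this call also
            # swallowed errors raised while constructing the instance (e.g. an
            # endpoint_context without the attributes __init__ reads), turning
            # a misconfiguration into a silent None. Let them propagate.
            return obj(endpoint_context, **kwargs)
    return None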