-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Expand file tree
/
Copy pathauthentication.py
More file actions
143 lines (120 loc) · 5.68 KB
/
authentication.py
File metadata and controls
143 lines (120 loc) · 5.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: authentication.py
@date:2025/4/15 20:12
@desc: Role and permission checks for API views, including the has_permissions decorator
"""
from typing import List
from django.utils.translation import gettext_lazy as _
from rest_framework.request import Request
from common.constants.permission_constants import PermissionConstants, RoleConstants, ViewPermission, CompareConstants, \
Permission, Role
from common.exception.app_exception import AppUnauthorizedFailed
def exist_permissions_by_permission_constants(user_permission: List[PermissionConstants],
                                              permission_list: List[PermissionConstants]):
    """
    Check whether the user holds at least one of the required permissions.

    :param user_permission: permissions held by the user
    :param permission_list: permissions that grant access
    :return: True if any entry of user_permission is contained in permission_list, otherwise False
    """
    # A generator lets any() stop at the first match instead of materializing every result first.
    return any(up in permission_list for up in user_permission)
def exist_role_by_role_constants(user_role: List[RoleConstants],
                                 role_list: List[RoleConstants]):
    """
    Check whether the user holds at least one of the required roles.

    Membership is tested with the string form of each required role's ``value``,
    i.e. ``str(role.value) in user_role``.

    :param user_role: roles held by the user
    :param role_list: roles that grant access
    :return: True if any required role matches an entry of user_role, otherwise False
    """
    # A generator lets any() stop at the first match; no throwaway list of True values is built.
    return any(str(role.value) in user_role for role in role_list)
def exist_permissions_by_view_permission(user_role: List[RoleConstants],
                                         user_permission: List[PermissionConstants | object],
                                         permission: ViewPermission, request, **kwargs):
    """
    Evaluate a ViewPermission against the user's roles and permissions.

    Entries of ``permission.roleList`` and ``permission.permissionList`` that are
    callable are invoked as ``entry(request, kwargs)`` (kwargs passed as a single
    dict) and replaced by their return value before matching.

    :param user_role: roles held by the user
    :param user_permission: permissions held by the user
    :param permission: the view permission to evaluate
    :param request: the current request, handed to callable entries
    :param kwargs: extra keyword arguments, handed to callable entries as a dict
    :return: True or False; the role match and the permission match are combined with OR
             when ``permission.compare`` is ``CompareConstants.OR``, otherwise with AND
    """
    # Resolve the role entries first, then match, keeping the original evaluation order.
    role_list = [user_r(request, kwargs) if callable(user_r) else user_r
                 for user_r in permission.roleList]
    role_ok = any(up in role_list for up in user_role)

    permission_list = [user_p(request, kwargs) if callable(user_p) else user_p
                       for user_p in permission.permissionList]
    permission_ok = any(up in permission_list for up in user_permission)

    # Both flags are plain bools, so the logical operators give the same result as | and &.
    if permission.compare == CompareConstants.OR:
        return role_ok or permission_ok
    return role_ok and permission_ok
def exist_permissions(user_role: List[RoleConstants], user_permission: List[PermissionConstants], permission, request,
                      **kwargs):
    """
    Dispatch one permission requirement to the check that matches its type.

    The type tests run in this order: ViewPermission, RoleConstants,
    PermissionConstants, Permission, Role. Any other type yields False.

    :param user_role: roles held by the user
    :param user_permission: permissions held by the user
    :param permission: the requirement to check
    :param request: the current request, used by the ViewPermission check
    :param kwargs: extra keyword arguments forwarded to the ViewPermission check
    :return: True if the requirement is satisfied, otherwise False
    """
    if isinstance(permission, ViewPermission):
        granted = exist_permissions_by_view_permission(user_role, user_permission, permission, request, **kwargs)
    elif isinstance(permission, RoleConstants):
        granted = exist_role_by_role_constants(user_role, [permission])
    elif isinstance(permission, PermissionConstants):
        granted = exist_permissions_by_permission_constants(user_permission, [permission])
    elif isinstance(permission, Permission):
        granted = permission in user_permission
    elif isinstance(permission, Role):
        # Roles are matched by their string form.
        granted = str(permission) in user_role
    else:
        granted = False
    return granted
def exist(user_role: List[RoleConstants], user_permission: List[PermissionConstants], permission, request, **kwargs):
    """
    Check one permission requirement, resolving it first if it is a callable.

    A callable requirement is invoked as ``permission(request, kwargs)`` (kwargs
    passed as a single dict) and its return value is what gets checked.

    :param user_role: roles held by the user
    :param user_permission: permissions held by the user
    :param permission: the requirement, or a callable producing it
    :param request: the current request
    :param kwargs: extra keyword arguments forwarded to the check
    :return: the result of exist_permissions for the resolved requirement
    """
    resolved = permission(request, kwargs) if callable(permission) else permission
    return exist_permissions(user_role, user_permission, resolved, request, **kwargs)
def get_is_permissions(request, **kwargs):
    """
    Build a permission checker bound to this request and keyword arguments.

    :param request: the current request; its ``auth.role_list`` and
                    ``auth.permission_list`` are read when the checker runs
    :param kwargs: extra keyword arguments forwarded to every check
    :return: a function ``is_permissions(*permission, compare=CompareConstants.OR)``
             returning True or False
    """

    def is_permissions(*permission, compare=CompareConstants.OR):
        # Every requirement is evaluated before the results are combined.
        outcomes = [
            exist(request.auth.role_list, request.auth.permission_list, item, request, **kwargs)
            for item in permission
        ]
        if compare == CompareConstants.OR:
            return any(outcomes)
        return all(outcomes)

    return is_permissions
def check_batch_permissions(request: Request, id_list: List[str], id_key: str, permissions: tuple,
                            compare=CompareConstants.OR, **kwargs) -> List[str]:
    """
    Filter id_list down to the resource ids that pass the permission check.

    A pre-check runs first with a placeholder id stored under ``id_key``; if it
    passes, a copy of the whole id_list is returned without per-resource checks.
    Otherwise each id is checked individually and only the passing ids are kept.

    :param request: the current request; its ``auth.role_list`` and
                    ``auth.permission_list`` are read for every check
    :param id_list: resource ids to check
    :param id_key: kwargs key under which the id being checked is stored
    :param permissions: requirements to evaluate for each id
    :param compare: ``CompareConstants.OR`` combines results with any(), anything else with all()
    :param kwargs: extra keyword arguments forwarded to every check
    :return: the ids that passed, in their original order; an empty list if id_list is empty
    """
    if not id_list:
        return []

    def is_allowed(target_id):
        # The id under test travels to exist() through kwargs[id_key].
        kwargs[id_key] = target_id
        outcomes = [
            exist(request.auth.role_list, request.auth.permission_list, item, request, **kwargs)
            for item in permissions
        ]
        if compare == CompareConstants.OR:
            return any(outcomes)
        return all(outcomes)

    # Pre-check with a placeholder id: a pass here approves every id at once
    # (the original note says a workspace manager is let through directly).
    if is_allowed('__workspace_level_pre_check__'):
        return list(id_list)

    # Otherwise validate the resources one by one.
    return [resource_id for resource_id in id_list if is_allowed(resource_id)]
def has_permissions(*permission, compare=CompareConstants.OR):
    """
    Decorator factory that guards a view method with role/permission checks.

    :param permission: requirements to check; per the original note, a role is
                       expressed as role:roleId
    :param compare: ``CompareConstants.OR`` requires any requirement to pass,
                    anything else requires all of them to pass
    :return: a decorator; the decorated method is called as
             ``func(view, request, **kwargs)`` only when the check passes
    :raises AppUnauthorizedFailed: with code 403, raised by the decorated method
                                   when the check fails
    """
    # Imported locally so the block does not depend on a module-level import.
    from functools import wraps

    def inner(func):
        # wraps keeps the view method's __name__, __doc__ and attributes on the wrapper.
        @wraps(func)
        def run(view, request, **kwargs):
            exit_list = [
                exist(request.auth.role_list, request.auth.permission_list, p, request, **kwargs)
                for p in permission
            ]
            # Decide whether access is granted.
            if any(exit_list) if compare == CompareConstants.OR else all(exit_list):
                return func(view, request, **kwargs)
            raise AppUnauthorizedFailed(403, _('No permission to access'))

        return run

    return inner