-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Expand file tree
/
Copy pathuser_resource_permission.py
More file actions
435 lines (384 loc) · 22 KB
/
user_resource_permission.py
File metadata and controls
435 lines (384 loc) · 22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: user_resource_permission.py
@date:2025/4/28 17:17
@desc: Serializers for querying and editing per-user resource permissions within a workspace
"""
import json
import os
from django.core.cache import cache
from django.db import models
from django.db.models import QuerySet, Q
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
from application.models import Application
from common.constants.cache_version import Cache_Version
from common.constants.permission_constants import get_default_workspace_user_role_mapping_list, RoleConstants, \
ResourcePermission, ResourcePermissionRole, ResourceAuthType
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.db.search import native_search, native_page_search, get_dynamics_model
from common.db.sql_execute import select_list
from common.exception.app_exception import AppApiException
from common.utils.common import get_file_content
from knowledge.models import Knowledge
from maxkb.conf import PROJECT_DIR
from maxkb.settings import edition
from models_provider.models import Model
from system_manage.models import WorkspaceUserResourcePermission
from tools.models import Tool
from users.serializers.user import is_workspace_manage
class PermissionSerializer(serializers.Serializer):
    """Three required boolean permission flags: VIEW, MANAGE and ROLE."""

    # Label text: "readable".
    VIEW = serializers.BooleanField(
        required=True,
        label="可读",
    )
    # Label text: "manage".
    MANAGE = serializers.BooleanField(
        required=True,
        label="管理",
    )
    # Label text: "follow role".
    ROLE = serializers.BooleanField(
        required=True,
        label="跟随角色",
    )
class UserResourcePermissionItemResponse(serializers.Serializer):
    """Response schema for one resource row in a user's permission listing."""

    # Primary key of the row.
    id = serializers.UUIDField(
        required=True,
        label="主键id",
    )
    # Resource name.
    name = serializers.CharField(
        required=True,
        label="资源名称",
    )
    # Kind of resource the authorization targets.
    auth_target_type = serializers.CharField(
        required=True,
        label="授权资源",
    )
    # User the permission belongs to.
    user_id = serializers.UUIDField(
        required=True,
        label="用户id",
    )
    # Resource icon.
    icon = serializers.CharField(
        required=True,
        label="资源图标",
    )
    # Authorization type.
    auth_type = serializers.CharField(
        required=True,
        label="授权类型",
    )
    # Optional permission level; may be null or blank.
    permission = serializers.ChoiceField(
        required=False,
        allow_null=True,
        allow_blank=True,
        choices=['NOT_AUTH', 'MANAGE', 'VIEW', 'ROLE'],
        label=_('permission'),
    )
class UserResourcePermissionResponse(serializers.Serializer):
    """Response schema exposing a ``KNOWLEDGE`` list of permission items."""

    # List of per-resource permission rows.
    KNOWLEDGE = UserResourcePermissionItemResponse(
        many=True,
    )
class UpdateTeamMemberItemPermissionSerializer(serializers.Serializer):
    """One entry of a permission update: a target id plus an optional permission."""

    # Id of the resource whose permission is being changed.
    target_id = serializers.CharField(
        required=True,
        label=_('target id'),
    )
    # New permission level; optional, may be null or blank.
    permission = serializers.ChoiceField(
        required=False,
        allow_null=True,
        allow_blank=True,
        choices=['NOT_AUTH', 'MANAGE', 'VIEW', 'ROLE'],
        label=_('permission'),
    )
class UpdateUserResourcePermissionRequest(serializers.Serializer):
    """Request body carrying a list of per-resource permission updates."""

    user_resource_permission_list = UpdateTeamMemberItemPermissionSerializer(required=True, many=True)

    def is_valid(self, *, auth_target_type=None, workspace_id=None, raise_exception=False):
        """Validate the payload and verify that every target id exists.

        :param auth_target_type: resource type attached to every target id
            before it is handed to the existence-check SQL.
        :param workspace_id: workspace id bound to the SQL parameters.
        :param raise_exception: accepted for signature compatibility; field
            validation always raises on failure (unchanged behaviour).
        :return: ``True`` when validation succeeds.
        :raises AppApiException: (code 500) when the existence check returns
            any rows.
        """
        super().is_valid(raise_exception=True)
        target_list = [
            {'target_id': urp.get('target_id'), 'auth_target_type': auth_target_type}
            for urp in self.data.get("user_resource_permission_list")
        ]
        check_sql = get_file_content(
            os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql',
                         'check_member_permission_target_exists.sql'))
        # Parameters: the JSON-encoded target list followed by the workspace
        # id repeated seven times, exactly as the original call passed them.
        illegal_target_id_list = select_list(
            check_sql, [json.dumps(target_list), *([workspace_id] * 7)])
        if illegal_target_id_list:
            raise AppApiException(500,
                                  _('Non-existent id[') + str(illegal_target_id_list) + ']')
        # Return a boolean like the base ``Serializer.is_valid`` does
        # (the original override implicitly returned ``None``).
        return True
# Lookup from an ``auth_target_type`` value to the model class that is
# wrapped in a ``QuerySet`` when listing resources of that type.
m_map = {
    'KNOWLEDGE': Knowledge,
    'TOOL': Tool,
    'MODEL': Model,
    'APPLICATION': Application,
}
# Lookup from an ``auth_target_type`` value to the name of the SQL file
# (under apps/system_manage/sql) used to list a user's permissions for it.
sql_map = {
    'KNOWLEDGE': 'get_knowledge_user_resource_permission.sql',
    'TOOL': 'get_tool_user_resource_permission.sql',
    'MODEL': 'get_model_user_resource_permission.sql',
    'APPLICATION': 'get_application_user_resource_permission.sql',
}
class UserResourcePermissionUserListRequest(serializers.Serializer):
    """Optional filters accepted when listing a user's resource permissions."""

    # Resource-name filter; optional, may be null or blank.
    name = serializers.CharField(
        required=False,
        allow_null=True,
        allow_blank=True,
        label=_('resource name'),
    )
    # Permission filter (several values allowed); optional, may be null or blank.
    permission = serializers.MultipleChoiceField(
        required=False,
        allow_null=True,
        allow_blank=True,
        choices=['NOT_AUTH', 'MANAGE', 'VIEW', 'ROLE'],
        label=_('permission'),
    )
class UserResourcePermissionSerializer(serializers.Serializer):
    """Query and modify one user's permissions on resources of one type.

    The serializer is initialised with ``workspace_id``, ``user_id`` and
    ``auth_target_type``; every method reads those values from ``self.data``.
    """

    workspace_id = serializers.CharField(required=True, label=_('workspace id'))
    user_id = serializers.CharField(required=True, label=_('user id'))
    auth_target_type = serializers.CharField(required=True, label=_('resource'))

    def get_queryset(self, instance):
        """Build the named query sets consumed by the permission-listing SQL.

        :param instance: mapping of optional filters; ``name`` (substring
            match) and ``permission`` (iterable of permission values).
        :return: dict with the keys ``query_set``, ``folder_query_set``,
            ``workspace_user_resource_permission_query_set`` and
            ``resource_query_set``.
        """
        resource_query_set = QuerySet(
            model=get_dynamics_model({
                'name': models.CharField(),
                "permission": models.CharField(),
            }))
        name = instance.get('name')
        # The permission filter is optional: treat a missing / None value as
        # "no filter" instead of failing while iterating over None.
        permission = instance.get('permission') or []
        # "NOT_AUTH" is matched against rows whose permission is NULL.
        query_p_list = [None if p == "NOT_AUTH" else p for p in permission]
        if name:
            resource_query_set = resource_query_set.filter(name__contains=name)
        if permission:
            if all(p is None for p in query_p_list):
                resource_query_set = resource_query_set.filter(permission=None)
            elif any(p is None for p in query_p_list):
                resource_query_set = resource_query_set.filter(
                    Q(permission__in=query_p_list) | Q(permission=None))
            else:
                resource_query_set = resource_query_set.filter(
                    permission__in=query_p_list)
        workspace_id = self.data.get('workspace_id')
        auth_target_type = self.data.get('auth_target_type')
        target_model = m_map.get(auth_target_type)
        return {
            'query_set': QuerySet(target_model).filter(workspace_id=workspace_id),
            # NOTE(review): built from the same model and filter as
            # 'query_set' -- confirm this is what the SQL expects.
            'folder_query_set': QuerySet(target_model).filter(workspace_id=workspace_id),
            'workspace_user_resource_permission_query_set': QuerySet(WorkspaceUserResourcePermission).filter(
                workspace_id=workspace_id, user=self.data.get('user_id'),
                auth_target_type=auth_target_type),
            'resource_query_set': resource_query_set
        }

    def is_auth(self, resource_id: str):
        """Return whether the user may read the given resource.

        Workspace managers are always authorised.  Otherwise the user's
        permission row for the resource decides: for role-based rows the
        ``<auth_target_type>:READ`` role permission must exist, for other
        rows the stored permission list must contain VIEW.

        :param resource_id: id of the resource to check.
        :return: ``True`` when access is granted, ``False`` otherwise.
        """
        self.is_valid(raise_exception=True)
        auth_target_type = self.data.get('auth_target_type')
        workspace_id = self.data.get('workspace_id')
        user_id = self.data.get('user_id')
        if is_workspace_manage(user_id, workspace_id):
            return True
        wurp = QuerySet(WorkspaceUserResourcePermission).filter(auth_target_type=auth_target_type,
                                                                workspace_id=workspace_id, user=user_id,
                                                                target=resource_id).first()
        if wurp is None:
            return False
        workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
        role_permission_mapping_model = DatabaseModelManage.get_model("role_permission_mapping_model")
        if wurp.auth_type != ResourceAuthType.ROLE.value:
            return ResourcePermission.VIEW.value in wurp.permission_list
        if not (workspace_user_role_mapping_model and role_permission_mapping_model):
            # Role models are not registered, so a role-based grant cannot be resolved.
            return False
        # NOTE(review): ``inner`` is passed to ``role_id__in`` without an
        # explicit ``.values(...)`` -- confirm it yields role ids as intended.
        inner = QuerySet(workspace_user_role_mapping_model).filter(workspace_id=workspace_id, user_id=user_id)
        return QuerySet(role_permission_mapping_model).filter(
            role_id__in=inner,
            permission_id=(auth_target_type + ':READ')).exists()

    def _resolve_auth_type(self, auth_target_type, workspace_id, user_id):
        """Pick the auth type for a newly granted resource.

        Reuses the auth type of the first existing permission row of this
        user for the same resource type and workspace; without such a row it
        falls back to RESOURCE_PERMISSION_GROUP on the 'CE' edition and ROLE
        otherwise.
        """
        wurp = QuerySet(WorkspaceUserResourcePermission).filter(auth_target_type=auth_target_type,
                                                                workspace_id=workspace_id,
                                                                user_id=user_id).first()
        if wurp:
            return wurp.auth_type
        return ResourceAuthType.RESOURCE_PERMISSION_GROUP if edition == 'CE' else ResourceAuthType.ROLE

    @staticmethod
    def _refresh_permission_cache(user_id):
        """Delete the cached permission list of ``user_id``."""
        version = Cache_Version.PERMISSION_LIST.get_version()
        key = Cache_Version.PERMISSION_LIST.get_key(user_id=user_id)
        cache.delete(key, version=version)

    def auth_resource_batch(self, resource_id_list: list):
        """Grant the user permissions on several resources at once.

        :param resource_id_list: ids of the resources to authorise.
        :return: always ``True``.
        """
        self.is_valid(raise_exception=True)
        auth_target_type = self.data.get('auth_target_type')
        workspace_id = self.data.get('workspace_id')
        user_id = self.data.get('user_id')
        auth_type = self._resolve_auth_type(auth_target_type, workspace_id, user_id)
        grants_group_permissions = auth_type == ResourceAuthType.RESOURCE_PERMISSION_GROUP
        workspace_user_resource_permission = [WorkspaceUserResourcePermission(
            target=resource_id,
            auth_target_type=auth_target_type,
            permission_list=[ResourcePermission.VIEW,
                             ResourcePermission.MANAGE] if grants_group_permissions else [
                ResourcePermissionRole.ROLE],
            workspace_id=workspace_id,
            user_id=user_id,
            auth_type=auth_type
        ) for resource_id in resource_id_list]
        QuerySet(WorkspaceUserResourcePermission).bulk_create(workspace_user_resource_permission)
        # Refresh the cache
        self._refresh_permission_cache(user_id)
        return True

    def auth_resource(self, resource_id: str, is_folder=False):
        """Grant the user permissions on a single resource.

        :param resource_id: id of the resource to authorise.
        :param is_folder: when true the row is always stored as
            RESOURCE_PERMISSION_GROUP with VIEW and MANAGE.
        :return: always ``True``.
        """
        self.is_valid(raise_exception=True)
        auth_target_type = self.data.get('auth_target_type')
        workspace_id = self.data.get('workspace_id')
        user_id = self.data.get('user_id')
        auth_type = self._resolve_auth_type(auth_target_type, workspace_id, user_id)
        grants_group_permissions = auth_type == ResourceAuthType.RESOURCE_PERMISSION_GROUP or is_folder
        # Automatically authorise the creator
        WorkspaceUserResourcePermission(
            target=resource_id,
            auth_target_type=auth_target_type,
            permission_list=[ResourcePermission.VIEW,
                             ResourcePermission.MANAGE] if grants_group_permissions else [
                ResourcePermissionRole.ROLE],
            workspace_id=workspace_id,
            user_id=user_id,
            auth_type=ResourceAuthType.RESOURCE_PERMISSION_GROUP if is_folder else auth_type
        ).save()
        # Refresh the cache
        self._refresh_permission_cache(user_id)
        return True

    def list(self, instance, user, with_valid=True):
        """Return the user's permission rows for the configured resource type.

        :param instance: filter mapping passed to :meth:`get_queryset`.
        :param user: not used by this method.
        :param with_valid: validate the serializer and the filters first.
        :return: list of dicts, one per row returned by the search.
        """
        if with_valid:
            self.is_valid(raise_exception=True)
            UserResourcePermissionUserListRequest(data=instance).is_valid(raise_exception=True)
        # User permission list
        user_resource_permission_list = native_search(self.get_queryset(instance), get_file_content(
            os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql', sql_map.get(self.data.get('auth_target_type')))))
        return [{**user_resource_permission}
                for user_resource_permission in user_resource_permission_list]

    def page(self, instance, current_page: int, page_size: int, user, with_valid=True):
        """Return one page of the user's permission rows.

        :param instance: filter mapping passed to :meth:`get_queryset`.
        :param current_page: page number forwarded to ``native_page_search``.
        :param page_size: page size forwarded to ``native_page_search``.
        :param user: not used by this method.
        :param with_valid: validate the serializer and the filters first.
        :return: whatever ``native_page_search`` returns.
        """
        if with_valid:
            self.is_valid(raise_exception=True)
            UserResourcePermissionUserListRequest(data=instance).is_valid(raise_exception=True)
        # Paged list of the resource permissions of the user
        sql = get_file_content(
            os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql',
                         sql_map.get(self.data.get('auth_target_type'))))
        return native_page_search(current_page, page_size, self.get_queryset(instance), sql)

    def edit(self, instance, user, with_valid=True):
        """Replace the user's permissions on the listed resources.

        Existing rows for the listed targets are deleted and new rows are
        created from ``permission_map``; afterwards the user's cached
        permission list is dropped.

        :param instance: list of dicts with ``target_id`` and ``permission``.
        :param user: not used by this method.
        :param with_valid: validate the serializer and the payload first.
        :return: ``instance`` unchanged.
        """
        auth_target_type = self.data.get('auth_target_type')
        if with_valid:
            self.is_valid(raise_exception=True)
            UpdateUserResourcePermissionRequest(data={'user_resource_permission_list': instance}).is_valid(
                raise_exception=True,
                auth_target_type=auth_target_type,
                workspace_id=self.data.get('workspace_id'))
        workspace_id = self.data.get("workspace_id")
        user_id = self.data.get("user_id")
        targets = [item['target_id'] for item in instance]
        QuerySet(WorkspaceUserResourcePermission).filter(
            workspace_id=workspace_id,
            user_id=user_id,
            auth_target_type=auth_target_type,
            target__in=targets
        ).delete()
        # All rows for these targets were just deleted, so every entry is an
        # insert (the original "update existing" branch could never run).
        save_list = []
        for user_resource_permission in instance:
            # The request schema allows a null / blank / missing permission;
            # treat it as NOT_AUTH rather than failing with KeyError after
            # the delete above has already been executed.
            permission = user_resource_permission.get('permission') or 'NOT_AUTH'
            auth_type, permission_list = permission_map[permission]
            save_list.append(WorkspaceUserResourcePermission(
                target=user_resource_permission.get('target_id'),
                auth_target_type=auth_target_type,
                # Copy so model instances never share the list held by permission_map.
                permission_list=list(permission_list),
                workspace_id=workspace_id,
                user_id=user_id,
                auth_type=auth_type))
        # Bulk insert
        if save_list:
            QuerySet(WorkspaceUserResourcePermission).bulk_create(save_list)
        self._refresh_permission_cache(user_id)
        return instance
class ResourceUserPermissionUserListRequest(serializers.Serializer):
    """Optional filters accepted when listing the users authorised on a resource."""

    # Nickname filter; optional, may be null or blank.
    # The label previously read 'workspace id' (copy-paste slip).
    nick_name = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('nick name'))
    # Username filter; optional, may be null or blank.
    # The label previously read 'workspace id' (copy-paste slip).
    username = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('username'))
    # Permission filter (several values allowed); optional, may be null or blank.
    permission = serializers.MultipleChoiceField(required=False, allow_null=True, allow_blank=True,
                                                 choices=['NOT_AUTH', 'MANAGE', 'VIEW', 'ROLE'],
                                                 label=_('permission'))
class ResourceUserPermissionEditRequest(serializers.Serializer):
    """One entry of a resource permission update: a user id and a permission."""

    # Id of the user whose permission is being set.
    # The label previously read 'workspace id' (copy-paste slip).
    user_id = serializers.CharField(required=True, label=_('user id'))
    # Permission level to store for the user; required.
    permission = serializers.ChoiceField(required=True, choices=['NOT_AUTH', 'MANAGE', 'VIEW', 'ROLE'],
                                         label=_('permission'))
# Lookup from a requested permission choice to the pair
# (auth_type, permission_list) stored on the permission row.
permission_map = {
    'ROLE': ('ROLE', ['ROLE']),
    'MANAGE': ('RESOURCE_PERMISSION_GROUP', ['MANAGE', 'VIEW']),
    'VIEW': ('RESOURCE_PERMISSION_GROUP', ['VIEW']),
    'NOT_AUTH': ('RESOURCE_PERMISSION_GROUP', []),
}
class ResourceUserPermissionSerializer(serializers.Serializer):
    """Query and modify the permissions that users hold on one resource.

    The serializer is initialised with ``workspace_id``, ``target`` (the
    resource id) and ``auth_target_type``; every method reads those values
    from ``self.data``.
    """

    workspace_id = serializers.CharField(required=True, label=_('workspace id'))
    target = serializers.CharField(required=True, label=_('resource id'))
    auth_target_type = serializers.CharField(required=True, label=_('resource'))
    users_permission = ResourceUserPermissionEditRequest(required=False, many=True, label=_('users_permission'))

    def get_queryset(self, instance):
        """Build the named query sets consumed by the user-permission SQL.

        :param instance: mapping of optional filters; ``nick_name`` and
            ``username`` (substring match) and ``permission`` (iterable of
            permission values).
        :return: dict with the keys
            ``workspace_user_resource_permission_query_set`` and
            ``user_query_set``.
        """
        user_query_set = QuerySet(model=get_dynamics_model({
            'nick_name': models.CharField(),
            'username': models.CharField(),
            "permission": models.CharField(),
            "id": models.UUIDField(),
        }))
        nick_name = instance.get('nick_name')
        username = instance.get('username')
        # The permission filter is optional: treat a missing / None value as
        # "no filter" instead of failing while iterating over None.
        permission = instance.get('permission') or []
        # "NOT_AUTH" is matched against rows whose permission is NULL.
        query_p_list = [None if p == "NOT_AUTH" else p for p in permission]
        workspace_id = self.data.get('workspace_id')
        workspace_user_resource_permission_query_set = QuerySet(WorkspaceUserResourcePermission).filter(
            workspace_id=workspace_id,
            auth_target_type=self.data.get('auth_target_type'),
            target=self.data.get('target'))
        if nick_name:
            user_query_set = user_query_set.filter(nick_name__contains=nick_name)
        if username:
            user_query_set = user_query_set.filter(username__contains=username)
        if permission:
            if all(p is None for p in query_p_list):
                user_query_set = user_query_set.filter(permission=None)
            elif any(p is None for p in query_p_list):
                user_query_set = user_query_set.filter(
                    Q(permission__in=query_p_list) | Q(permission=None))
            else:
                user_query_set = user_query_set.filter(permission__in=query_p_list)
        workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
        if workspace_user_role_mapping_model:
            # When the role-mapping model is registered, only keep users that
            # have a mapping row in this workspace.
            user_query_set = user_query_set.filter(
                id__in=QuerySet(workspace_user_role_mapping_model).filter(
                    workspace_id=workspace_id).values("user_id"))
        return {
            'workspace_user_resource_permission_query_set': workspace_user_resource_permission_query_set,
            'user_query_set': user_query_set
        }

    def list(self, instance, with_valid=True):
        """Return the users and their permission on the resource.

        :param instance: filter mapping passed to :meth:`get_queryset`.
        :param with_valid: validate the serializer and the filters first.
        :return: whatever ``native_search`` returns.
        """
        if with_valid:
            self.is_valid(raise_exception=True)
            ResourceUserPermissionUserListRequest(data=instance).is_valid(raise_exception=True)
        # User authorisation list of the resource
        sql = get_file_content(
            os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql', 'get_resource_user_permission_detail.sql'))
        return native_search(self.get_queryset(instance), sql)

    def page(self, instance, current_page: int, page_size: int, with_valid=True):
        """Return one page of users and their permission on the resource.

        :param instance: filter mapping passed to :meth:`get_queryset`.
        :param current_page: page number forwarded to ``native_page_search``.
        :param page_size: page size forwarded to ``native_page_search``.
        :param with_valid: validate the serializer and the filters first.
        :return: whatever ``native_page_search`` returns.
        """
        if with_valid:
            self.is_valid(raise_exception=True)
            ResourceUserPermissionUserListRequest(data=instance).is_valid(raise_exception=True)
        # Paged list
        sql = get_file_content(
            os.path.join(PROJECT_DIR, "apps", "system_manage", 'sql', 'get_resource_user_permission_detail.sql'))
        return native_page_search(current_page, page_size, self.get_queryset(instance), sql)

    def edit(self, instance, with_valid=True):
        """Replace the permissions of the listed users on the resource.

        Existing rows of the listed users for this resource are deleted and
        new rows are created from ``permission_map``; afterwards each user's
        cached permission list is dropped.

        :param instance: list of dicts with ``user_id`` and ``permission``.
        :param with_valid: validate the serializer and the payload first.
        :return: ``instance`` unchanged.
        """
        if with_valid:
            self.is_valid(raise_exception=True)
            ResourceUserPermissionEditRequest(data=instance, many=True).is_valid(
                raise_exception=True)
        workspace_id = self.data.get("workspace_id")
        target = self.data.get("target")
        auth_target_type = self.data.get("auth_target_type")
        users_id = [item["user_id"] for item in instance]
        # Delete the permissions the given users already hold on this resource
        QuerySet(WorkspaceUserResourcePermission).filter(
            workspace_id=workspace_id,
            target=target,
            auth_target_type=auth_target_type,
            user_id__in=users_id
        ).delete()
        save_list = []
        for item in instance:
            auth_type, permission_list = permission_map[item['permission']]
            save_list.append(WorkspaceUserResourcePermission(
                target=target,
                auth_target_type=auth_target_type,
                workspace_id=workspace_id,
                auth_type=auth_type,
                user_id=item["user_id"],
                # Copy so model instances never share the list held by permission_map.
                permission_list=list(permission_list)
            ))
        if save_list:
            QuerySet(WorkspaceUserResourcePermission).bulk_create(save_list)
        version = Cache_Version.PERMISSION_LIST.get_version()
        # dict.fromkeys drops duplicate ids (keeping order) so each user's
        # cache entry is deleted once.
        for user_id in dict.fromkeys(users_id):
            key = Cache_Version.PERMISSION_LIST.get_key(user_id=user_id)
            cache.delete(key, version=version)
        return instance