-
Notifications
You must be signed in to change notification settings - Fork 689
Expand file tree
/
Copy pathuser.py
More file actions
321 lines (283 loc) · 15.4 KB
/
user.py
File metadata and controls
321 lines (283 loc) · 15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
from collections import defaultdict
from typing import Optional
from fastapi import APIRouter, File, Path, Query, UploadFile
from sqlmodel import SQLModel, or_, select, delete as sqlmodel_delete
from apps.system.crud.user import check_account_exists, check_email_exists, check_email_format, check_pwd_format, get_db_user, single_delete, user_ws_options
from apps.system.crud.user_excel import batchUpload, downTemplate, download_error_file
from apps.system.models.system_model import UserWsModel, WorkspaceModel
from apps.system.models.user import UserModel
from apps.system.schemas.auth import CacheName, CacheNamespace
from apps.system.schemas.permission import SqlbotPermission, require_permissions
from apps.system.schemas.system_schema import PwdEditor, UserCreator, UserEditor, UserGrid, UserInfoDTO, UserLanguage, UserStatus, UserWs
from common.audit.models.log_model import OperationType, OperationModules
from common.audit.schemas.logger_decorator import LogConfig, system_log
from common.core.deps import CurrentUser, SessionDep, Trans
from common.core.pagination import Paginator
from common.core.schemas import PaginatedResponse, PaginationParams
from common.core.security import default_md5_pwd, md5pwd, verify_md5pwd
from common.core.sqlbot_cache import clear_cache
from common.core.config import settings
from apps.swagger.i18n import PLACEHOLDER_PREFIX
router = APIRouter(tags=["system_user"], prefix="/user")
@router.get("/template", include_in_schema=False)
@require_permissions(permission=SqlbotPermission(role=['admin']))
async def templateExcel(trans: Trans):
return await downTemplate(trans)
@router.post("/batchImport", include_in_schema=False)
@require_permissions(permission=SqlbotPermission(role=['admin']))
async def upload_excel(session: SessionDep, trans: Trans, current_user: CurrentUser, file: UploadFile = File(...)):
return await batchUpload(session, trans, file)
@router.get("/errorRecord/{file_id}", include_in_schema=False)
@require_permissions(permission=SqlbotPermission(role=['admin']))
async def download_error(file_id: str):
return download_error_file(file_id)
@router.get("/info", summary=f"{PLACEHOLDER_PREFIX}system_user_current_user", description=f"{PLACEHOLDER_PREFIX}system_user_current_user_desc")
async def user_info(current_user: CurrentUser) -> UserInfoDTO:
return current_user
@router.get("/defaultPwd", include_in_schema=False)
@require_permissions(permission=SqlbotPermission(role=['admin']))
async def default_pwd() -> str:
return settings.DEFAULT_PWD
@router.get("/pager/{pageNum}/{pageSize}", response_model=PaginatedResponse[UserGrid], summary=f"{PLACEHOLDER_PREFIX}system_user_grid", description=f"{PLACEHOLDER_PREFIX}system_user_grid")
@require_permissions(permission=SqlbotPermission(role=['admin']))
async def pager(
session: SessionDep,
pageNum: int = Path(..., title=f"{PLACEHOLDER_PREFIX}page_num", description=f"{PLACEHOLDER_PREFIX}page_num"),
pageSize: int = Path(..., title=f"{PLACEHOLDER_PREFIX}page_size", description=f"{PLACEHOLDER_PREFIX}page_size"),
keyword: Optional[str] = Query(None, description=f"{PLACEHOLDER_PREFIX}keyword"),
status: Optional[int] = Query(None, description=f"{PLACEHOLDER_PREFIX}status"),
origins: Optional[list[int]] = Query(None, description=f"{PLACEHOLDER_PREFIX}origin"),
oidlist: Optional[list[int]] = Query(None, description=f"{PLACEHOLDER_PREFIX}oid"),
):
pagination = PaginationParams(page=pageNum, size=pageSize)
paginator = Paginator(session)
filters = {}
origin_stmt = (
select(UserModel.id, UserModel.account)
.join(UserWsModel, UserModel.id == UserWsModel.uid, isouter=True)
.where(UserModel.id != 1)
.distinct()
.order_by(UserModel.account)
)
if oidlist:
origin_stmt = origin_stmt.where(UserWsModel.oid.in_(oidlist))
if origins:
origin_stmt = origin_stmt.where(UserModel.origin.in_(origins))
if status is not None:
origin_stmt = origin_stmt.where(UserModel.status == status)
if keyword:
keyword_pattern = f"%{keyword}%"
origin_stmt = origin_stmt.where(
or_(
UserModel.account.ilike(keyword_pattern),
UserModel.name.ilike(keyword_pattern),
UserModel.email.ilike(keyword_pattern)
)
)
user_page = await paginator.get_paginated_response(
stmt=origin_stmt,
pagination=pagination,
**filters)
uid_list = [item.get('id') for item in user_page.items]
if not uid_list:
return user_page
stmt = (
select(UserModel, UserWsModel.oid.label('ws_oid'))
.join(UserWsModel, UserModel.id == UserWsModel.uid, isouter=True)
.where(UserModel.id.in_(uid_list))
.order_by(UserModel.account, UserModel.create_time)
)
user_workspaces = session.exec(stmt).all()
merged = defaultdict(list)
extra_attrs = {}
for (user, ws_oid) in user_workspaces:
item = {}
item.update(user.model_dump())
user_id = item['id']
merged[user_id].append(ws_oid)
if user_id not in extra_attrs:
extra_attrs[user_id] = {k: v for k, v in item.items() if k != "ws_oid"}
# 组合结果
result = [
{**extra_attrs[user_id], "oid_list": list(filter(None, oid_list))}
for user_id, oid_list in merged.items()
]
user_page.items = result
return user_page
def format_user_dict(row) -> dict:
result_dict = {}
for item, key in zip(row, row._fields):
if isinstance(item, SQLModel):
result_dict.update(item.model_dump())
else:
result_dict[key] = item
return result_dict
@router.get("/ws", include_in_schema=False)
async def ws_options(session: SessionDep, current_user: CurrentUser, trans: Trans) -> list[UserWs]:
return await user_ws_options(session, current_user.id, trans)
@router.put("/ws/{oid}", summary=f"{PLACEHOLDER_PREFIX}switch_oid_api", description=f"{PLACEHOLDER_PREFIX}switch_oid_api")
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="current_user.id")
@system_log(LogConfig(
operation_type=OperationType.UPDATE,
module=OperationModules.USER,
resource_id_expr="editor.id"
))
async def ws_change(session: SessionDep, current_user: CurrentUser, trans:Trans, oid: int = Path(description=f"{PLACEHOLDER_PREFIX}oid")):
ws_list: list[UserWs] = await user_ws_options(session, current_user.id)
if not any(x.id == oid for x in ws_list):
db_ws = session.get(WorkspaceModel, oid)
if db_ws:
raise Exception(trans('i18n_user.ws_miss', ws = db_ws.name))
raise Exception(trans('i18n_not_exist', msg = f"{trans('i18n_ws.title')}[{oid}]"))
user_model: UserModel = get_db_user(session = session, user_id = current_user.id)
user_model.oid = oid
session.add(user_model)
@router.get("/{id}", response_model=UserEditor, summary=f"{PLACEHOLDER_PREFIX}user_detail_api", description=f"{PLACEHOLDER_PREFIX}user_detail_api")
@require_permissions(permission=SqlbotPermission(role=['admin']))
async def query(session: SessionDep, trans: Trans, id: int = Path(description=f"{PLACEHOLDER_PREFIX}uid")) -> UserEditor:
db_user: UserModel = get_db_user(session = session, user_id = id)
u_ws_options = await user_ws_options(session, id, trans)
result = UserEditor.model_validate(db_user.model_dump())
if u_ws_options:
result.oid_list = [item.id for item in u_ws_options]
return result
@router.post("", summary=f"{PLACEHOLDER_PREFIX}user_create_api", description=f"{PLACEHOLDER_PREFIX}user_create_api")
@require_permissions(permission=SqlbotPermission(role=['admin']))
@system_log(LogConfig(
operation_type=OperationType.CREATE,
module=OperationModules.USER,
result_id_expr="id"
))
async def user_create(session: SessionDep, creator: UserCreator, trans: Trans):
return await create(session=session, creator=creator, trans=trans)
async def create(session: SessionDep, creator: UserCreator, trans: Trans):
if check_account_exists(session=session, account=creator.account):
raise Exception(trans('i18n_exist', msg = f"{trans('i18n_user.account')} [{creator.account}]"))
if check_email_exists(session=session, email=creator.email):
raise Exception(trans('i18n_exist', msg = f"{trans('i18n_user.email')} [{creator.email}]"))
if not check_email_format(creator.email):
raise Exception(trans('i18n_format_invalid', key = f"{trans('i18n_user.email')} [{creator.email}]"))
#data = creator.model_dump(exclude_unset=True)
data = creator.model_dump()
user_model = UserModel.model_validate(data)
#user_model.create_time = get_timestamp()
user_model.language = "zh-CN"
user_model.oid = 0
if creator.oid_list:
# need to validate oid_list
db_model_list = [
UserWsModel.model_validate({
"oid": oid,
"uid": user_model.id,
"weight": 0
})
for oid in creator.oid_list
]
session.add_all(db_model_list)
user_model.oid = creator.oid_list[0]
session.add(user_model)
return user_model
@router.put("", summary=f"{PLACEHOLDER_PREFIX}user_update_api", description=f"{PLACEHOLDER_PREFIX}user_update_api")
@require_permissions(permission=SqlbotPermission(role=['admin']))
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="editor.id")
@system_log(LogConfig(
operation_type=OperationType.UPDATE,
module=OperationModules.USER,
resource_id_expr="editor.id"
))
async def update(session: SessionDep, editor: UserEditor, trans: Trans):
user_model: UserModel = get_db_user(session = session, user_id = editor.id)
if not user_model:
raise Exception(f"User with id [{editor.id}] not found!")
if editor.account != user_model.account:
raise Exception(f"account cannot be changed!")
if editor.email != user_model.email and check_email_exists(session=session, email=editor.email):
raise Exception(trans('i18n_exist', msg = f"{trans('i18n_user.email')} [{editor.email}]"))
if not check_email_format(editor.email):
raise Exception(trans('i18n_format_invalid', key = f"{trans('i18n_user.email')} [{editor.email}]"))
origin_oid: int = user_model.oid
uws_list_stmt = select(UserWsModel).where(UserWsModel.uid == editor.id)
uws_list = session.exec(uws_list_stmt).all()
existing_oids = {uws.oid for uws in uws_list}
new_oid_set = set(editor.oid_list) if editor.oid_list else set()
oids_to_remove = existing_oids - new_oid_set
oids_to_add = new_oid_set - existing_oids
if oids_to_remove:
del_stmt = sqlmodel_delete(UserWsModel).where(UserWsModel.uid == editor.id, UserWsModel.oid.in_(oids_to_remove))
session.exec(del_stmt)
data = editor.model_dump(exclude_unset=True)
user_model.sqlmodel_update(data)
user_model.oid = 0
if editor.oid_list:
user_model.oid = origin_oid if origin_oid in editor.oid_list else editor.oid_list[0]
if oids_to_add:
db_uws_model_list = [
UserWsModel.model_validate({
"oid": oid,
"uid": user_model.id,
"weight": 0
})
for oid in oids_to_add
]
session.add_all(db_uws_model_list)
session.add(user_model)
@router.delete("/{id}", summary=f"{PLACEHOLDER_PREFIX}user_del_api", description=f"{PLACEHOLDER_PREFIX}user_del_api")
@require_permissions(permission=SqlbotPermission(role=['admin']))
@system_log(LogConfig(
operation_type=OperationType.DELETE,
module=OperationModules.USER,
resource_id_expr="id"
))
async def delete(session: SessionDep, id: int = Path(description=f"{PLACEHOLDER_PREFIX}uid")):
await single_delete(session, id)
@router.delete("", summary=f"{PLACEHOLDER_PREFIX}user_batchdel_api", description=f"{PLACEHOLDER_PREFIX}user_batchdel_api")
@require_permissions(permission=SqlbotPermission(role=['admin']))
@system_log(LogConfig(operation_type=OperationType.DELETE,module=OperationModules.USER,resource_id_expr="id_list"))
async def batch_del(session: SessionDep, id_list: list[int]):
for id in id_list:
await single_delete(session, id)
@router.put("/language", summary=f"{PLACEHOLDER_PREFIX}language_change", description=f"{PLACEHOLDER_PREFIX}language_change")
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="current_user.id")
async def langChange(session: SessionDep, current_user: CurrentUser, trans: Trans, language: UserLanguage):
lang = language.language
if lang not in ["zh-CN", "en", "ko-KR"]:
raise Exception(trans('i18n_user.language_not_support', key = lang))
db_user: UserModel = get_db_user(session=session, user_id=current_user.id)
db_user.language = lang
session.add(db_user)
@router.patch("/pwd/{id}", summary=f"{PLACEHOLDER_PREFIX}reset_pwd", description=f"{PLACEHOLDER_PREFIX}reset_pwd")
@require_permissions(permission=SqlbotPermission(role=['admin']))
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="id")
@system_log(LogConfig(operation_type=OperationType.RESET_PWD,module=OperationModules.USER,resource_id_expr="id"))
async def pwdReset(session: SessionDep, current_user: CurrentUser, trans: Trans, id: int = Path(description=f"{PLACEHOLDER_PREFIX}uid")):
if not current_user.isAdmin:
raise Exception(trans('i18n_permission.no_permission', url = " patch[/user/pwd/id],", msg = trans('i18n_permission.only_admin')))
db_user: UserModel = get_db_user(session=session, user_id=id)
db_user.password = default_md5_pwd()
session.add(db_user)
@router.put("/pwd", summary=f"{PLACEHOLDER_PREFIX}update_pwd", description=f"{PLACEHOLDER_PREFIX}update_pwd")
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="current_user.id")
@system_log(LogConfig(operation_type=OperationType.UPDATE_PWD,module=OperationModules.USER,result_id_expr="id"))
async def pwdUpdate(session: SessionDep, current_user: CurrentUser, trans: Trans, editor: PwdEditor):
new_pwd = editor.new_pwd
if not check_pwd_format(new_pwd):
raise Exception(trans('i18n_format_invalid', key = trans('i18n_user.password')))
db_user: UserModel = get_db_user(session=session, user_id=current_user.id)
if not verify_md5pwd(editor.pwd, db_user.password):
raise Exception(trans('i18n_error', key = trans('i18n_user.password')))
db_user.password = md5pwd(new_pwd)
session.add(db_user)
return db_user
@router.patch("/status", summary=f"{PLACEHOLDER_PREFIX}update_status", description=f"{PLACEHOLDER_PREFIX}update_status")
@require_permissions(permission=SqlbotPermission(role=['admin']))
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="statusDto.id")
@system_log(LogConfig(operation_type=OperationType.UPDATE_STATUS,module=OperationModules.USER, resource_id_expr="statusDto.id"))
async def statusChange(session: SessionDep, current_user: CurrentUser, trans: Trans, statusDto: UserStatus):
if not current_user.isAdmin:
raise Exception(trans('i18n_permission.no_permission', url = ", ", msg = trans('i18n_permission.only_admin')))
status = statusDto.status
if status not in [0, 1]:
return {"message": "status not supported"}
db_user: UserModel = get_db_user(session=session, user_id=statusDto.id)
db_user.status = status
session.add(db_user)