-
Notifications
You must be signed in to change notification settings - Fork 685
Expand file tree
/
Copy pathuser.py
More file actions
249 lines (226 loc) · 10.5 KB
/
user.py
File metadata and controls
249 lines (226 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
from collections import defaultdict
from typing import Optional
from fastapi import APIRouter, Query
from sqlmodel import SQLModel, or_, select, delete as sqlmodel_delete
from apps.system.crud.user import check_account_exists, check_email_exists, check_email_format, check_pwd_format, get_db_user, single_delete, user_ws_options
from apps.system.models.system_model import UserWsModel, WorkspaceModel
from apps.system.models.user import UserModel
from apps.system.schemas.auth import CacheName, CacheNamespace
from apps.system.schemas.system_schema import PwdEditor, UserCreator, UserEditor, UserGrid, UserLanguage, UserStatus, UserWs
from common.core.deps import CurrentUser, SessionDep, Trans
from common.core.pagination import Paginator
from common.core.schemas import PaginatedResponse, PaginationParams
from common.core.security import default_md5_pwd, md5pwd, verify_md5pwd
from common.core.sqlbot_cache import clear_cache
from common.core.config import settings
router = APIRouter(tags=["user"], prefix="/user")
@router.get("/info")
async def user_info(current_user: CurrentUser):
return current_user
@router.get("/defaultPwd")
async def default_pwd() -> str:
return settings.DEFAULT_PWD
@router.get("/pager/{pageNum}/{pageSize}", response_model=PaginatedResponse[UserGrid])
async def pager(
session: SessionDep,
pageNum: int,
pageSize: int,
keyword: Optional[str] = Query(None, description="搜索关键字(可选)"),
status: Optional[int] = Query(None, description="状态"),
origins: Optional[list[int]] = Query(None, description="来源"),
oidlist: Optional[list[int]] = Query(None, description="空间ID集合(可选)"),
):
pagination = PaginationParams(page=pageNum, size=pageSize)
paginator = Paginator(session)
filters = {}
origin_stmt = (
select(UserModel.id, UserModel.account)
.join(UserWsModel, UserModel.id == UserWsModel.uid, isouter=True)
.where(UserModel.id != 1)
.distinct()
.order_by(UserModel.account)
)
if oidlist:
origin_stmt = origin_stmt.where(UserWsModel.oid.in_(oidlist))
if status is not None:
origin_stmt = origin_stmt.where(UserModel.status == status)
if keyword:
keyword_pattern = f"%{keyword}%"
origin_stmt = origin_stmt.where(
or_(
UserModel.account.ilike(keyword_pattern),
UserModel.name.ilike(keyword_pattern),
UserModel.email.ilike(keyword_pattern)
)
)
user_page = await paginator.get_paginated_response(
stmt=origin_stmt,
pagination=pagination,
**filters)
uid_list = [item.get('id') for item in user_page.items]
if not uid_list:
return user_page
stmt = (
select(UserModel, UserWsModel.oid.label('ws_oid'))
.join(UserWsModel, UserModel.id == UserWsModel.uid, isouter=True)
.where(UserModel.id.in_(uid_list))
.order_by(UserModel.account, UserModel.create_time)
)
user_workspaces = session.exec(stmt).all()
merged = defaultdict(list)
extra_attrs = {}
for (user, ws_oid) in user_workspaces:
item = {}
item.update(user.model_dump())
user_id = item['id']
merged[user_id].append(ws_oid)
if user_id not in extra_attrs:
extra_attrs[user_id] = {k: v for k, v in item.items() if k != "ws_oid"}
# 组合结果
result = [
{**extra_attrs[user_id], "oid_list": list(filter(None, oid_list))}
for user_id, oid_list in merged.items()
]
user_page.items = result
return user_page
def format_user_dict(row) -> dict:
result_dict = {}
for item, key in zip(row, row._fields):
if isinstance(item, SQLModel):
result_dict.update(item.model_dump())
else:
result_dict[key] = item
return result_dict
@router.get("/ws")
async def ws_options(session: SessionDep, current_user: CurrentUser, trans: Trans) -> list[UserWs]:
return await user_ws_options(session, current_user.id, trans)
@router.put("/ws/{oid}")
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="current_user.id")
async def ws_change(session: SessionDep, current_user: CurrentUser, trans:Trans, oid: int):
ws_list: list[UserWs] = await user_ws_options(session, current_user.id)
if not any(x.id == oid for x in ws_list):
db_ws = session.get(WorkspaceModel, oid)
if db_ws:
raise Exception(trans('i18n_user.ws_miss', ws = db_ws.name))
raise Exception(trans('i18n_not_exist', msg = f"{trans('i18n_ws.title')}[{oid}]"))
user_model: UserModel = get_db_user(session = session, user_id = current_user.id)
user_model.oid = oid
session.add(user_model)
session.commit()
@router.get("/{id}", response_model=UserEditor)
async def query(session: SessionDep, trans: Trans, id: int) -> UserEditor:
db_user: UserModel = get_db_user(session = session, user_id = id)
u_ws_options = await user_ws_options(session, id, trans)
result = UserEditor.model_validate(db_user.model_dump())
if u_ws_options:
result.oid_list = [item.id for item in u_ws_options]
return result
@router.post("")
async def create(session: SessionDep, creator: UserCreator, trans: Trans):
if check_account_exists(session=session, account=creator.account):
raise Exception(trans('i18n_exist', msg = f"{trans('i18n_user.account')} [{creator.account}]"))
if check_email_exists(session=session, email=creator.email):
raise Exception(trans('i18n_exist', msg = f"{trans('i18n_user.email')} [{creator.email}]"))
if not check_email_format(creator.email):
raise Exception(trans('i18n_format_invalid', key = f"{trans('i18n_user.email')} [{creator.email}]"))
data = creator.model_dump(exclude_unset=True)
user_model = UserModel.model_validate(data)
#user_model.create_time = get_timestamp()
user_model.language = "zh-CN"
user_model.oid = 0
if creator.oid_list:
# need to validate oid_list
db_model_list = [
UserWsModel.model_validate({
"oid": oid,
"uid": user_model.id,
"weight": 0
})
for oid in creator.oid_list
]
session.add_all(db_model_list)
user_model.oid = creator.oid_list[0]
session.add(user_model)
session.commit()
@router.put("")
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="editor.id")
async def update(session: SessionDep, editor: UserEditor, trans: Trans):
user_model: UserModel = get_db_user(session = session, user_id = editor.id)
if not user_model:
raise Exception(f"User with id [{editor.id}] not found!")
if editor.account != user_model.account:
raise Exception(f"account cannot be changed!")
if editor.email != user_model.email and check_email_exists(session=session, email=editor.email):
raise Exception(trans('i18n_exist', msg = f"{trans('i18n_user.email')} [{editor.email}]"))
if not check_email_format(editor.email):
raise Exception(trans('i18n_format_invalid', key = f"{trans('i18n_user.email')} [{editor.email}]"))
origin_oid: int = user_model.oid
del_stmt = sqlmodel_delete(UserWsModel).where(UserWsModel.uid == editor.id)
session.exec(del_stmt)
data = editor.model_dump(exclude_unset=True)
user_model.sqlmodel_update(data)
user_model.oid = 0
if editor.oid_list:
# need to validate oid_list
db_model_list = [
UserWsModel.model_validate({
"oid": oid,
"uid": user_model.id,
"weight": 0
})
for oid in editor.oid_list
]
session.add_all(db_model_list)
user_model.oid = origin_oid if origin_oid in editor.oid_list else editor.oid_list[0]
session.add(user_model)
session.commit()
@router.delete("/{id}")
async def delete(session: SessionDep, id: int):
await single_delete(session, id)
@router.delete("")
async def batch_del(session: SessionDep, id_list: list[int]):
for id in id_list:
await single_delete(session, id)
@router.put("/language")
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="current_user.id")
async def langChange(session: SessionDep, current_user: CurrentUser, trans: Trans, language: UserLanguage):
lang = language.language
if lang not in ["zh-CN", "en", "ko-KR"]:
raise Exception(trans('i18n_user.language_not_support', key = lang))
db_user: UserModel = get_db_user(session=session, user_id=current_user.id)
db_user.language = lang
session.add(db_user)
session.commit()
@router.patch("/pwd/{id}")
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="id")
async def pwdReset(session: SessionDep, current_user: CurrentUser, trans: Trans, id: int):
if not current_user.isAdmin:
raise Exception(trans('i18n_permission.no_permission', url = " patch[/user/pwd/id],", msg = trans('i18n_permission.only_admin')))
db_user: UserModel = get_db_user(session=session, user_id=id)
db_user.password = default_md5_pwd()
session.add(db_user)
session.commit()
@router.put("/pwd")
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="current_user.id")
async def pwdUpdate(session: SessionDep, current_user: CurrentUser, trans: Trans, editor: PwdEditor):
new_pwd = editor.new_pwd
if not check_pwd_format(new_pwd):
raise Exception(trans('i18n_format_invalid', key = trans('i18n_user.password')))
db_user: UserModel = get_db_user(session=session, user_id=current_user.id)
if not verify_md5pwd(editor.pwd, db_user.password):
raise Exception(trans('i18n_error', key = trans('i18n_user.password')))
db_user.password = md5pwd(new_pwd)
session.add(db_user)
session.commit()
@router.patch("/status")
@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="statusDto.id")
async def langChange(session: SessionDep, current_user: CurrentUser, trans: Trans, statusDto: UserStatus):
if not current_user.isAdmin:
raise Exception(trans('i18n_permission.no_permission', url = ", ", msg = trans('i18n_permission.only_admin')))
status = statusDto.status
if status not in [0, 1]:
return {"message": "status not supported"}
db_user: UserModel = get_db_user(session=session, user_id=statusDto.id)
db_user.status = status
session.add(db_user)
session.commit()