-
Notifications
You must be signed in to change notification settings - Fork 686
Expand file tree
/
Copy pathworkspace.py
More file actions
236 lines (209 loc) · 9.05 KB
/
workspace.py
File metadata and controls
236 lines (209 loc) · 9.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from sqlmodel import exists, or_, select, delete as sqlmodel_delete, update as sqlmodel_update
from apps.system.crud.user import clean_user_cache
from apps.system.crud.workspace import reset_single_user_oid, reset_user_oid
from apps.system.models.system_model import UserWsModel, WorkspaceBase, WorkspaceEditor, WorkspaceModel
from apps.system.models.user import UserModel
from apps.system.schemas.system_schema import UserWsBase, UserWsDTO, UserWsEditor, UserWsOption, WorkspaceUser
from common.core.deps import CurrentUser, SessionDep, Trans
from common.core.pagination import Paginator
from common.core.schemas import PaginatedResponse, PaginationParams
from common.utils.time import get_timestamp
# Router for the workspace endpoints in this module; every route below is
# registered under the "/system/workspace" prefix.
router = APIRouter(tags=["system/workspace"], prefix="/system/workspace")
@router.get("/uws/option/pager/{pageNum}/{pageSize}", response_model=PaginatedResponse[UserWsOption])
async def option_pager(
    session: SessionDep,
    current_user: CurrentUser,
    trans: Trans,
    pageNum: int,
    pageSize: int,
    oid: int = Query(description="空间ID"),
    keyword: Optional[str] = Query(None, description="搜索关键字(可选)"),
):
    """Page through users that have no membership row in workspace ``oid``.

    Only callers with ``current_user.isAdmin`` set may use this endpoint.
    When ``keyword`` is given, results are narrowed to users whose account or
    name contains it (case-insensitive ``ILIKE`` match).

    Raises:
        Exception: if the caller is not an admin, or if ``oid`` is falsy.
    """
    if not current_user.isAdmin:
        admin_only_msg = trans('i18n_permission.only_admin')
        raise Exception(trans('i18n_permission.no_permission', url=", ", msg=admin_only_msg))
    if not oid:
        raise Exception(trans('i18n_miss_args', key='[oid]'))

    page_params = PaginationParams(page=pageNum, size=pageSize)
    pager_helper = Paginator(session)

    # Correlated sub-query: true when the user already belongs to the workspace.
    already_member = exists().where(UserWsModel.uid == UserModel.id, UserWsModel.oid == oid)
    candidates = (
        select(UserModel.id, UserModel.account, UserModel.name)
        # User id 1 is always excluded (presumably the built-in admin account).
        .where(~already_member, UserModel.id != 1)
        .order_by(UserModel.account, UserModel.create_time)
    )
    if keyword:
        like_pattern = f"%{keyword}%"
        matches_keyword = or_(
            UserModel.account.ilike(like_pattern),
            UserModel.name.ilike(like_pattern),
        )
        candidates = candidates.where(matches_keyword)

    return await pager_helper.get_paginated_response(stmt=candidates, pagination=page_params)
@router.get("/uws/option", response_model=UserWsOption | None)
async def option_user(
    session: SessionDep,
    current_user: CurrentUser,
    trans: Trans,
    keyword: str = Query(description="搜索关键字")
):
    """Look up one user, by exact account or name, who is not in the caller's workspace.

    The workspace is always ``current_user.oid``. Returns the first matching
    row, or ``None`` when nothing matches.

    Raises:
        Exception: if ``keyword`` is empty, or if the caller is neither an
            admin nor has a non-zero ``weight``.
    """
    if not keyword:
        raise Exception(trans('i18n_miss_args', key='[keyword]'))
    if not (current_user.isAdmin or current_user.weight != 0):
        raise Exception(trans('i18n_permission.no_permission', url='', msg=''))

    workspace_id = current_user.oid
    in_workspace = exists().where(UserWsModel.uid == UserModel.id, UserWsModel.oid == workspace_id)
    exact_match = or_(UserModel.account == keyword, UserModel.name == keyword)

    # ``keyword`` is guaranteed non-empty here, so the exact-match filter is
    # always applied. User id 1 is always excluded.
    lookup = select(UserModel.id, UserModel.account, UserModel.name).where(
        ~in_workspace,
        UserModel.id != 1,
        exact_match,
    )
    return session.exec(lookup).first()
@router.get("/uws/pager/{pageNum}/{pageSize}", response_model=PaginatedResponse[WorkspaceUser])
async def pager(
    session: SessionDep,
    current_user: CurrentUser,
    trans: Trans,
    pageNum: int,
    pageSize: int,
    keyword: Optional[str] = Query(None, description="搜索关键字(可选)"),
    oid: Optional[int] = Query(None, description="空间ID(仅admin用户生效)"),
):
    """Page through the members of a workspace, including each member's weight.

    Admins may target any workspace through ``oid``; for everyone else (and
    for admins who omit ``oid``) the caller's own ``current_user.oid`` is used.
    ``keyword`` optionally narrows results by account, name or email
    (case-insensitive ``ILIKE`` match).

    Raises:
        Exception: if the caller is neither an admin nor has a non-zero
            ``weight``.
    """
    if not (current_user.isAdmin or current_user.weight != 0):
        raise Exception(trans('i18n_permission.no_permission', url='', msg=''))

    # Only admins are allowed to override the workspace via the query string.
    workspace_id = oid if (current_user.isAdmin and oid) else current_user.oid

    page_params = PaginationParams(page=pageNum, size=pageSize)
    pager_helper = Paginator(session)

    columns = (
        UserModel.id,
        UserModel.account,
        UserModel.name,
        UserModel.email,
        UserModel.status,
        UserModel.create_time,
        UserModel.oid,
        UserWsModel.weight,
    )
    members = (
        select(*columns)
        .join(UserWsModel, UserModel.id == UserWsModel.uid)
        # User id 1 is always excluded from the listing.
        .where(UserWsModel.oid == workspace_id, UserModel.id != 1)
        .order_by(UserModel.account, UserModel.create_time)
    )
    if keyword:
        like_pattern = f"%{keyword}%"
        members = members.where(
            or_(
                UserModel.account.ilike(like_pattern),
                UserModel.name.ilike(like_pattern),
                UserModel.email.ilike(like_pattern),
            )
        )

    return await pager_helper.get_paginated_response(stmt=members, pagination=page_params)
@router.post("/uws")
async def create(session: SessionDep, current_user: CurrentUser, trans: Trans, creator: UserWsDTO):
    """Add every user in ``creator.uid_list`` to a workspace.

    Admins may choose the workspace (``creator.oid``) and the member weight
    (``creator.weight``); for other callers the workspace is their own
    ``current_user.oid`` and the weight is 0.

    Raises:
        Exception: if the caller is neither an admin nor has a non-zero
            ``weight``.
    """
    is_admin = current_user.isAdmin
    if not is_admin and current_user.weight == 0:
        raise Exception(trans('i18n_permission.no_permission', url='', msg=''))

    if is_admin and creator.oid:
        target_oid: int = creator.oid
    else:
        target_oid = current_user.oid

    if is_admin and creator.weight:
        member_weight = creator.weight
    else:
        member_weight = 0

    # TODO(review): the original note here asks for uid_list / oid validity
    # checks; only model validation is performed at the moment.
    memberships = []
    for uid in creator.uid_list:
        row = UserWsModel.model_validate({"oid": target_oid, "uid": uid, "weight": member_weight})
        memberships.append(row)

    # All rows are validated before any per-user side effect is triggered.
    for uid in creator.uid_list:
        await reset_single_user_oid(session, uid, target_oid)
        await clean_user_cache(uid)

    session.add_all(memberships)
    session.commit()
@router.put("/uws")
async def edit(session: SessionDep, trans: Trans, editor: UserWsEditor):
    """Change the weight of one user's membership in one workspace.

    The membership row is located by ``editor.uid`` and ``editor.oid``. If the
    stored weight already equals ``editor.weight`` nothing is written.

    Raises:
        Exception: if ``editor.oid`` or ``editor.uid`` is missing.
        HTTPException: 404 if no membership row exists for that uid/oid pair.
    """
    # NOTE(review): unlike the sibling handlers, no caller permission check is
    # visible here - confirm it is enforced elsewhere.
    if not editor.oid or not editor.uid:
        raise Exception(trans('i18n_miss_args', key='[oid, uid]'))

    db_model = session.exec(
        select(UserWsModel).where(UserWsModel.uid == editor.uid, UserWsModel.oid == editor.oid)
    ).first()
    if not db_model:
        # HTTPException's first positional argument is the status code, so the
        # message must be passed as ``detail``.
        raise HTTPException(status_code=404, detail="uws not exist")

    if editor.weight == db_model.weight:
        return

    db_model.weight = editor.weight
    session.add(db_model)
    await clean_user_cache(editor.uid)
    session.commit()
@router.delete("/uws")
async def delete(session: SessionDep, current_user: CurrentUser, trans: Trans, dto: UserWsBase):
    """Remove the users in ``dto.uid_list`` from a workspace.

    Admins may target any workspace through ``dto.oid``; other callers always
    act on their own ``current_user.oid``.

    Raises:
        Exception: if the caller is neither an admin nor has a non-zero
            ``weight``.
        HTTPException: 404 if none of the given users has a membership row in
            the target workspace.
    """
    if not current_user.isAdmin and current_user.weight == 0:
        raise Exception(trans('i18n_permission.no_permission', url='', msg=''))

    oid: int = dto.oid if (current_user.isAdmin and dto.oid) else current_user.oid
    db_model_list: list[UserWsModel] = session.exec(
        select(UserWsModel).where(UserWsModel.uid.in_(dto.uid_list), UserWsModel.oid == oid)
    ).all()
    if not db_model_list:
        # HTTPException's first positional argument is the status code, so the
        # message must be passed as ``detail``.
        raise HTTPException(status_code=404, detail="UserWsModel not found")

    for db_model in db_model_list:
        session.delete(db_model)

    # Runs for every requested uid, including any that had no membership row.
    for uid in dto.uid_list:
        await reset_single_user_oid(session, uid, oid, False)
        await clean_user_cache(uid)

    session.commit()
@router.get("", response_model=list[WorkspaceModel])
async def query(session: SessionDep, trans: Trans):
    """Return all workspaces sorted by display name.

    Names that start with ``i18n`` are treated as translation keys and are
    replaced by ``trans(name)`` before sorting.
    """
    workspaces = session.exec(select(WorkspaceModel)).all()
    for workspace in workspaces:
        raw_name = workspace.name
        if raw_name.startswith('i18n'):
            workspace.name = trans(raw_name)
    # Sort after translation so the order follows the names actually returned.
    return sorted(workspaces, key=lambda workspace: workspace.name)
@router.post("")
async def add(session: SessionDep, creator: WorkspaceBase):
    """Create a workspace from ``creator`` and stamp it with the current timestamp."""
    new_workspace = WorkspaceModel.model_validate(creator)
    # create_time is set here by the server from get_timestamp().
    new_workspace.create_time = get_timestamp()
    session.add(new_workspace)
    session.commit()
@router.put("")
async def update(session: SessionDep, editor: WorkspaceEditor):
    """Rename the workspace identified by ``editor.id`` to ``editor.name``.

    Raises:
        HTTPException: 404 if no workspace with that id exists.
    """
    workspace_id = editor.id
    db_model = session.get(WorkspaceModel, workspace_id)
    if not db_model:
        # HTTPException's first positional argument is the status code, so the
        # message must be passed as ``detail``.
        raise HTTPException(
            status_code=404,
            detail=f"WorkspaceModel with id {workspace_id} not found",
        )
    db_model.name = editor.name
    session.add(db_model)
    session.commit()
@router.get("/{id}", response_model=WorkspaceModel)
async def get_one(session: SessionDep, trans: Trans, id: int):
    """Fetch a single workspace by primary key.

    A name that starts with ``i18n`` is treated as a translation key and is
    replaced by ``trans(name)`` on the returned object.

    Raises:
        HTTPException: 404 if no workspace with that id exists.
    """
    db_model = session.get(WorkspaceModel, id)
    if not db_model:
        # HTTPException's first positional argument is the status code, so the
        # message must be passed as ``detail``.
        raise HTTPException(status_code=404, detail=f"WorkspaceModel with id {id} not found")
    if db_model.name.startswith('i18n'):
        db_model.name = trans(db_model.name)
    return db_model
@router.delete("/{id}")
async def single_delete(session: SessionDep, current_user: CurrentUser, id: int):
    """Delete a workspace together with its membership rows.

    Workspace 1 is treated as the default workspace and cannot be deleted. If
    the caller is currently in the workspace being deleted, the caller is
    moved to workspace 1 first. Cached user data is cleared for every member
    of the workspace.

    Raises:
        HTTPException: 403 if the caller is not an admin, 400 if ``id`` is 1,
            404 if no workspace with that id exists.
    """
    # HTTPException's first positional argument is the status code, so each
    # message below is passed as ``detail`` with an explicit status.
    if not current_user.isAdmin:
        raise HTTPException(status_code=403, detail="only admin can delete workspace")
    if id == 1:
        raise HTTPException(status_code=400, detail="Can not delete default workspace")
    db_model = session.get(WorkspaceModel, id)
    if not db_model:
        raise HTTPException(status_code=404, detail=f"WorkspaceModel with id {id} not found")

    if current_user.oid == id:
        current_user.oid = 1  # reset to default workspace
        update_stmt = sqlmodel_update(UserModel).where(UserModel.id == current_user.id).values(oid=1)
        session.exec(update_stmt)
        await clean_user_cache(current_user.id)

    user_ws_list = session.exec(select(UserWsModel).where(UserWsModel.oid == id)).all()
    if user_ws_list:
        # clean user cache
        for user_ws in user_ws_list:
            await clean_user_cache(user_ws.uid)
        # reset user default oid
        await reset_user_oid(session, id)
        # delete user_ws
        session.exec(sqlmodel_delete(UserWsModel).where(UserWsModel.oid == id))

    session.delete(db_model)
    session.commit()