Skip to content

Commit f3873e9

Browse files
authored
feat: add role management helper methods (#114)
1 parent c8e8b2d commit f3873e9

2 files changed

Lines changed: 164 additions & 0 deletions

File tree

src/casdoor/async_main.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,3 +462,107 @@ async def update_user(self, user: User) -> Dict:
462462
async def delete_user(self, user: User) -> Dict:
463463
response = await self.modify_user("delete-user", user)
464464
return response
465+
466+
async def get_roles(self) -> List[Dict]:
467+
"""
468+
Get all roles from Casdoor for the configured organization.
469+
470+
:return: list of role dicts
471+
"""
472+
path = "/api/get-roles"
473+
params = {"owner": self.org_name}
474+
async with self._session as session:
475+
response = await session.get(path, headers=self.headers, params=params)
476+
if response.get("status") != "ok":
477+
raise Exception(response.get("msg", "Failed to get roles"))
478+
return response.get("data", [])
479+
480+
async def get_role(self, role_name: str) -> Dict:
481+
"""
482+
Get a specific role from Casdoor.
483+
484+
:param role_name: the name of the role
485+
:return: role dict
486+
"""
487+
path = "/api/get-role"
488+
params = {"id": f"{self.org_name}/{role_name}"}
489+
async with self._session as session:
490+
response = await session.get(path, headers=self.headers, params=params)
491+
if response.get("status") != "ok":
492+
raise Exception(response.get("msg", f"Role {role_name} not found"))
493+
return response.get("data")
494+
495+
async def update_role(self, role: Dict) -> Dict:
496+
"""
497+
Update a role in Casdoor.
498+
499+
:param role: role dict with updated data
500+
:return: response dict with status
501+
"""
502+
path = "/api/update-role"
503+
params = {"id": f"{role['owner']}/{role['name']}"}
504+
async with self._session as session:
505+
response = await session.post(path, params=params, headers=self.headers, json=role)
506+
if response.get("status") != "ok":
507+
raise Exception(response.get("msg", "Failed to update role"))
508+
return response
509+
510+
async def assign_role_to_user(self, username: str, role_name: str) -> Dict:
511+
"""
512+
Assign a role to a user by adding the user to the role's users list.
513+
514+
:param username: the username to assign the role to
515+
:param role_name: the name of the role to assign
516+
:return: response dict with status
517+
"""
518+
user_id = f"{self.org_name}/{username}"
519+
520+
role = await self.get_role(role_name)
521+
if not role:
522+
raise Exception(f"Role {role_name} not found")
523+
524+
if not role.get("users"):
525+
role["users"] = []
526+
527+
if user_id not in role["users"]:
528+
role["users"].append(user_id)
529+
return await self.update_role(role)
530+
531+
return {"status": "ok", "msg": "User already has this role"}
532+
533+
async def remove_role_from_user(self, username: str, role_name: str) -> Dict:
534+
"""
535+
Remove a role from a user by removing the user from the role's users list.
536+
537+
:param username: the username to remove the role from
538+
:param role_name: the name of the role to remove
539+
:return: response dict with status
540+
"""
541+
user_id = f"{self.org_name}/{username}"
542+
543+
role = await self.get_role(role_name)
544+
if not role:
545+
raise Exception(f"Role {role_name} not found")
546+
547+
if role.get("users") and user_id in role["users"]:
548+
role["users"].remove(user_id)
549+
return await self.update_role(role)
550+
551+
return {"status": "ok", "msg": "User does not have this role"}
552+
553+
async def get_user_roles(self, username: str) -> List[Dict]:
554+
"""
555+
Get all roles assigned to a user.
556+
557+
:param username: the username to get roles for
558+
:return: list of role dicts assigned to the user
559+
"""
560+
user_id = f"{self.org_name}/{username}"
561+
all_roles = await self.get_roles()
562+
563+
user_roles = []
564+
for role in all_roles:
565+
if role.get("users") and user_id in role["users"]:
566+
user_roles.append(role)
567+
568+
return user_roles

src/casdoor/role.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,63 @@ def update_role(self, role: Role) -> Dict:
125125
def delete_role(self, role: Role) -> Dict:
126126
response = self.modify_role("delete-role", role)
127127
return response
128+
129+
def assign_role_to_user(self, username: str, role_name: str) -> Dict:
130+
"""
131+
Assign a role to a user by adding the user to the role's users list.
132+
133+
:param username: the username to assign the role to
134+
:param role_name: the name of the role to assign
135+
:return: response dict with status
136+
"""
137+
user_id = f"{self.org_name}/{username}"
138+
139+
role = self.get_role(role_name)
140+
if role is None:
141+
raise Exception(f"Role {role_name} not found")
142+
143+
if not hasattr(role, "users") or role.users is None:
144+
role.users = []
145+
146+
if user_id not in role.users:
147+
role.users.append(user_id)
148+
return self.update_role(role)
149+
150+
return {"status": "ok", "msg": "User already has this role"}
151+
152+
def remove_role_from_user(self, username: str, role_name: str) -> Dict:
153+
"""
154+
Remove a role from a user by removing the user from the role's users list.
155+
156+
:param username: the username to remove the role from
157+
:param role_name: the name of the role to remove
158+
:return: response dict with status
159+
"""
160+
user_id = f"{self.org_name}/{username}"
161+
162+
role = self.get_role(role_name)
163+
if role is None:
164+
raise Exception(f"Role {role_name} not found")
165+
166+
if hasattr(role, "users") and role.users and user_id in role.users:
167+
role.users.remove(user_id)
168+
return self.update_role(role)
169+
170+
return {"status": "ok", "msg": "User does not have this role"}
171+
172+
def get_user_roles(self, username: str) -> List[Role]:
173+
"""
174+
Get all roles assigned to a user.
175+
176+
:param username: the username to get roles for
177+
:return: list of Role objects assigned to the user
178+
"""
179+
user_id = f"{self.org_name}/{username}"
180+
all_roles = self.get_roles()
181+
182+
user_roles = []
183+
for role in all_roles:
184+
if hasattr(role, "users") and role.users and user_id in role.users:
185+
user_roles.append(role)
186+
187+
return user_roles

0 commit comments

Comments
 (0)