22from sqladmin .authentication import AuthenticationBackend
33
44from auth_backend .settings import get_settings
5-
5+ from auth_backend .utils .security import UnionAuth
6+ from auth_lib .methods import AuthLib
67
78settings = get_settings ()
89
9-
1010class AdminAuth (AuthenticationBackend ):
11+
1112 async def login (self , request : Request ) -> bool :
1213 form = await request .form ()
1314 username = form .get ("username" )
14- password = form .get ("password" )
15- if username == settings .ADMIN_LOGIN and password == settings .ADMIN_PASSWORD :
16- request .session ["user" ] = username
15+ token = form .get ("password" )
16+ if username != settings .ADMIN_LOGIN :
17+ return False
18+ valid = await self ._is_valid_token (token )
19+ if valid :
20+ request .session ["token" ] = token
1721 return True
18- return False
22+ else :
23+ return False
24+
25+ async def authenticate (self , request : Request ) -> bool :
26+ token = request .session .get ("token" )
27+ if not token :
28+ return False
29+ return await self ._is_valid_token (token )
1930
2031 async def logout (self , request : Request ) -> bool :
2132 request .session .clear ()
2233 return True
2334
24- async def authenticate (self , request : Request ) -> bool :
25- user = request .session .get ("user" )
26- return user is not None
35+ @staticmethod
36+ async def _is_valid_token (token : str ) -> bool :
37+ try :
38+ result = AuthLib (auth_url = settings .AUTH_URL ).check_token (token )
39+ if not result :
40+ return False
41+ session_scopes = {
42+ scope ["name" ].lower () for scope in result .get ("session_scopes" , [])
43+ }
44+ required_scopes = "auth.sqladmin.admin"
45+ if required_scopes not in session_scopes :
46+ return False
47+ return True
48+ except Exception :
49+ return False
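Review bot: The `except Exception: return False` at the end of `_is_valid_token` makes an auth-service outage, a misconfigured `AUTH_URL`, or a malformed response (`scope["name"]` raising KeyError) indistinguishable from a rejected token, so an admin just sees a failed login with nothing in the logs to tell the two apart. Failing closed is right, but log the cause first, e.g. `except Exception:` / `logger.exception("admin token check failed")` / `return False`, with a module-level `logger = logging.getLogger(__name__)` if the file does not already have one. `AuthLib(auth_url=settings.AUTH_URL).check_token(token)` is also called without `await` inside an `async def`; if it performs blocking HTTP (not visible from the hunk), it stalls the event loop and should go through `asyncio.to_thread` or an async client. Note that `authenticate` now runs this check on every admin request, so each page load is a round trip to `AUTH_URL`. Minor: `required_scopes` holds a single string, so `required_scope` reads better.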