-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
216 lines (173 loc) · 6.37 KB
/
main.py
File metadata and controls
216 lines (173 loc) · 6.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from pydantic_settings import BaseSettings
from authlib.integrations.starlette_client import OAuth
from starlette.middleware.sessions import SessionMiddleware
import secrets
import jwt
import httpx
from typing import Optional
from async_lru import alru_cache
class Settings(BaseSettings):
    """Application configuration read from the environment and ``.env``."""

    # Pydantic v2 configuration. This replaces the inner ``class Config``,
    # which is deprecated since pydantic 2.0 and emits a warning at import.
    model_config = {"env_file": ".env"}

    # Google OAuth client credentials; required, no defaults.
    google_client_id: str
    google_client_secret: str

    # Secret handed to the session middleware.
    # NOTE: the fallback is generated once when this class is defined, so it
    # differs per process and per restart. Set ``secret_key`` explicitly in
    # any deployment where sessions must survive restarts or be shared
    # between workers.
    secret_key: str = secrets.token_urlsafe(32)
class User(BaseModel):
    # Identity of a signed-in user, as built from a verified Google ID token
    # or from the data stored in the browser session.
    id: str
    email: str
    name: str
    # Profile image URL; absent when the identity source supplies none.
    picture: Optional[str] = None
# Configuration is loaded once, at import time.
settings = Settings()

app = FastAPI(title="FastAPI Google OAuth Example")

# Enables ``request.session``, which the login/logout routes use to hold the
# signed-in user for browser access.
app.add_middleware(SessionMiddleware, secret_key=settings.secret_key)

# Authlib client registry with a single "google" provider, configured from
# Google's OpenID Connect discovery document.
oauth = OAuth()
oauth.register(
    name="google",
    client_id=settings.google_client_id,
    client_secret=settings.google_client_secret,
    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
    client_kwargs={"scope": "openid email profile"},
)

# Bearer scheme for programmatic access. ``auto_error=False`` lets handlers
# receive ``None`` and fall back to the session instead of failing outright.
security = HTTPBearer(auto_error=False)
@alru_cache(maxsize=1, ttl=3600)
async def _fetch_google_public_keys() -> dict:
    """Download Google's JWKS document, raising on any failure.

    Only successful results are cached, and only for an hour: Google rotates
    its signing keys, so a permanently cached key set would eventually stop
    matching the ``kid`` of freshly issued tokens.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get("https://www.googleapis.com/oauth2/v3/certs")
        response.raise_for_status()
        return response.json()


async def get_google_public_keys() -> Optional[dict]:
    """Return Google's public signing keys (JWKS), or ``None`` if unavailable.

    Returns:
        The parsed JWKS document (a dict with a ``"keys"`` list) on success,
        ``None`` when the request fails, Google answers with an error status,
        or the body is not valid JSON.
    """
    try:
        return await _fetch_google_public_keys()
    except (httpx.HTTPError, ValueError):
        # ``HTTPError`` covers both transport failures (``RequestError``) and
        # bad status codes (``HTTPStatusError``); ``ValueError`` covers a body
        # that is not valid JSON. The failure is deliberately handled outside
        # the cached helper so a transient outage is never cached.
        return None
async def verify_google_jwt(token: str) -> Optional[dict]:
    """Validate a Google-issued ID token and extract the user's identity.

    The token's signature is checked against the Google public key whose
    ``kid`` matches the token header, and the audience must equal this
    application's client ID. The issuer must be one of the two values Google
    uses for ID tokens.

    Args:
        token: The raw (encoded) JWT taken from the ``Authorization`` header.

    Returns:
        A dict with ``id``, ``email``, ``name`` and ``picture`` keys taken
        from the token claims (values may be ``None`` when a claim is
        absent), or ``None`` if the token cannot be verified for any reason.
    """
    import logging

    # Google issues ID tokens with either form of the issuer claim.
    valid_issuers = ("https://accounts.google.com", "accounts.google.com")

    try:
        google_keys = await get_google_public_keys()
        if not google_keys:
            return None

        # The header's "kid" names which of Google's published (rotating)
        # keys signed this token; pick exactly that key.
        kid = jwt.get_unverified_header(token).get("kid")
        key_data = next(
            (key for key in google_keys.get("keys", []) if key.get("kid") == kid),
            None,
        )
        if not key_data:
            return None

        from jwt.algorithms import RSAAlgorithm

        payload = jwt.decode(
            token,
            RSAAlgorithm.from_jwk(key_data),
            algorithms=["RS256"],
            audience=settings.google_client_id,
        )
        # The issuer is checked by hand so both accepted spellings work
        # regardless of the installed PyJWT version.
        if payload.get("iss") not in valid_issuers:
            return None

        return {
            "id": payload.get("sub"),
            "email": payload.get("email"),
            "name": payload.get("name"),
            "picture": payload.get("picture"),
        }
    except jwt.InvalidTokenError:
        # Expected failure mode: malformed, expired, wrong audience, bad
        # signature, and so on.
        return None
    except Exception:
        # Stay best-effort (treat as unauthenticated) but leave a trace, so
        # genuine faults such as a missing crypto backend are not invisible.
        logging.getLogger(__name__).exception(
            "Unexpected error while verifying Google JWT"
        )
        return None
async def get_current_user(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
) -> User | None:
    """Resolve the caller's identity from a bearer token or the session.

    A bearer token (programmatic access) is tried first; if it is absent or
    does not yield a usable user, the browser session is consulted.

    Args:
        request: The incoming request; its session may hold a ``"user"`` entry.
        credentials: Parsed ``Authorization: Bearer`` header, or ``None``.

    Returns:
        The authenticated ``User``, or ``None`` when nobody is signed in.
    """
    from pydantic import ValidationError

    def _build_user(data) -> User | None:
        # A verified token may still lack claims the model requires (for
        # example no email or name), and session data may be malformed.
        # Treat either as "not authenticated" instead of a server error.
        if not data:
            return None
        try:
            return User(**data)
        except (ValidationError, TypeError):
            return None

    # Try JWT token first (for programmatic access)
    if credentials:
        token_user = _build_user(await verify_google_jwt(credentials.credentials))
        if token_user:
            return token_user

    # Fall back to session (for browser access)
    return _build_user(request.session.get("user"))
async def require_auth(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
) -> User:
    """Dependency that yields the current user or rejects the request.

    Raises:
        HTTPException: 401 when neither the bearer token nor the session
            identifies a user.
    """
    current = await get_current_user(request, credentials)
    if current:
        return current
    raise HTTPException(status_code=401, detail="Not authenticated")
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
async def home(request: Request):
    """Render the landing page.

    Shows the signed-in user's profile card with a logout link, or a login
    link when the session holds no user. Only the session is consulted (no
    bearer credentials are passed).

    All user-supplied values are HTML-escaped before being placed in the
    page, so a crafted display name or picture URL cannot inject markup.
    """
    from html import escape

    user = await get_current_user(request, None)

    if user:
        picture_html = ""
        if user.picture:
            picture_html = (
                f'<img src="{escape(user.picture)}" alt="Profile Picture" '
                'style="width: 100px; height: 100px; border-radius: 50%;">'
            )
        content = f"""
        <div class="user-card">
            <h2>Welcome, {escape(user.name)}!</h2>
            <p><strong>Email:</strong> {escape(user.email)}</p>
            <p><strong>User ID:</strong> {escape(user.id)}</p>
            {picture_html}
            <br><br>
            <a href="/logout" class="btn logout-btn">Logout</a>
        </div>
        """
    else:
        content = """
        <p>You are not logged in.</p>
        <a href="/login" class="btn">Login with Google</a>
        """

    return HTMLResponse(f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>FastAPI Google OAuth</title>
        <style>
            body {{ font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }}
            .user-card {{ background: #f5f5f5; padding: 20px; border-radius: 8px; margin: 20px 0; }}
            .btn {{ background: #4285f4; color: white; padding: 10px 20px; text-decoration: none; border-radius: 4px; display: inline-block; }}
            .btn:hover {{ background: #357ae8; }}
            .logout-btn {{ background: #dc3545; }}
            .logout-btn:hover {{ background: #c82333; }}
        </style>
    </head>
    <body>
        <h1>FastAPI Google OAuth Demo</h1>
        {content}
    </body>
    </html>
    """)
@app.get("/login", include_in_schema=False)
async def login_via_google(request: Request):
    """Begin the Google sign-in flow.

    Redirects the browser to Google, asking it to send the user back to the
    ``auth_via_google`` route afterwards.
    """
    callback_url = request.url_for("auth_via_google")
    google = oauth.google
    return await google.authorize_redirect(request, callback_url)
@app.get("/auth/google", include_in_schema=False)
async def auth_via_google(request: Request):
    """Handle Google's OAuth callback and start a browser session.

    Exchanges the authorization response for a token, stores the user's
    identity in the session when the token carries user info, and redirects
    to the home page.

    Raises:
        HTTPException: 400 when the OAuth exchange fails (for example the
            user denied consent or the state parameter does not match).
    """
    from authlib.integrations.starlette_client import OAuthError

    try:
        token = await oauth.google.authorize_access_token(request)
    except OAuthError as error:
        # Surface a client error instead of an unhandled 500.
        raise HTTPException(
            status_code=400,
            detail=f"Google sign-in failed: {error.error}",
        ) from error

    # ``userinfo`` may be absent; ``.get`` avoids a KeyError in that case.
    user_info = token.get("userinfo")
    if user_info:
        request.session["user"] = {
            "id": user_info["sub"],
            "email": user_info["email"],
            "name": user_info["name"],
            "picture": user_info.get("picture"),
        }

    return RedirectResponse(url="/")
@app.get("/logout", include_in_schema=False)
async def logout(request: Request):
    """Sign the browser out by emptying its session, then go back home."""
    session = request.session
    session.clear()
    return RedirectResponse(url="/")
@app.post("/echo")
async def echo(message: str):
    # Unauthenticated endpoint: hands the received ``message`` straight back.
    # Documented with comments rather than a docstring so the generated API
    # description stays exactly as it was.
    reply = message
    return reply
@app.get("/profile")
async def profile(request: Request, user: User = Depends(require_auth)):
    # ``require_auth`` has already rejected the request with a 401 if nobody
    # is signed in, so ``user`` is always a real User here.
    # Documented with comments rather than a docstring so the generated API
    # description stays exactly as it was.
    authenticated_user = user
    return authenticated_user
if __name__ == "__main__":
    # Development entry point: serve this module's ``app`` on all interfaces
    # at port 8000 with auto-reload enabled.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("main:app", **server_options)