-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
140 lines (108 loc) · 4.5 KB
/
Copy pathauth.py
File metadata and controls
140 lines (108 loc) · 4.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""Authentication for robot-gateway."""
from __future__ import annotations
import os
from fastapi import Header, HTTPException, status
from google.auth.transport import requests
from google.oauth2 import id_token
from config import ORCHESTRATOR_API_KEY
from observability.logger import get_runtime_logger
logger = get_runtime_logger(__name__)
def _env_flag(name: str) -> bool:
return os.environ.get(name, "").strip().lower() in {"1", "true", "yes", "on"}
DEV_BYPASS_AUTH = _env_flag("DEV_BYPASS_AUTH")
DEV_USER_EMAIL = os.environ.get("DEV_USER_EMAIL", "").strip()
def _split_csv(value: str) -> list[str]:
return [item.strip() for item in value.split(",") if item.strip()]
def get_google_client_ids() -> list[str]:
return list(dict.fromkeys(_split_csv(os.environ.get("GOOGLE_CLIENT_IDS", ""))))
GOOGLE_CLIENT_IDS = [] if DEV_BYPASS_AUTH else get_google_client_ids()
if not GOOGLE_CLIENT_IDS and not DEV_BYPASS_AUTH:
raise ValueError("GOOGLE_CLIENT_IDS is not set")
if DEV_BYPASS_AUTH and not DEV_USER_EMAIL:
raise ValueError("DEV_USER_EMAIL is required when DEV_BYPASS_AUTH is enabled")
def get_allowed_users() -> set[str]:
allowlist = os.environ.get("ALLOWED_USERS", "").strip()
if not allowlist:
raise ValueError("ALLOWED_USERS must be configured and non-empty unless DEV_BYPASS_AUTH is enabled")
users = {email.strip() for email in allowlist.split(",") if email.strip()}
if not users:
raise ValueError("ALLOWED_USERS must contain at least one email address")
return users
ALLOWED_USERS = set() if DEV_BYPASS_AUTH else get_allowed_users()
def require_service_api_key(
x_service_api_key: str = Header(default="", alias="x-service-api-key"),
) -> None:
"""Validate internal service API key for service-to-service endpoints."""
if not ORCHESTRATOR_API_KEY:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Service API key is not configured",
)
if x_service_api_key != ORCHESTRATOR_API_KEY:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid service API key",
)
def verify_google_token(token: str) -> dict:
try:
last_error: ValueError | None = None
request = requests.Request()
for client_id in GOOGLE_CLIENT_IDS:
try:
return id_token.verify_oauth2_token(token, request, client_id)
except ValueError as exc:
last_error = exc
raise last_error or ValueError("No Google OAuth client IDs configured")
except ValueError as exc:
logger.warning(
"Invalid authentication token",
extra={
"error": str(exc),
"configured_google_client_ids": len(GOOGLE_CLIENT_IDS),
},
)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid authentication token: {exc!s}",
) from exc
except Exception as exc:
logger.exception("Authentication failed")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Authentication failed: {exc!s}",
) from exc
def check_user_allowed(email: str) -> None:
if email not in ALLOWED_USERS:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Access denied. Your account is not authorized.",
)
async def get_current_user(authorization: str | None = Header(None)) -> dict:
if DEV_BYPASS_AUTH:
return {
"email": DEV_USER_EMAIL,
"user_email": DEV_USER_EMAIL,
"name": DEV_USER_EMAIL.split("@")[0],
}
if not authorization:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing authorization header",
headers={"WWW-Authenticate": "Bearer"},
)
parts = authorization.split()
if len(parts) != 2 or parts[0].lower() != "bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authorization header format. Expected: Bearer <token>",
headers={"WWW-Authenticate": "Bearer"},
)
user_info = verify_google_token(parts[1])
email = user_info.get("email")
if not email:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token does not contain email",
)
check_user_allowed(email)
return user_info