-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathlogin.py
More file actions
132 lines (93 loc) · 3.71 KB
/
login.py
File metadata and controls
132 lines (93 loc) · 3.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import logging
import time
from typing import Any
import httpx
import typer
from pydantic import BaseModel
from fastapi_cloud_cli.config import Settings
from fastapi_cloud_cli.context import ctx
from fastapi_cloud_cli.utils.api import APIClient
from fastapi_cloud_cli.utils.auth import AuthConfig, write_auth_config
from fastapi_cloud_cli.utils.cli import get_rich_toolkit, handle_http_errors
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
class AuthorizationData(BaseModel):
    """Parsed body of the device-authorization response.

    `_start_device_authorization` validates the JSON returned by
    `POST /login/device/authorization` into this model.
    """

    # Not read by the code visible in this file; presumably the short code the
    # user confirms in the browser -- TODO confirm against the API.
    user_code: str
    # Sent back to `/login/device/token` while polling for the access token.
    device_code: str
    # Not read by the code visible in this file; presumably the base
    # verification URL without the user code embedded -- TODO confirm.
    verification_uri: str
    # URL that `login` logs and hands to `typer.launch`.
    verification_uri_complete: str
    # Seconds `_fetch_access_token` sleeps between polls; 5 when the response
    # omits the field.
    interval: int = 5
class TokenResponse(BaseModel):
    """Parsed body of an HTTP 200 response from `/login/device/token`."""

    # Returned by `_fetch_access_token`; `login` stores it via
    # `write_auth_config(AuthConfig(access_token=...))`.
    access_token: str
def _start_device_authorization(
    client: httpx.Client,
) -> AuthorizationData:
    """Begin the device-authorization flow and return the server's reply.

    Posts the configured client id to ``/login/device/authorization``, lets
    ``raise_for_status()`` reject error responses, and validates the JSON
    body into an ``AuthorizationData``.

    Args:
        client: HTTP client used to reach the API.

    Returns:
        The validated authorization payload.
    """
    form_fields = {"client_id": Settings.get().client_id}

    reply = client.post("/login/device/authorization", data=form_fields)
    reply.raise_for_status()

    return AuthorizationData.model_validate_json(reply.text)
def _fetch_access_token(client: httpx.Client, device_code: str, interval: int) -> str:
    """Poll the token endpoint until the device code has been authorized.

    Repeatedly posts the device-code grant to ``/login/device/token``:

    * HTTP 200 ends the loop; the body is validated as ``TokenResponse`` and
      its ``access_token`` is returned.
    * HTTP 400 with ``error == "authorization_pending"`` means the user has
      not finished yet, so the call sleeps and tries again.
    * HTTP 400 with ``error == "slow_down"`` also keeps polling, but first
      adds 5 seconds to the delay, as RFC 8628 section 3.5 requires.
    * Any other HTTP 400 (including one whose body is not a JSON object) and
      any other error status is raised via ``response.raise_for_status()``.

    NOTE: there is no overall timeout; the loop ends only on success or when
    an error response is raised.

    Args:
        client: HTTP client used to reach the API.
        device_code: Device code obtained when the authorization was started.
        interval: Initial delay between polls, in seconds.

    Returns:
        The access token from the successful token response.

    Raises:
        httpx.HTTPStatusError: Raised by ``response.raise_for_status()`` when
            the server answers with an error other than the two "keep
            polling" errors described above.
    """
    settings = Settings.get()

    # The form payload never changes between polls, so build it once.
    payload = {
        "device_code": device_code,
        "client_id": settings.client_id,
        "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
    }
    poll_interval = interval

    while True:
        response = client.post("/login/device/token", data=payload)

        if response.status_code == 200:
            break

        if response.status_code == 400:
            # A malformed or non-object body must surface as an HTTP error,
            # not as a ValueError/AttributeError from the JSON handling.
            try:
                body = response.json()
            except ValueError:
                body = None
            error = body.get("error") if isinstance(body, dict) else None

            if error == "slow_down":
                # RFC 8628 section 3.5: keep polling, 5 seconds slower.
                poll_interval += 5
            elif error != "authorization_pending":
                response.raise_for_status()
        else:
            # Unexpected status: raises unless it is a non-200 success code,
            # in which case polling simply continues.
            response.raise_for_status()

        time.sleep(poll_interval)

    response_data = TokenResponse.model_validate_json(response.text)

    return response_data.access_token
def login() -> Any:
    """
    Login to FastAPI Cloud. 🚀
    """
    # NOTE(review): the docstring above is kept verbatim; it is presumably
    # surfaced as the command's help text by the CLI framework.

    # The context is initialized here as well because the callback does not
    # take effect when this runs as the `fastapi login` command.
    ctx.initialize()

    current_identity = ctx.get_identity()

    # Guard clause: nothing to do when already logged in.
    if current_identity.is_logged_in():
        with get_rich_toolkit(minimal=True) as ui:
            ui.print("You are already logged in.")
            ui.print(
                "Run [bold]fastapi cloud logout[/bold] first if you want to switch accounts."
            )
        return

    # A deploy token does not block the login, but the user is warned that it
    # takes precedence for `fastapi deploy`.
    if current_identity.deploy_token is not None:
        precedence_warning = (
            "You have [bold blue]FASTAPI_CLOUD_TOKEN[/] environment variable set.\n"
            "This token will take precedence over the user token for "
            "[blue]`fastapi deploy`[/] command."
        )
        with get_rich_toolkit() as ui:
            ui.print(precedence_warning, tag="Warning")

    with get_rich_toolkit() as ui:
        with APIClient() as api:
            ui.print_title("Login to FastAPI Cloud", tag="FastAPI")
            ui.print_line()

            # Step 1: request the device/user codes and the verification URL.
            with ui.progress("Starting authorization") as step:
                with handle_http_errors(step):
                    authorization = _start_device_authorization(api)

                url = authorization.verification_uri_complete
                step.log(f"Opening [link={url}]{url}[/link]")

            ui.print_line()

            # Step 2: launch the URL, then poll until the user has authorized.
            with ui.progress("Waiting for user to authorize...") as step:
                typer.launch(url)

                with handle_http_errors(step):
                    token = _fetch_access_token(
                        api,
                        authorization.device_code,
                        authorization.interval,
                    )

                write_auth_config(AuthConfig(access_token=token))
                step.log("Now you are logged in! 🚀")