Skip to content

Commit c3e0f45

Browse files
committed
✨ Add support for FASTAPI_CLOUD_TOKEN environment variable auth
1 parent 1aa4e3e commit c3e0f45

File tree

6 files changed

+155
-22
lines changed

6 files changed

+155
-22
lines changed

src/fastapi_cloud_cli/commands/whoami.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
def whoami() -> Any:
1515
identity = Identity()
1616

17+
if identity.auth_mode == "token":
18+
print("⚡ [bold]Using API token from environment variable[/bold]")
19+
return
20+
1721
if not identity.is_logged_in():
1822
print("No credentials found. Use [blue]`fastapi login`[/] to login.")
1923
return

src/fastapi_cloud_cli/utils/auth.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import binascii
33
import json
44
import logging
5+
import os
56
import time
6-
from typing import Optional
7+
from typing import Literal, Optional
78

89
from pydantic import BaseModel
910

@@ -59,7 +60,7 @@ def _get_auth_token() -> Optional[str]:
5960
return auth_data.access_token
6061

6162

62-
def _is_token_expired(token: str) -> bool:
63+
def _is_jwt_expired(token: str) -> bool:
6364
try:
6465
parts = token.split(".")
6566

@@ -108,21 +109,32 @@ def _is_token_expired(token: str) -> bool:
108109

109110

110111
class Identity:
112+
auth_mode: Literal["token", "user"]
113+
111114
def __init__(self) -> None:
112115
self.token = _get_auth_token()
116+
self.auth_mode = "user"
117+
118+
# users using `FASTAPI_CLOUD_TOKEN`
119+
if env_token := self._get_token_from_env():
120+
self.token = env_token
121+
self.auth_mode = "token"
122+
123+
def _get_token_from_env(self) -> Optional[str]:
124+
return os.environ.get("FASTAPI_CLOUD_TOKEN")
113125

114126
def is_expired(self) -> bool:
115127
if not self.token:
116128
return True
117129

118-
return _is_token_expired(self.token)
130+
return _is_jwt_expired(self.token)
119131

120132
def is_logged_in(self) -> bool:
121133
if self.token is None:
122134
logger.debug("Login status: False (no token)")
123135
return False
124136

125-
if self.is_expired():
137+
if self.auth_mode == "user" and self.is_expired():
126138
logger.debug("Login status: False (token expired)")
127139
return False
128140

src/fastapi_cloud_cli/utils/cli.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from rich_toolkit.progress import Progress
1111
from rich_toolkit.styles import MinimalStyle, TaggedStyle
1212

13+
from .auth import Identity
14+
1315
logger = logging.getLogger(__name__)
1416

1517

@@ -89,7 +91,14 @@ def handle_http_errors(
8991
logger.debug(e.response.json()) # pragma: no cover
9092

9193
if isinstance(e, HTTPStatusError) and e.response.status_code in (401, 403):
92-
message = "The specified token is not valid. Use `fastapi login` to generate a new token."
94+
message = "The specified token is not valid. "
95+
96+
identity = Identity()
97+
98+
if identity.auth_mode == "user":
99+
message += "Use `fastapi login` to generate a new token."
100+
else:
101+
message += "Make sure to use a valid token."
93102

94103
else:
95104
message = (

tests/test_auth.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,33 @@
77
from fastapi_cloud_cli.utils.auth import (
88
AuthConfig,
99
Identity,
10-
_is_token_expired,
10+
_is_jwt_expired,
1111
write_auth_config,
1212
)
1313

1414
from .utils import create_jwt_token
1515

1616

17-
def test_is_token_expired_with_valid_token() -> None:
17+
def test_is_jwt_expired_with_valid_token() -> None:
1818
future_exp = int(time.time()) + 3600
1919

2020
token = create_jwt_token({"exp": future_exp, "sub": "test_user"})
2121

22-
assert not _is_token_expired(token)
22+
assert not _is_jwt_expired(token)
2323

2424

25-
def test_is_token_expired_with_expired_token() -> None:
25+
def test_is_jwt_expired_with_expired_token() -> None:
2626
past_exp = int(time.time()) - 3600
2727
token = create_jwt_token({"exp": past_exp, "sub": "test_user"})
2828

29-
assert _is_token_expired(token)
29+
assert _is_jwt_expired(token)
3030

3131

32-
def test_is_token_expired_with_no_exp_claim() -> None:
32+
def test_is_jwt_expired_with_no_exp_claim() -> None:
3333
token = create_jwt_token({"sub": "test_user"})
3434

3535
# Tokens without exp claim should be considered valid
36-
assert not _is_token_expired(token)
36+
assert not _is_jwt_expired(token)
3737

3838

3939
@pytest.mark.parametrize(
@@ -46,36 +46,36 @@ def test_is_token_expired_with_no_exp_claim() -> None:
4646
"...",
4747
],
4848
)
49-
def test_is_token_expired_with_malformed_token(token: str) -> None:
50-
assert _is_token_expired(token)
49+
def test_is_jwt_expired_with_malformed_token(token: str) -> None:
50+
assert _is_jwt_expired(token)
5151

5252

53-
def test_is_token_expired_with_invalid_base64() -> None:
53+
def test_is_jwt_expired_with_invalid_base64() -> None:
5454
token = "header.!!!invalid_signature!!!.signature"
55-
assert _is_token_expired(token)
55+
assert _is_jwt_expired(token)
5656

5757

58-
def test_is_token_expired_with_invalid_json() -> None:
58+
def test_is_jwt_expired_with_invalid_json() -> None:
5959
header_encoded = base64.urlsafe_b64encode(b'{"alg":"HS256"}').decode().rstrip("=")
6060
payload_encoded = base64.urlsafe_b64encode(b"{invalid json}").decode().rstrip("=")
6161
signature = base64.urlsafe_b64encode(b"signature").decode().rstrip("=")
6262
token = f"{header_encoded}.{payload_encoded}.{signature}"
6363

64-
assert _is_token_expired(token)
64+
assert _is_jwt_expired(token)
6565

6666

67-
def test_is_token_expired_edge_case_exact_expiration() -> None:
67+
def test_is_jwt_expired_edge_case_exact_expiration() -> None:
6868
current_time = int(time.time())
6969
token = create_jwt_token({"exp": current_time, "sub": "test_user"})
7070

71-
assert _is_token_expired(token)
71+
assert _is_jwt_expired(token)
7272

7373

74-
def test_is_token_expired_edge_case_one_second_before() -> None:
74+
def test_is_jwt_expired_edge_case_one_second_before() -> None:
7575
current_time = int(time.time())
7676
token = create_jwt_token({"exp": current_time + 1, "sub": "test_user"})
7777

78-
assert not _is_token_expired(token)
78+
assert not _is_jwt_expired(token)
7979

8080

8181
def test_is_logged_in_with_no_token(temp_auth_config: Path) -> None:

tests/test_cli_deploy.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,3 +1079,104 @@ def test_cancel_upload_swallows_exceptions(
10791079

10801080
assert upload_cancelled_route.called
10811081
assert "HTTPStatusError" not in result.output
1082+
1083+
1084+
@pytest.mark.respx(base_url=settings.base_api_url)
1085+
def test_deploy_successfully_with_token(
1086+
logged_out_cli: None, tmp_path: Path, respx_mock: respx.MockRouter
1087+
) -> None:
1088+
app_data = _get_random_app()
1089+
team_data = _get_random_team()
1090+
app_id = app_data["id"]
1091+
team_id = team_data["id"]
1092+
deployment_data = _get_random_deployment(app_id=app_id)
1093+
1094+
config_path = tmp_path / ".fastapicloud" / "cloud.json"
1095+
1096+
config_path.parent.mkdir(parents=True, exist_ok=True)
1097+
config_path.write_text(f'{{"app_id": "{app_id}", "team_id": "{team_id}"}}')
1098+
1099+
respx_mock.get(f"/apps/{app_id}", headers={"Authorization": "Bearer hello"}).mock(
1100+
return_value=Response(200, json=app_data)
1101+
)
1102+
1103+
respx_mock.post(
1104+
f"/apps/{app_id}/deployments/", headers={"Authorization": "Bearer hello"}
1105+
).mock(return_value=Response(201, json=deployment_data))
1106+
1107+
respx_mock.post(
1108+
f"/deployments/{deployment_data['id']}/upload",
1109+
headers={"Authorization": "Bearer hello"},
1110+
).mock(
1111+
return_value=Response(
1112+
200,
1113+
json={"url": "http://test.com", "fields": {"key": "value"}},
1114+
)
1115+
)
1116+
1117+
respx_mock.post("http://test.com", data={"key": "value"}).mock(
1118+
return_value=Response(200)
1119+
)
1120+
1121+
respx_mock.get(
1122+
f"/deployments/{deployment_data['id']}/build-logs",
1123+
headers={"Authorization": "Bearer hello"},
1124+
).mock(
1125+
return_value=Response(
1126+
200,
1127+
content=build_logs_response(
1128+
{"type": "message", "message": "Building...", "id": "1"},
1129+
{"type": "message", "message": "All good!", "id": "2"},
1130+
{"type": "complete"},
1131+
),
1132+
)
1133+
)
1134+
1135+
respx_mock.post(
1136+
f"/deployments/{deployment_data['id']}/upload-complete",
1137+
headers={"Authorization": "Bearer hello"},
1138+
).mock(return_value=Response(200))
1139+
1140+
with changing_dir(tmp_path):
1141+
result = runner.invoke(app, ["deploy"], env={"FASTAPI_CLOUD_TOKEN": "hello"})
1142+
1143+
assert result.exit_code == 0
1144+
1145+
# check that logs are shown
1146+
assert "All good!" in result.output
1147+
1148+
# check that the dashboard URL is shown
1149+
assert "You can also check the app logs at" in result.output
1150+
assert deployment_data["dashboard_url"] in result.output
1151+
1152+
# check that the app URL is shown
1153+
assert deployment_data["url"] in result.output
1154+
1155+
1156+
@pytest.mark.respx(base_url=settings.base_api_url)
1157+
def test_deploy_with_token_fails(
1158+
logged_out_cli: None, tmp_path: Path, respx_mock: respx.MockRouter
1159+
) -> None:
1160+
app_data = _get_random_app()
1161+
team_data = _get_random_team()
1162+
app_id = app_data["id"]
1163+
team_id = team_data["id"]
1164+
1165+
config_path = tmp_path / ".fastapicloud" / "cloud.json"
1166+
1167+
config_path.parent.mkdir(parents=True, exist_ok=True)
1168+
config_path.write_text(f'{{"app_id": "{app_id}", "team_id": "{team_id}"}}')
1169+
1170+
respx_mock.get(f"/apps/{app_id}", headers={"Authorization": "Bearer hello"}).mock(
1171+
return_value=Response(401, json=app_data)
1172+
)
1173+
1174+
with changing_dir(tmp_path):
1175+
result = runner.invoke(app, ["deploy"], env={"FASTAPI_CLOUD_TOKEN": "hello"})
1176+
1177+
assert result.exit_code == 1
1178+
1179+
assert (
1180+
"The specified token is not valid. Make sure to use a valid token."
1181+
in result.output
1182+
)

tests/test_cli_whoami.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,10 @@ def test_prints_not_logged_in(logged_out_cli: None) -> None:
7070

7171
assert result.exit_code == 0
7272
assert "No credentials found. Use `fastapi login` to login." in result.output
73+
74+
75+
def test_shows_logged_in_via_token(logged_out_cli: None) -> None:
76+
result = runner.invoke(app, ["whoami"], env={"FASTAPI_CLOUD_TOKEN": "ABC"})
77+
78+
assert result.exit_code == 0
79+
assert "Using API token from environment variable" in result.output

0 commit comments

Comments
 (0)