-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathwhoami.py
More file actions
186 lines (154 loc) · 6.14 KB
/
whoami.py
File metadata and controls
186 lines (154 loc) · 6.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""CLI/Commands - Retrieve authentication status."""
import click
from ...core import keyring
from ...core.api.exceptions import ApiException
from ...core.api.user import get_token_metadata, get_user_brief
from .. import decorators, utils
from ..exceptions import handle_api_exceptions
from .main import main
def _get_active_method(api_config):
    """Inspect API config to determine SSO, API key, or no auth.

    Args:
        api_config: Object whose optional ``headers`` and ``api_key``
            attributes are mappings (or ``None``). A missing attribute
            is treated the same as an empty mapping.

    Returns:
        ``"sso_token"`` when the ``Authorization`` header starts with
        ``"Bearer "``, ``"api_key"`` when a truthy ``X-Api-Key`` entry is
        present in ``api_key``, otherwise ``"none"``.
    """
    headers = getattr(api_config, "headers", None) or {}
    # ``dict.get`` only applies its default when the key is absent; a header
    # explicitly set to None must not reach ``.startswith``.
    authorization = headers.get("Authorization") or ""
    if authorization.startswith("Bearer "):
        return "sso_token"

    api_keys = getattr(api_config, "api_key", None) or {}
    if api_keys.get("X-Api-Key"):
        return "api_key"

    return "none"
def _get_api_key_source(opts):
    """Describe where the API key in use was loaded from.

    Reads the optional ``credential`` attribute of ``opts`` (per the
    original note, the credential provider chain result attached by
    initialise_api).

    Returns:
        A dict with ``configured``, ``source`` and ``source_key``. When a
        credential is present, ``source`` is its ``source_detail`` (falling
        back to ``source_name`` when that is falsy) and ``source_key`` is
        its ``source_name``. Without a credential, ``configured`` is False
        and the other two values are None.
    """
    resolved = getattr(opts, "credential", None)
    if not resolved:
        return {"configured": False, "source": None, "source_key": None}

    # Prefer the detailed description; fall back to the plain source name.
    label = resolved.source_detail
    if not label:
        label = resolved.source_name

    return dict(configured=True, source=label, source_key=resolved.source_name)
def _get_sso_status(api_host):
    """Summarise SSO token state for ``api_host`` using the keyring module.

    Returns:
        A dict with ``configured``, ``keyring_enabled``, ``source`` and
        ``last_refreshed``. ``source`` is ``"System Keyring"`` only when
        tokens are reported for the host; ``last_refreshed`` is the
        formatted refresh-attempt timestamp when one is available,
        otherwise None.
    """
    keyring_on = keyring.should_use_keyring()

    status = {
        "configured": keyring_on,
        "keyring_enabled": keyring_on,
        "source": None,
        "last_refreshed": None,
    }

    # Tokens are only looked up when the keyring is enabled.
    if keyring_on:
        status["configured"] = keyring.has_sso_tokens(api_host)

    if status["configured"]:
        status["source"] = "System Keyring"
        stamp = keyring.get_refresh_attempted_at(api_host)
        if stamp:
            status["last_refreshed"] = utils.fmt_datetime(stamp)

    return status
def _get_verbose_auth_data(opts, api_host):
    """Gather all auth details for verbose output.

    Args:
        opts: CLI options object. It is passed to ``_get_api_key_source``
            and its ``api_config`` attribute is passed to
            ``_get_active_method``.
        api_host: API host passed to ``_get_sso_status``.

    Returns:
        A dict with ``active_method``, ``api_key`` and ``sso``. The
        ``api_key`` entry is the result of ``_get_api_key_source`` extended
        with ``slug`` and ``created``; both are None when token metadata is
        unavailable or lacks the field.
    """
    api_key_info = _get_api_key_source(opts)
    sso_info = _get_sso_status(api_host)

    # Fetch token metadata (extra API call, graceful fallback)
    token_meta = None
    if api_key_info["configured"]:
        try:
            token_meta = get_token_metadata()
        except ApiException:
            token_meta = None

    # Treat missing metadata as an empty mapping and read both fields with
    # ``.get`` so an absent "slug" degrades to None instead of raising
    # KeyError, matching how "created" is handled.
    token_meta = token_meta or {}
    created = token_meta.get("created")
    api_key_info["slug"] = token_meta.get("slug")
    api_key_info["created"] = utils.fmt_datetime(created) if created else None

    return {
        "active_method": _get_active_method(opts.api_config),
        "api_key": api_key_info,
        "sso": sso_info,
    }
def _print_user_line(name, username, email):
    """Echo a single colourised line identifying the user.

    ``name`` and ``username`` fall back to ``"Unknown"`` when falsy; the
    e-mail segment is included only when ``email`` is truthy.
    """
    display_name = click.style(name or "Unknown", fg="cyan")

    # Segments shown inside the parentheses, joined with ", ".
    identity = [f"slug: {click.style(username or 'Unknown', fg='magenta')}"]
    if email:
        identity.append(f"email: {click.style(email, fg='green')}")

    click.echo(f"User: {display_name} ({', '.join(identity)})")
def _print_verbose_text(data):
    """Write the verbose authentication report to the terminal.

    Prints a blank line and the user identity line, then a section for the
    active authentication method. When SSO is not the active method, an
    SSO status section follows.

    ``data`` must provide ``name``, ``username`` and ``auth`` (``email`` is
    optional); ``data["auth"]`` must provide ``active_method``, ``api_key``
    and ``sso``.
    """
    click.echo()
    _print_user_line(data["name"], data["username"], data.get("email"))

    details = data["auth"]
    method = details["active_method"]
    key_info = details["api_key"]
    sso_info = details["sso"]

    click.echo()

    if method == "sso_token":
        click.secho("Authentication Method: SSO Token (primary)", fg="cyan", bold=True)
        sso_source = sso_info.get("source")
        if sso_source:
            click.echo(f" Source: {sso_source}")
        refreshed_at = sso_info.get("last_refreshed")
        if refreshed_at:
            click.echo(
                f" Last Refreshed: {refreshed_at} (refreshes every 30 min)"
            )
        if key_info["configured"]:
            # An API key exists as well but is not the one in use.
            click.echo()
            click.secho("API Key: Also configured", fg="yellow")
            key_source = key_info.get("source")
            if key_source:
                click.echo(f" Source: {key_source}")
            click.echo(" Note: SSO token is being used instead")
        # The SSO status section below is only shown for non-SSO methods.
        return

    if method == "api_key":
        if key_info.get("source_key") == "oidc":
            heading = "Authentication Method: OIDC Auto-Discovery"
        else:
            heading = "Authentication Method: API Key"
        click.secho(heading, fg="cyan", bold=True)
        detail_rows = (
            ("Source", "source"),
            ("Token Slug", "slug"),
            ("Created", "created"),
        )
        for label, field in detail_rows:
            value = key_info.get(field)
            if value:
                click.echo(f" {label}: {value}")
    else:
        click.secho("Authentication Method: None (anonymous)", fg="yellow", bold=True)

    # SSO status section for the API-key and anonymous cases.
    click.echo()
    if not sso_info["keyring_enabled"]:
        click.secho(
            "SSO Status: Keyring disabled (CLOUDSMITH_NO_KEYRING)", fg="yellow"
        )
    elif sso_info["configured"]:
        click.secho("SSO Status: Configured (not active)", fg="yellow")
        click.echo(f" Source: {sso_info['source']}")
    else:
        click.echo("SSO Status: Not configured")
        click.echo(" Keyring: Enabled (no tokens stored)")
@main.command()
@decorators.common_cli_config_options
@decorators.common_cli_output_options
@decorators.common_api_auth_options
@decorators.initialise_api
@click.pass_context
def whoami(ctx, opts):
    """Retrieve your current authentication status."""
    # Progress text goes to stderr when the utils helper says so.
    to_stderr = utils.should_use_stderr(opts)
    click.echo(
        "Retrieving your authentication status from the API ... ",
        nl=False,
        err=to_stderr,
    )

    failure_msg = "Failed to retrieve your authentication status!"
    api_guard = handle_api_exceptions(ctx, opts=opts, context_msg=failure_msg)
    with api_guard, utils.maybe_spinner(opts):
        is_auth, username, email, name = get_user_brief()

    click.secho("OK", fg="green", err=to_stderr)

    data = dict(
        is_authenticated=is_auth,
        username=username,
        email=email,
        name=name,
    )

    # Verbose mode adds the auth breakdown to the payload before any output,
    # so it is included in JSON output too.
    if opts.verbose:
        host = getattr(opts.api_config, "host", None) or opts.api_host
        data["auth"] = _get_verbose_auth_data(opts, host)

    if utils.maybe_print_as_json(opts, data):
        return

    # Only authenticated users get the detailed text report.
    if is_auth and opts.verbose:
        _print_verbose_text(data)
        return

    click.echo("You are authenticated as:")
    if is_auth:
        _print_user_line(name, username, email)
    else:
        click.secho("Nobody (i.e. anonymous user)", fg="yellow")