|
1 | | -"""Credential Provider Chain for Cloudsmith CLI. |
2 | | -
|
3 | | -Implements an AWS SDK-style credential resolution chain that evaluates |
4 | | -credential sources sequentially and returns the first valid result. |
5 | | -""" |
6 | | - |
7 | | -from __future__ import annotations |
8 | | - |
9 | | -import logging |
10 | | -from abc import ABC, abstractmethod |
11 | | -from dataclasses import dataclass |
12 | | -from typing import TYPE_CHECKING |
13 | | - |
14 | | -if TYPE_CHECKING: |
15 | | - import requests |
16 | | - |
17 | | -logger = logging.getLogger(__name__) |
18 | | - |
19 | | - |
20 | | -@dataclass |
21 | | -class CredentialContext: |
22 | | - """Context passed to credential providers during resolution. |
23 | | -
|
24 | | - All values are populated directly from Click options / ``opts``. |
25 | | - """ |
26 | | - |
27 | | - session: requests.Session | None = None |
28 | | - api_key: str | None = None |
29 | | - api_host: str = "https://api.cloudsmith.io" |
30 | | - creds_file_path: str | None = None |
31 | | - profile: str | None = None |
32 | | - debug: bool = False |
33 | | - keyring_refresh_failed: bool = False |
34 | | - |
35 | | - |
36 | | -@dataclass |
37 | | -class CredentialResult: |
38 | | - """Result from a successful credential resolution.""" |
39 | | - |
40 | | - api_key: str |
41 | | - source_name: str |
42 | | - source_detail: str | None = None |
43 | | - auth_type: str = "api_key" |
44 | | - |
45 | | - |
46 | | -class CredentialProvider(ABC): |
47 | | - """Base class for credential providers.""" |
48 | | - |
49 | | - name: str = "base" |
50 | | - |
51 | | - @abstractmethod |
52 | | - def resolve(self, context: CredentialContext) -> CredentialResult | None: |
53 | | - """Attempt to resolve credentials. Return CredentialResult or None.""" |
54 | | - |
55 | | - |
56 | | -class CredentialProviderChain: |
57 | | - """Evaluates credential providers in order, returning the first valid result. |
58 | | -
|
59 | | - If no providers are given, uses the default chain: |
60 | | - Keyring → CLIFlag. |
61 | | - """ |
62 | | - |
63 | | - def __init__(self, providers: list[CredentialProvider] | None = None): |
64 | | - if providers is not None: |
65 | | - self.providers = providers |
66 | | - else: |
67 | | - from .providers import CLIFlagProvider, KeyringProvider |
68 | | - |
69 | | - self.providers = [ |
70 | | - KeyringProvider(), |
71 | | - CLIFlagProvider(), |
72 | | - ] |
73 | | - |
74 | | - def resolve(self, context: CredentialContext) -> CredentialResult | None: |
75 | | - """Evaluate each provider in order. Return the first successful result.""" |
76 | | - for provider in self.providers: |
77 | | - try: |
78 | | - result = provider.resolve(context) |
79 | | - if result is not None: |
80 | | - if context.debug: |
81 | | - logger.debug( |
82 | | - "Credentials resolved by %s: %s", |
83 | | - provider.name, |
84 | | - result.source_detail or result.source_name, |
85 | | - ) |
86 | | - return result |
87 | | - if context.debug: |
88 | | - logger.debug( |
89 | | - "Provider %s did not resolve credentials, trying next", |
90 | | - provider.name, |
91 | | - ) |
92 | | - except Exception: # pylint: disable=broad-exception-caught |
93 | | - # Intentionally broad - one provider failing shouldn't stop others |
94 | | - logger.debug( |
95 | | - "Provider %s raised an exception, skipping", |
96 | | - provider.name, |
97 | | - exc_info=True, |
98 | | - ) |
99 | | - continue |
100 | | - return None |
0 commit comments