1+ import time
2+ import webbrowser
3+ from requests import Request
4+ from typing import Optional
5+ from cli .exceptions .custom_exceptions import AuthProcessError
16from cli .utils .string_utils import generate_random_string , hash_string_to_sha256
27from cli .user_settings .configuration_manager import ConfigurationManager
8+ from cli .user_settings .credentials_manager import CredentialsManager
9+ from cyclient .auth_client import AuthClient
10+ from cyclient .models import ApiToken , ApiTokenGenerationPollingResponse
11+ from cyclient import logger
312
413
514class AuthManager :
6-
715 CODE_VERIFIER_LENGTH = 101
16+ POLLING_WAIT_INTERVAL_IN_SECONDS = 3
17+ POLLING_TIMEOUT_IN_SECONDS = 180
18+ FAILED_POLLING_STATUS = "Error"
19+ COMPLETED_POLLING_STATUS = "Completed"
820
921 configuration_manager : ConfigurationManager
22+ credentials_manager : CredentialsManager
23+ auth_client : AuthClient
1024
1125 def __init__ (self ):
1226 self .configuration_manager = ConfigurationManager ()
27+ self .credentials_manager = CredentialsManager ()
28+ self .auth_client = AuthClient ()
29+
30+ def authenticate (self ):
31+ logger .debug ('generating pkce code pair' )
32+ code_challenge , code_verifier = self ._generate_pkce_code_pair ()
33+
34+ logger .debug ('starting authentication session' )
35+ session_id = self .start_session (code_challenge )
36+ logger .debug ('authentication session created, %s' , {'session_id' : session_id })
37+
38+ logger .debug ('opening browser and redirecting to cycode login page' )
39+ self .redirect_to_login_page (code_challenge , session_id )
40+
41+ logger .debug ('starting get api token process' )
42+ api_token = self .get_api_token (session_id , code_verifier )
43+
44+ logger .debug ('saving get api token' )
45+ self .save_api_token (api_token )
46+
47+ def start_session (self , code_challenge : str ):
48+ auth_session = self .auth_client .start_session (code_challenge )
49+ return auth_session .session_id
50+
51+ def redirect_to_login_page (self , code_challenge : str , session_id : str ):
52+ login_url = self ._build_login_url (code_challenge , session_id )
53+ webbrowser .open (login_url )
1354
14- def generate_pkce_code_pair (self ) -> (str , str ):
55+ def get_api_token (self , session_id : str , code_verifier : str ) -> Optional [ApiToken ]:
56+ api_token = self .get_api_token_polling (session_id , code_verifier )
57+ if api_token is None :
58+ raise AuthProcessError ("getting api token is completed, but the token is missing" )
59+ return api_token
60+
61+ def get_api_token_polling (self , session_id : str , code_verifier : str ) -> Optional [ApiToken ]:
62+ end_polling_time = time .time () + self .POLLING_TIMEOUT_IN_SECONDS
63+ while time .time () < end_polling_time :
64+ logger .debug ('trying to get api token...' )
65+ api_token_polling_response = self .auth_client .get_api_token (session_id , code_verifier )
66+ if self ._is_api_token_process_completed (api_token_polling_response ):
67+ logger .debug ('get api token process completed' )
68+ return api_token_polling_response .api_token
69+ if self ._is_api_token_process_failed (api_token_polling_response ):
70+ logger .debug ('get api token process failed' )
71+ raise AuthProcessError ('error during getting api token' )
72+ time .sleep (self .POLLING_WAIT_INTERVAL_IN_SECONDS )
73+
74+ raise AuthProcessError ('session expired' )
75+
76+ def save_api_token (self , api_token : ApiToken ):
77+ self .credentials_manager .update_credentials_file (api_token .client_id , api_token .secret )
78+
79+ def _build_login_url (self , code_challenge : str , session_id : str ):
80+ app_url = self .configuration_manager .get_cycode_app_url ()
81+ login_url = f'{ app_url } /account/login'
82+ query_params = {
83+ 'source' : 'cycode_cli' ,
84+ 'code_challenge' : code_challenge ,
85+ 'session_id' : session_id
86+ }
87+ request = Request (url = login_url , params = query_params )
88+ return request .prepare ().url
89+
90+ def _generate_pkce_code_pair (self ) -> (str , str ):
1591 code_verifier = generate_random_string (self .CODE_VERIFIER_LENGTH )
1692 code_challenge = hash_string_to_sha256 (code_verifier )
17- return code_challenge , code_verifier
93+ return code_challenge , code_verifier
94+
95+ def _is_api_token_process_completed (self , api_token_polling_response : ApiTokenGenerationPollingResponse ) -> bool :
96+ return api_token_polling_response is not None \
97+ and api_token_polling_response .status == self .COMPLETED_POLLING_STATUS
98+
99+ def _is_api_token_process_failed (self , api_token_polling_response : ApiTokenGenerationPollingResponse ) -> bool :
100+ return api_token_polling_response is not None \
101+ and api_token_polling_response .status == self .FAILED_POLLING_STATUS
0 commit comments