33import lombok .Getter ;
44import org .opensearch .dataprepper .plugins .source .crowdstrike .CrowdStrikeSourceConfig ;
55import org .opensearch .dataprepper .plugins .source .crowdstrike .configuration .AuthenticationConfig ;
6+ import org .opensearch .dataprepper .plugins .source .source_crawler .exception .UnauthorizedException ;
67import org .slf4j .Logger ;
78import org .slf4j .LoggerFactory ;
89import org .springframework .http .HttpEntity ;
1617import java .util .Map ;
1718import javax .inject .Named ;
1819import static org .opensearch .dataprepper .logging .DataPrepperMarkers .NOISY ;
20+ import static org .opensearch .dataprepper .plugins .source .crowdstrike .utils .Constants .MAX_RETRIES ;
21+ import static org .opensearch .dataprepper .plugins .source .crowdstrike .utils .Constants .RETRY_ATTEMPT_SLEEP_TIME ;
22+ import static org .opensearch .dataprepper .plugins .source .crowdstrike .utils .Constants .SLEEP_TIME_MULTIPLIER ;
1923
2024
2125/**
@@ -37,7 +41,7 @@ public class CrowdStrikeAuthClient {
3741 private static final String OAUTH_TOKEN_URL = "https://api.crowdstrike.com/oauth2/token" ;
3842 private static final String ACCESS_TOKEN = "access_token" ;
3943 private static final String EXPIRE_IN = "expires_in" ;
40-
44+ private final Object tokenRenewLock = new Object ();
4145
4246
4347 public CrowdStrikeAuthClient (final CrowdStrikeSourceConfig sourceConfig ) {
@@ -48,48 +52,74 @@ public CrowdStrikeAuthClient(final CrowdStrikeSourceConfig sourceConfig) {
4852
4953
5054 /**
51- * Initializes the credentials by obtaining an authentication token.
55+ * Initializes the credentials by getting an authentication token.
5256 */
5357 public void initCredentials () {
5458 log .info ("Getting CrowdStrike Authentication Token" );
5559 getAuthToken ();
5660 }
5761
62+
5863 /**
5964 * Retrieves a new authentication token from the CrowdStrike API.
6065 * The token is stored in the {@code bearerToken} field, and its expiration time is updated.
6166 *
62- * @throws RuntimeException if the token cannot be retrieved.
67+ * @throws UnauthorizedException Runtime exception if the token cannot be retrieved.
6368 */
64- private void getAuthToken () {
69+ protected void getAuthToken () {
6570 log .info (NOISY , "You are trying to access token" );
6671 HttpHeaders headers = new HttpHeaders ();
6772 headers .setContentType (MediaType .APPLICATION_FORM_URLENCODED );
6873 headers .setBasicAuth (this .clientId , this .clientSecret );
6974 HttpEntity <String > request = new HttpEntity <>(headers );
70- try {
71- ResponseEntity <Map > response = restTemplate .postForEntity (OAUTH_TOKEN_URL , request , Map .class );
72- Map tokenData = response .getBody ();
73- this .bearerToken = (String ) tokenData .get (ACCESS_TOKEN );
74- this .expireTime = Instant .now ().plusSeconds ((Integer ) tokenData .get (EXPIRE_IN ));
75- log .info ("Access token acquired successfully" );
76- } catch (HttpClientErrorException ex ) {
77- this .expireTime = Instant .ofEpochMilli (0 );
78- HttpStatus statusCode = ex .getStatusCode ();
79- log .error ("Failed to acquire access token. Status code: {}, Error Message: {}" ,
80- statusCode , ex .getMessage ());
81- throw new RuntimeException ("Error while requesting token:" + ex .getMessage (), ex );
75+ int retryCount = 0 ;
76+ while (retryCount < MAX_RETRIES ) {
77+ try {
78+ ResponseEntity <Map > response = restTemplate .postForEntity (OAUTH_TOKEN_URL , request , Map .class );
79+ Map tokenData = response .getBody ();
80+ this .bearerToken = (String ) tokenData .get (ACCESS_TOKEN );
81+ this .expireTime = Instant .now ().plusSeconds ((Integer ) tokenData .get (EXPIRE_IN ));
82+ log .info ("Access token acquired successfully" );
83+ return ;
84+ } catch (HttpClientErrorException ex ) {
85+ this .expireTime = Instant .ofEpochMilli (0 );
86+ HttpStatus statusCode = ex .getStatusCode ();
87+ String statusMessage = ex .getMessage ();
88+ log .error ("Failed to acquire access token. Status code: {}, Error Message: {}" ,
89+ statusCode , statusMessage );
90+ try {
91+ Thread .sleep ((long ) RETRY_ATTEMPT_SLEEP_TIME .get (retryCount ) * SLEEP_TIME_MULTIPLIER );
92+ } catch (InterruptedException e ) {
93+ throw new RuntimeException ("Sleep in the retry attempt got interrupted" , e );
94+ }
95+ }
96+ retryCount ++;
8297 }
98+ String errorMessage = String .format ("Failed to acquire access token even after %s retry attempts" , MAX_RETRIES );
99+ log .error (errorMessage );
100+ throw new UnauthorizedException (errorMessage );
83101 }
84102
85- public boolean isTokenExpired () {
86- return this .bearerToken == null || Instant .now ().isAfter (this .expireTime );
103+ protected boolean isTokenValid () {
104+ Instant currentTime = Instant .now ();
105+ return this .bearerToken != null && currentTime .isBefore (this .expireTime );
87106 }
88107
89108 /**
90109 * Refreshes the bearer token by retrieving a new one from CrowdStrike.
91110 */
92111 public void refreshToken () {
93-
112+ if (isTokenValid ()) {
113+ //There is still time to renew, or someone else must have already renewed it
114+ return ;
115+ }
116+ synchronized (tokenRenewLock ) {
117+ if (isTokenValid ()) {
118+ //Someone else must have already renewed it
119+ return ;
120+ }
121+ log .info ("Renewing authentication token for CrowdStrike Connector." );
122+ getAuthToken ();
123+ }
94124 }
95- }
125+ }
0 commit comments