Skip to content

Commit 5340c3d

Browse files
authored
CrowdStrike API call and retry mechanism (#5654)
* CrowdStrike API call and retry mechanism Signed-off-by: nsgupta1 <nsgupta1@users.noreply.github.com> Signed-off-by: ngsupta1 <guptaneha.e@gmail.com> Co-authored-by: nsgupta1 <nsgupta1@users.noreply.github.com>
1 parent a4e00ac commit 5340c3d

21 files changed

Lines changed: 1108 additions & 37 deletions

File tree

data-prepper-plugins/saas-source-plugins/atlassian-commons/src/main/java/org/opensearch/dataprepper/plugins/source/atlassian/AtlassianSourceConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,9 @@ public String getAuthType() {
5050
}
5151

5252
public abstract String getOauth2UrlContext();
53+
54+
@Override
55+
public int getNumberOfWorkers() {
56+
return DEFAULT_NUMBER_OF_WORKERS;
57+
}
5358
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package org.opensearch.dataprepper.plugins.source.crowdstrike;
2+
import io.micrometer.core.instrument.Timer;
3+
import org.opensearch.dataprepper.metrics.PluginMetrics;
4+
import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeThreatIntelApiResponse;
5+
import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeIndicatorResult;
6+
import org.opensearch.dataprepper.plugins.source.crowdstrike.rest.CrowdStrikeRestClient;
7+
import org.opensearch.dataprepper.plugins.source.crowdstrike.utils.CrowdStrikeNextLinkValidator;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
import org.springframework.http.ResponseEntity;
11+
import org.springframework.web.util.UriComponentsBuilder;
12+
import javax.inject.Named;
13+
import java.net.URI;
14+
import java.net.URLEncoder;
15+
import java.nio.charset.StandardCharsets;
16+
import java.time.Instant;
17+
import java.util.Optional;
18+
19+
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.BATCH_SIZE;
20+
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.FILTER_KEY;
21+
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.LAST_UPDATED;
22+
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.LIMIT_KEY;
23+
24+
/**
25+
* This service manages the interaction with CrowdStrike's API endpoints, handles pagination
26+
* and provides methods for fetching indicators from CrowdStrike.
27+
*/
28+
@Named
29+
public class CrowdStrikeService {
30+
31+
private final CrowdStrikeRestClient crowdStrikeRestClient;
32+
private static final Logger log = LoggerFactory.getLogger(CrowdStrikeService.class);
33+
private static final String BASE_URL = "https://api.crowdstrike.com/";
34+
private static final String COMBINED_URL = "https://api.crowdstrike.com/intel/combined/indicators/v1";
35+
private final Timer searchCallLatencyTimer;
36+
private static final String GREATER_THAN_EQUALS = ">=";
37+
private static final String LESS_THAN = "<";
38+
private static final String ENCODED_PLUS_SIGN = "%2B";
39+
40+
41+
public CrowdStrikeService(CrowdStrikeRestClient crowdStrikeRestClient, PluginMetrics pluginMetrics) {
42+
this.crowdStrikeRestClient = crowdStrikeRestClient;
43+
this.searchCallLatencyTimer = pluginMetrics.timer("searchCallLatencyTimer");
44+
}
45+
46+
/**
47+
* Retrieves all indicator data from CrowdStrike in a paginated fashion.
48+
* @param startTime The start timestamp (inclusive) for filtering indicators.
49+
* @param endTime The end timestamp (exclusive) for filtering indicators.
50+
* @param paginationLink An optional pagination URL suffix (used when fetching next pages).
51+
* @return A {@link CrowdStrikeThreatIntelApiResponse} containing response body and headers.
52+
*/
53+
public CrowdStrikeThreatIntelApiResponse getThreatIndicators(Instant startTime, Instant endTime, Optional<String> paginationLink) {
54+
if (startTime == null || endTime == null) {
55+
throw new IllegalArgumentException("startTime and endTime must not be null");
56+
}
57+
URI uri = buildCrowdStrikeUri(startTime, endTime, paginationLink);
58+
return searchCallLatencyTimer.record(() -> {
59+
60+
log.debug("Calling CrowdStrike API with URI: {}", uri);
61+
ResponseEntity<CrowdStrikeIndicatorResult> responseEntity = crowdStrikeRestClient.invokeGetApi(uri, CrowdStrikeIndicatorResult.class);
62+
63+
return new CrowdStrikeThreatIntelApiResponse(responseEntity.getBody(), responseEntity.getHeaders());
64+
});
65+
}
66+
67+
protected URI buildCrowdStrikeUri(Instant startTime, Instant endTime, Optional<String> paginationLink) {
68+
try {
69+
if (paginationLink.isPresent()) {
70+
String urlString = BASE_URL + paginationLink.get();
71+
urlString = CrowdStrikeNextLinkValidator.validateAndSanitizeURL(urlString);
72+
return new URI(urlString);
73+
} else {
74+
// Manually construct and encode the query string
75+
String startTimeFilter = URLEncoder.encode(LAST_UPDATED + ":" + GREATER_THAN_EQUALS + startTime.getEpochSecond(), StandardCharsets.UTF_8);
76+
String endTimeFilter = URLEncoder.encode(LAST_UPDATED + ":" + LESS_THAN + endTime.getEpochSecond(), StandardCharsets.UTF_8);
77+
String encodedFilter = startTimeFilter + ENCODED_PLUS_SIGN + endTimeFilter; // ensure literal '+' is encoded
78+
79+
UriComponentsBuilder builder = UriComponentsBuilder
80+
.fromHttpUrl(COMBINED_URL)
81+
.queryParam(FILTER_KEY, encodedFilter)
82+
.queryParam(LIMIT_KEY, BATCH_SIZE);
83+
84+
return builder.build(true).toUri();
85+
}
86+
} catch (Exception e) {
87+
throw new RuntimeException("Failed to construct CrowdStrike request URI", e);
88+
}
89+
}
90+
}

data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeSourceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@ public class CrowdStrikeSourceConfig implements CrawlerSourceConfig {
2727
@Min(1)
2828
@Max(50)
2929
@Valid
30-
private int numWorkers = DEFAULT_NUMBER_OF_WORKERS;
30+
private int numberOfWorkers = DEFAULT_NUMBER_OF_WORKERS;
3131

3232
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.opensearch.dataprepper.plugins.source.crowdstrike.models;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
import com.fasterxml.jackson.annotation.JsonProperty;
5+
import lombok.Data;
6+
import java.util.List;
7+
8+
/**
9+
* Represents the response from a CrowdStrike Falcon Query Language (FQL) search for threat indicators.
10+
*/
11+
@Data
12+
@JsonIgnoreProperties(ignoreUnknown = true)
13+
public class CrowdStrikeIndicatorResult {
14+
15+
@JsonProperty("resources")
16+
private List<ThreatIndicator> results = null;
17+
18+
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.opensearch.dataprepper.plugins.source.crowdstrike.models;
2+
3+
import lombok.Data;
4+
import org.springframework.util.CollectionUtils;
5+
import java.util.Collections;
6+
import java.util.List;
7+
import java.util.Map;
8+
9+
/**
10+
* Represents the response returned from a CrowdStrike API call.
11+
*/
12+
@Data
13+
public class CrowdStrikeThreatIntelApiResponse {
14+
15+
private CrowdStrikeIndicatorResult body;
16+
private Map<String, List<String>> headers;
17+
18+
public CrowdStrikeThreatIntelApiResponse(CrowdStrikeIndicatorResult body, Map<String, List<String>> headers) {
19+
this.body = body;
20+
this.headers = headers;
21+
}
22+
23+
// Convenience method to get a specific header
24+
public List<String> getHeader(String headerName) {
25+
return headers.getOrDefault(headerName, Collections.emptyList());
26+
}
27+
28+
// Convenience method to get the first value of a specific header
29+
public String getFirstHeaderValue(String headerName) {
30+
List<String> values = getHeader(headerName);
31+
return CollectionUtils.isEmpty(values) ? null : values.get(0);
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.opensearch.dataprepper.plugins.source.crowdstrike.models;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
import com.fasterxml.jackson.annotation.JsonProperty;
5+
import lombok.Data;
6+
7+
/**
8+
* Represents a threat intelligence indicator from CrowdStrike's API.
9+
* This class encapsulates information about potential security threats,
10+
* including indicators of compromise (IoCs) and associated metadata.
11+
*/
12+
@Data
13+
@JsonIgnoreProperties(ignoreUnknown = true)
14+
public class ThreatIndicator {
15+
/**
16+
* The ID of the IOC.
17+
*/
18+
@JsonProperty("id")
19+
private String id = null;
20+
21+
/**
22+
* The type of the IOC.
23+
*/
24+
@JsonProperty("type")
25+
private String type = null;
26+
27+
/**
28+
* The value of the IOC.
29+
*/
30+
@JsonProperty("indicator")
31+
private String indicator = null;
32+
33+
/**
34+
* The epoch timestamp of the creation date of IOC.
35+
*/
36+
@JsonProperty("published_date")
37+
private long publishedDate = 0L;
38+
39+
@JsonProperty("malicious_confidence")
40+
private String maliciousConfidence = null;
41+
42+
/**
43+
* The epoch timestamp of last updated date of the IOC.
44+
*/
45+
@JsonProperty("last_updated")
46+
private long lastUpdated = 0L;
47+
48+
}

data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeAuthClient.java

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import lombok.Getter;
44
import org.opensearch.dataprepper.plugins.source.crowdstrike.CrowdStrikeSourceConfig;
55
import org.opensearch.dataprepper.plugins.source.crowdstrike.configuration.AuthenticationConfig;
6+
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.UnauthorizedException;
67
import org.slf4j.Logger;
78
import org.slf4j.LoggerFactory;
89
import org.springframework.http.HttpEntity;
@@ -16,6 +17,9 @@
1617
import java.util.Map;
1718
import javax.inject.Named;
1819
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
20+
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.MAX_RETRIES;
21+
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.RETRY_ATTEMPT_SLEEP_TIME;
22+
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.SLEEP_TIME_MULTIPLIER;
1923

2024

2125
/**
@@ -37,7 +41,7 @@ public class CrowdStrikeAuthClient {
3741
private static final String OAUTH_TOKEN_URL = "https://api.crowdstrike.com/oauth2/token";
3842
private static final String ACCESS_TOKEN = "access_token";
3943
private static final String EXPIRE_IN = "expires_in";
40-
44+
private final Object tokenRenewLock = new Object();
4145

4246

4347
public CrowdStrikeAuthClient(final CrowdStrikeSourceConfig sourceConfig) {
@@ -48,48 +52,74 @@ public CrowdStrikeAuthClient(final CrowdStrikeSourceConfig sourceConfig) {
4852

4953

5054
/**
51-
* Initializes the credentials by obtaining an authentication token.
55+
* Initializes the credentials by getting an authentication token.
5256
*/
5357
public void initCredentials() {
5458
log.info("Getting CrowdStrike Authentication Token");
5559
getAuthToken();
5660
}
5761

62+
5863
/**
5964
* Retrieves a new authentication token from the CrowdStrike API.
6065
* The token is stored in the {@code bearerToken} field, and its expiration time is updated.
6166
*
62-
* @throws RuntimeException if the token cannot be retrieved.
67+
* @throws UnauthorizedException Runtime exception if the token cannot be retrieved.
6368
*/
6469
private void getAuthToken() {
65-
log.info(NOISY, "You are trying to access token");
66-
HttpHeaders headers = new HttpHeaders();
67-
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
68-
headers.setBasicAuth(this.clientId, this.clientSecret);
69-
HttpEntity<String> request = new HttpEntity<>(headers);
70-
try {
71-
ResponseEntity<Map> response = restTemplate.postForEntity(OAUTH_TOKEN_URL, request, Map.class);
72-
Map tokenData = response.getBody();
73-
this.bearerToken = (String) tokenData.get(ACCESS_TOKEN);
74-
this.expireTime = Instant.now().plusSeconds((Integer) tokenData.get(EXPIRE_IN));
75-
log.info("Access token acquired successfully");
76-
} catch (HttpClientErrorException ex) {
77-
this.expireTime = Instant.ofEpochMilli(0);
78-
HttpStatus statusCode = ex.getStatusCode();
79-
log.error("Failed to acquire access token. Status code: {}, Error Message: {}",
80-
statusCode, ex.getMessage());
81-
throw new RuntimeException("Error while requesting token:" + ex.getMessage(), ex);
70+
synchronized (tokenRenewLock) {
71+
if (isTokenValid()) {
72+
//Someone else must have already renewed it
73+
return;
74+
}
75+
log.info(NOISY, "You are trying to access token");
76+
HttpHeaders headers = new HttpHeaders();
77+
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
78+
headers.setBasicAuth(this.clientId, this.clientSecret);
79+
HttpEntity<String> request = new HttpEntity<>(headers);
80+
int retryCount = 0;
81+
while (retryCount < MAX_RETRIES) {
82+
try {
83+
ResponseEntity<Map> response = restTemplate.postForEntity(OAUTH_TOKEN_URL, request, Map.class);
84+
Map tokenData = response.getBody();
85+
this.bearerToken = (String) tokenData.get(ACCESS_TOKEN);
86+
this.expireTime = Instant.now().plusSeconds((Integer) tokenData.get(EXPIRE_IN));
87+
log.info("Access token acquired successfully");
88+
return;
89+
} catch (HttpClientErrorException ex) {
90+
this.expireTime = Instant.ofEpochMilli(0);
91+
HttpStatus statusCode = ex.getStatusCode();
92+
String statusMessage = ex.getMessage();
93+
log.error("Failed to acquire access token. Status code: {}, Error Message: {}",
94+
statusCode, statusMessage);
95+
try {
96+
Thread.sleep((long) RETRY_ATTEMPT_SLEEP_TIME.get(retryCount) * SLEEP_TIME_MULTIPLIER);
97+
} catch (InterruptedException e) {
98+
throw new RuntimeException("Sleep in the retry attempt got interrupted", e);
99+
}
100+
}
101+
retryCount++;
102+
}
103+
String errorMessage = String.format("Failed to acquire access token even after %s retry attempts", MAX_RETRIES);
104+
log.error(errorMessage);
105+
throw new UnauthorizedException(errorMessage);
82106
}
83107
}
84108

85-
public boolean isTokenExpired() {
86-
return this.bearerToken == null || Instant.now().isAfter(this.expireTime);
109+
protected boolean isTokenValid() {
110+
Instant currentTime = Instant.now();
111+
return this.bearerToken != null && currentTime.isBefore(this.expireTime);
87112
}
88113

89114
/**
90115
* Refreshes the bearer token by retrieving a new one from CrowdStrike.
91116
*/
92117
public void refreshToken() {
93-
118+
if (isTokenValid()) {
119+
//There is still time to renew, or someone else must have already renewed it
120+
return;
121+
}
122+
log.info("Renewing authentication token for CrowdStrike Connector.");
123+
getAuthToken();
94124
}
95-
}
125+
}

0 commit comments

Comments
 (0)