From bdcb41e1bd37809ec68c7a8034eddcfa81975c55 Mon Sep 17 00:00:00 2001 From: nsgupta1 Date: Thu, 24 Apr 2025 16:45:55 -0700 Subject: [PATCH 1/5] CrowdStrike API call and retry mechanism Signed-off-by: nsgupta1 --- .../atlassian/AtlassianSourceConfig.java | 5 + .../crowdstrike/CrowdStrikeService.java | 89 ++++++++++++ .../crowdstrike/CrowdStrikeSourceConfig.java | 2 +- .../models/CrowdStrikeApiResponse.java | 29 ++++ .../models/CrowdStrikeIndicatorResult.java | 18 +++ .../crowdstrike/models/ThreatIndicator.java | 59 ++++++++ .../rest/CrowdStrikeAuthClient.java | 70 +++++++--- .../rest/CrowdStrikeRestClient.java | 99 ++++++++++++++ .../source/crowdstrike/utils/Constants.java | 14 +- .../utils/CrowdStrikeNextLinkValidator.java | 122 +++++++++++++++++ .../CrowdStrikeNextLinkValidatorTest.java | 66 +++++++++ .../crowdstrike/CrowdStrikeServiceTest.java | 112 +++++++++++++++ .../CrowdStrikeSourceConfigTest.java | 4 +- .../models/CrowdStrikeApiResponseTest.java | 80 +++++++++++ .../CrowdStrikeIndicatorResultTest.java | 60 ++++++++ .../models/ThreatIndicatorTest.java | 69 ++++++++++ .../rest/CrowdStrikeAuthClientTest.java | 100 +++++++++++++- .../rest/CrowdStrikeRestClientTest.java | 129 ++++++++++++++++++ .../base/CrawlerSourceConfig.java | 7 + .../base/CrawlerSourcePlugin.java | 3 +- 20 files changed, 1106 insertions(+), 31 deletions(-) create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeService.java create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeApiResponse.java create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeIndicatorResult.java create mode 100644 
data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/ThreatIndicator.java create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeRestClient.java create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/utils/CrowdStrikeNextLinkValidator.java create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeNextLinkValidatorTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeServiceTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeApiResponseTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeIndicatorResultTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/ThreatIndicatorTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeRestClientTest.java diff --git a/data-prepper-plugins/saas-source-plugins/atlassian-commons/src/main/java/org/opensearch/dataprepper/plugins/source/atlassian/AtlassianSourceConfig.java b/data-prepper-plugins/saas-source-plugins/atlassian-commons/src/main/java/org/opensearch/dataprepper/plugins/source/atlassian/AtlassianSourceConfig.java index a6a4fe7e38..e6a94eab6f 100644 --- 
a/data-prepper-plugins/saas-source-plugins/atlassian-commons/src/main/java/org/opensearch/dataprepper/plugins/source/atlassian/AtlassianSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/atlassian-commons/src/main/java/org/opensearch/dataprepper/plugins/source/atlassian/AtlassianSourceConfig.java @@ -50,4 +50,9 @@ public String getAuthType() { } public abstract String getOauth2UrlContext(); + + @Override + public int getNumberOfWorkers() { + return DEFAULT_NUMBER_OF_WORKERS; + } } diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeService.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeService.java new file mode 100644 index 0000000000..55dc576d1f --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeService.java @@ -0,0 +1,89 @@ +package org.opensearch.dataprepper.plugins.source.crowdstrike; +import io.micrometer.core.instrument.Timer; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeApiResponse; +import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeIndicatorResult; +import org.opensearch.dataprepper.plugins.source.crowdstrike.rest.CrowdStrikeRestClient; +import org.opensearch.dataprepper.plugins.source.crowdstrike.utils.CrowdStrikeNextLinkValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; +import org.springframework.web.util.UriComponentsBuilder; +import javax.inject.Named; +import java.net.URI; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.BATCH_SIZE; +import 
static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.FILTER_KEY; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.LAST_UPDATED; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.LIMIT_KEY; + +/** + * This service manages the interaction with CrowdStrike's API endpoints, handles pagination + * and provides methods for fetching indicators from CrowdStrike. + */ +@Named +public class CrowdStrikeService { + + private final CrowdStrikeRestClient crowdStrikeRestClient; + private static final Logger log = LoggerFactory.getLogger(CrowdStrikeService.class); + private static final String BASE_URL = "https://api.crowdstrike.com/"; + private static final String COMBINED_URL = "https://api.crowdstrike.com/intel/combined/indicators/v1"; + private final Timer searchCallLatencyTimer; + + + public CrowdStrikeService(CrowdStrikeRestClient crowdStrikeRestClient, PluginMetrics pluginMetrics) { + this.crowdStrikeRestClient = crowdStrikeRestClient; + this.searchCallLatencyTimer = pluginMetrics.timer("searchCallLatencyTimer"); + } + + /** + * Retrieves all indicator data from CrowdStrike in a paginated fashion. + * @param startTime The start timestamp (inclusive) for filtering indicators. + * @param endTime The end timestamp (exclusive) for filtering indicators. + * @param paginationLink An optional pagination URL suffix (used when fetching next pages). + * @return A {@link CrowdStrikeApiResponse} containing response body and headers. 
+ */ + public CrowdStrikeApiResponse getAllContent(Long startTime, Long endTime, String paginationLink) { + URI uri = buildCrowdStrikeUri(startTime, endTime, paginationLink); + + return searchCallLatencyTimer.record(() -> { + try { + log.debug("Calling CrowdStrike API with URI: {}", uri); + ResponseEntity responseEntity = crowdStrikeRestClient.invokeGetApi(uri, CrowdStrikeIndicatorResult.class); + + CrowdStrikeApiResponse response = new CrowdStrikeApiResponse(); + response.setBody(responseEntity.getBody()); + response.setHeaders(responseEntity.getHeaders()); + return response; + } catch (Exception e) { + log.error("Error fetching CrowdStrike content from URI: {}", uri, e); + throw new RuntimeException("CrowdStrike API call failed", e); + } + }); + } + + protected URI buildCrowdStrikeUri(Long startTime, Long endTime, String paginationLink) { + try { + if (paginationLink != null) { + String urlString = BASE_URL + paginationLink; + urlString = CrowdStrikeNextLinkValidator.validateAndSanitizeURL(urlString); + return new URI(urlString); + } else { + // Manually construct and encode the query string + String filter1 = URLEncoder.encode(LAST_UPDATED + ":>=" + startTime, StandardCharsets.UTF_8); + String filter2 = URLEncoder.encode(LAST_UPDATED + ":<" + endTime, StandardCharsets.UTF_8); + String encodedFilter = filter1 + "%2B" + filter2; // Use literal '+' // ensure literal '+' + + UriComponentsBuilder builder = UriComponentsBuilder + .fromHttpUrl(COMBINED_URL) + .queryParam(FILTER_KEY, encodedFilter) + .queryParam(LIMIT_KEY, BATCH_SIZE); + + return builder.build(true).toUri(); + } + } catch (Exception e) { + throw new RuntimeException("Failed to construct CrowdStrike request URI", e); + } + } +} diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeSourceConfig.java 
b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeSourceConfig.java index d8aee0fdd6..03989e1472 100644 --- a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeSourceConfig.java @@ -27,6 +27,6 @@ public class CrowdStrikeSourceConfig implements CrawlerSourceConfig { @Min(1) @Max(50) @Valid - private int numWorkers = DEFAULT_NUMBER_OF_WORKERS; + private int numberOfWorkers = DEFAULT_NUMBER_OF_WORKERS; } diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeApiResponse.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeApiResponse.java new file mode 100644 index 0000000000..48caf3a6cf --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeApiResponse.java @@ -0,0 +1,29 @@ +package org.opensearch.dataprepper.plugins.source.crowdstrike.models; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.util.CollectionUtils; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Represents the response returned from a CrowdStrike API call. 
+ */ +@Getter @Setter + public class CrowdStrikeApiResponse { + + private CrowdStrikeIndicatorResult body; + private Map> headers; + + // Convenience method to get a specific header + public List getHeader(String headerName) { + return headers.getOrDefault(headerName, Collections.emptyList()); + } + + // Convenience method to get the first value of a specific header + public String getFirstHeaderValue(String headerName) { + List values = getHeader(headerName); + return CollectionUtils.isEmpty(values) ? null : values.get(0); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeIndicatorResult.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeIndicatorResult.java new file mode 100644 index 0000000000..60c62f79f1 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeIndicatorResult.java @@ -0,0 +1,18 @@ +package org.opensearch.dataprepper.plugins.source.crowdstrike.models; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Getter; +import java.util.List; + +/** + * The result of Falcon query search. 
+ */ +@Getter +@JsonIgnoreProperties(ignoreUnknown = true) +public class CrowdStrikeIndicatorResult { + + @JsonProperty("resources") + private List results = null; + +} diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/ThreatIndicator.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/ThreatIndicator.java new file mode 100644 index 0000000000..23983ef258 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/ThreatIndicator.java @@ -0,0 +1,59 @@ +package org.opensearch.dataprepper.plugins.source.crowdstrike.models; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Getter; +import lombok.Setter; + +/** + * Represents a threat intelligence indicator from CrowdStrike's API. + * This class encapsulates information about potential security threats, + * including indicators of compromise (IoCs) and associated metadata. + * + *

A threat indicator may include various attributes such as: + *

    + *
  • Type of the indicator (IP, Domain, Hash, etc.)
  • + *
  • Value of the indicator
  • + *
  • Confidence score
  • + *
  • Published and last-updated timestamps (epoch values)
  • + *
+ *

+ */ +@Setter +@Getter +@JsonIgnoreProperties(ignoreUnknown = true) +public class ThreatIndicator { + /** + * The ID of the IOC. + */ + @JsonProperty("id") + private String id = null; + + /** + * The type of the IOC. + */ + @JsonProperty("type") + private String type = null; + + /** + * The value of the IOC. + */ + @JsonProperty("indicator") + private String indicator = null; + + /** + * The epoch timestamp of the creation date of IOC. + */ + @JsonProperty("published_date") + private long publishedDate = 0L; + + @JsonProperty("malicious_confidence") + private String maliciousConfidence = null; + + /** + * The epoch timestamp of last updated date of the IOC. + */ + @JsonProperty("last_updated") + private long lastUpdated = 0L; + +} diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeAuthClient.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeAuthClient.java index f48efe5b13..3e1f56fa25 100644 --- a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeAuthClient.java +++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeAuthClient.java @@ -3,6 +3,7 @@ import lombok.Getter; import org.opensearch.dataprepper.plugins.source.crowdstrike.CrowdStrikeSourceConfig; import org.opensearch.dataprepper.plugins.source.crowdstrike.configuration.AuthenticationConfig; +import org.opensearch.dataprepper.plugins.source.source_crawler.exception.UnauthorizedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpEntity; @@ -16,6 +17,9 @@ import java.util.Map; import javax.inject.Named; import static 
org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.MAX_RETRIES; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.RETRY_ATTEMPT_SLEEP_TIME; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.SLEEP_TIME_MULTIPLIER; /** @@ -37,7 +41,7 @@ public class CrowdStrikeAuthClient { private static final String OAUTH_TOKEN_URL = "https://api.crowdstrike.com/oauth2/token"; private static final String ACCESS_TOKEN = "access_token"; private static final String EXPIRE_IN = "expires_in"; - + private final Object tokenRenewLock = new Object(); public CrowdStrikeAuthClient(final CrowdStrikeSourceConfig sourceConfig) { @@ -48,48 +52,74 @@ public CrowdStrikeAuthClient(final CrowdStrikeSourceConfig sourceConfig) { /** - * Initializes the credentials by obtaining an authentication token. + * Initializes the credentials by getting an authentication token. */ public void initCredentials() { log.info("Getting CrowdStrike Authentication Token"); getAuthToken(); } + /** * Retrieves a new authentication token from the CrowdStrike API. * The token is stored in the {@code bearerToken} field, and its expiration time is updated. * - * @throws RuntimeException if the token cannot be retrieved. + * @throws UnauthorizedException Runtime exception if the token cannot be retrieved. 
*/ - private void getAuthToken() { + protected void getAuthToken() { log.info(NOISY, "You are trying to access token"); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); headers.setBasicAuth(this.clientId, this.clientSecret); HttpEntity request = new HttpEntity<>(headers); - try { - ResponseEntity response = restTemplate.postForEntity(OAUTH_TOKEN_URL, request, Map.class); - Map tokenData = response.getBody(); - this.bearerToken = (String) tokenData.get(ACCESS_TOKEN); - this.expireTime = Instant.now().plusSeconds((Integer) tokenData.get(EXPIRE_IN)); - log.info("Access token acquired successfully"); - } catch (HttpClientErrorException ex) { - this.expireTime = Instant.ofEpochMilli(0); - HttpStatus statusCode = ex.getStatusCode(); - log.error("Failed to acquire access token. Status code: {}, Error Message: {}", - statusCode, ex.getMessage()); - throw new RuntimeException("Error while requesting token:" + ex.getMessage(), ex); + int retryCount = 0; + while(retryCount < MAX_RETRIES) { + try { + ResponseEntity response = restTemplate.postForEntity(OAUTH_TOKEN_URL, request, Map.class); + Map tokenData = response.getBody(); + this.bearerToken = (String) tokenData.get(ACCESS_TOKEN); + this.expireTime = Instant.now().plusSeconds((Integer) tokenData.get(EXPIRE_IN)); + log.info("Access token acquired successfully"); + return; + } catch (HttpClientErrorException ex) { + this.expireTime = Instant.ofEpochMilli(0); + HttpStatus statusCode = ex.getStatusCode(); + String statusMessage = ex.getMessage(); + log.error("Failed to acquire access token. 
Status code: {}, Error Message: {}", + statusCode, statusMessage); + try { + Thread.sleep((long) RETRY_ATTEMPT_SLEEP_TIME.get(retryCount) * SLEEP_TIME_MULTIPLIER); + } catch (InterruptedException e) { + throw new RuntimeException("Sleep in the retry attempt got interrupted", e); + } + } + retryCount++; } + String errorMessage = String.format("Failed to acquire access token even after %s retry attempts", MAX_RETRIES); + log.error(errorMessage); + throw new UnauthorizedException(errorMessage); } - public boolean isTokenExpired() { - return this.bearerToken == null || Instant.now().isAfter(this.expireTime); + protected boolean isTokenValid() { + Instant currentTime = Instant.now(); + return this.bearerToken != null && currentTime.isBefore(this.expireTime); } /** * Refreshes the bearer token by retrieving a new one from CrowdStrike. */ public void refreshToken() { - + if (isTokenValid()) { + //There is still time to renew, or someone else must have already renewed it + return; + } + synchronized (tokenRenewLock) { + if (isTokenValid()) { + //Someone else must have already renewed it + return; + } + log.info("Renewing authentication token for CrowdStrike Connector."); + getAuthToken(); + } } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeRestClient.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeRestClient.java new file mode 100644 index 0000000000..81ec6cfd10 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeRestClient.java @@ -0,0 +1,99 @@ +package org.opensearch.dataprepper.plugins.source.crowdstrike.rest; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; 
+import org.opensearch.dataprepper.plugins.source.source_crawler.exception.UnauthorizedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpEntity; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.RestTemplate; + +import javax.inject.Named; +import java.net.URI; +import java.util.Collections; + +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.MAX_RETRIES; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.RETRY_ATTEMPT_SLEEP_TIME; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.SLEEP_TIME_MULTIPLIER; + +/** + * Client for interacting with the CrowdStrike Threat Intel REST API. + */ +@Named +public class CrowdStrikeRestClient { + + static final String AUTH_FAILURES_COUNTER = "authFailures"; + private static final Logger log = LoggerFactory.getLogger(CrowdStrikeRestClient.class); + private final Counter authFailures; + private final RestTemplate restTemplate; + private final CrowdStrikeAuthClient authClient; + + public CrowdStrikeRestClient(PluginMetrics pluginMetrics, CrowdStrikeAuthClient authClient) { + this.authFailures = pluginMetrics.counter(AUTH_FAILURES_COUNTER); + this.restTemplate = new RestTemplate(); + this.authClient = authClient; + } + + /** + * Executes a GET request to the specified CrowdStrike API URI with bearer token authentication + * and retries in case of transient failures (e.g., 401 Unauthorized, 429 Too Many Requests). + * + *

Retry strategy: + *

    + *
  • Makes up to {@code MAX_RETRIES} attempts in total
  • + *
  • Sleeps between attempts, following the {@code RETRY_ATTEMPT_SLEEP_TIME} schedule, after every client error except 403 (not only 429 Too Many Requests)
  • + *
  • Refreshes token on 401 (Unauthorized)
  • + *
  • Fails fast on 403 (Forbidden)
  • + *
+ * + * @param uri The target URI of the CrowdStrike API endpoint + * @param responseType The expected response body type to map the result into + * @param The type of the response body + * @return ResponseEntity containing the API response body and headers + * @throws UnauthorizedException if the API returns 403 (Forbidden) + * @throws RuntimeException if all retries are exhausted or unexpected errors occur + */ + public ResponseEntity invokeGetApi(URI uri, Class responseType) { + int retryCount = 0; + // Create headers + HttpHeaders headers = new HttpHeaders(); + headers.setBearerAuth(authClient.getBearerToken()); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON)); + HttpEntity requestEntity = new HttpEntity<>(headers); + while (retryCount < MAX_RETRIES) { + try { + return restTemplate.exchange(uri, HttpMethod.GET, requestEntity, responseType); + } catch (HttpClientErrorException ex) { + HttpStatus statusCode = ex.getStatusCode(); + String statusMessage = ex.getMessage(); + log.error("An exception has occurred while getting response from search API {}", ex.getMessage()); + if (statusCode == HttpStatus.FORBIDDEN) { + throw new UnauthorizedException(statusMessage); + } else if (statusCode == HttpStatus.UNAUTHORIZED) { + authFailures.increment(); + log.warn(NOISY, "Token expired. We will try to renew the tokens now", ex); + authClient.refreshToken(); + } else if (statusCode == HttpStatus.TOO_MANY_REQUESTS) { + log.error(NOISY, "Hitting API rate limit. Backing off with sleep timer.", ex); + } + try { + Thread.sleep((long) RETRY_ATTEMPT_SLEEP_TIME.get(retryCount) * SLEEP_TIME_MULTIPLIER); + } catch (InterruptedException e) { + throw new RuntimeException("Sleep in the retry attempt got interrupted", e); + } + } + retryCount++; + } + String errorMessage = String.format("Exceeded max retry attempts. 
Failed to execute the Rest API call %s", uri); + log.error(errorMessage); + throw new RuntimeException(errorMessage); + } +} diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/utils/Constants.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/utils/Constants.java index a6d823829e..c8959b8253 100644 --- a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/utils/Constants.java +++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/utils/Constants.java @@ -1,10 +1,18 @@ package org.opensearch.dataprepper.plugins.source.crowdstrike.utils; +import java.util.List; + /** * The type Constants. */ public class Constants { - public static final String PLUGIN_NAME = "crowdstrike"; - -} \ No newline at end of file + public static final int MAX_RETRIES = 6; + public static final List RETRY_ATTEMPT_SLEEP_TIME = List.of(1, 2, 5, 10, 20, 40); + public static final int SLEEP_TIME_MULTIPLIER = 1000; + public static final String FILTER_KEY = "filter"; + public static final String LIMIT_KEY = "limit"; + public static final String LAST_UPDATED = "last_updated"; + public static final String MARKER_KEY = "_marker"; + public static final int BATCH_SIZE = 10000; +} diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/utils/CrowdStrikeNextLinkValidator.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/utils/CrowdStrikeNextLinkValidator.java new file mode 100644 index 0000000000..42df12c5fd --- /dev/null +++ 
b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/utils/CrowdStrikeNextLinkValidator.java @@ -0,0 +1,122 @@ +package org.opensearch.dataprepper.plugins.source.crowdstrike.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.FILTER_KEY; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.LAST_UPDATED; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.LIMIT_KEY; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.MARKER_KEY; + +/** + * Utility class for validating and sanitizing CrowdStrike API URLs. + */ +public class CrowdStrikeNextLinkValidator { + + private static final Logger log = LoggerFactory.getLogger(CrowdStrikeNextLinkValidator.class); + + private static final Set ALLOWED_FILTER_KEYS = Set.of(LAST_UPDATED, MARKER_KEY); + private static final Pattern FILTER_COMPONENT_PATTERN = + Pattern.compile("^([a-z_]+):(>=|<=|>|<)['%a-zA-Z0-9:\\-.]+$"); + private static final Pattern LIMIT_PATTERN = Pattern.compile("^\\d{1,5}$"); + + /** + * Validates and sanitizes a CrowdStrike API URL, preserving only allowed query parameters. + * For the 'filter' parameter, only 'last_updated' and '_marker' components are retained. 
+ * + * @param urlString the full API URL to validate + * @return sanitized version of the URL + * @throws MalformedURLException if the URL is malformed + */ + public static String validateAndSanitizeURL(String urlString) throws MalformedURLException { + URL url = new URL(urlString); + String query = url.getQuery(); + + if (query == null || query.isEmpty()) { + return urlString; + } + + Map validatedParams = new HashMap<>(); + String[] pairs = query.split("&"); + + for (String pair : pairs) { + int idx = pair.indexOf('='); + if (idx <= 0) continue; + + String key = URLDecoder.decode(pair.substring(0, idx), StandardCharsets.UTF_8); + + if (FILTER_KEY.equals(key)) { + List validSubFilters = new ArrayList<>(); + + String encodedValue = pair.substring(idx + 1); // This is still URL-encoded + String[] encodedSubFilters = encodedValue.split("%2B"); // Use raw '%2B' which is '+' in URL encoding + + for (String encodedSub : encodedSubFilters) { + String sub; + try { + sub = URLDecoder.decode(encodedSub, StandardCharsets.UTF_8); + } catch (IllegalArgumentException e) { + log.warn("Invalid URL encoding in subfilter: {}", encodedSub); + continue; + } + + Matcher matcher = FILTER_COMPONENT_PATTERN.matcher(sub); + if (matcher.matches()) { + String filterKey = sub.substring(0, sub.indexOf(":")); + if (ALLOWED_FILTER_KEYS.contains(filterKey)) { + validSubFilters.add(sub); + } else { + log.warn("Disallowed filter key: {}", filterKey); + } + } else { + log.warn("Malformed filter segment: {}", sub); + } + } + + if (!validSubFilters.isEmpty()) { + validatedParams.put(FILTER_KEY, String.join("+", validSubFilters)); + } + + } else { + String value = URLDecoder.decode(pair.substring(idx + 1), StandardCharsets.UTF_8); + if (LIMIT_KEY.equals(key) && LIMIT_PATTERN.matcher(value).matches()) { + validatedParams.put(LIMIT_KEY, value); + } else { + log.warn("Skipping disallowed or malformed param: {}={}", key, value); + } + } + } + StringBuilder sanitizedURL = new StringBuilder() + 
.append(url.getProtocol()).append("://").append(url.getHost()); + + if (url.getPort() != -1) { + sanitizedURL.append(":").append(url.getPort()); + } + + sanitizedURL.append(url.getPath()); + + if (!validatedParams.isEmpty()) { + String queryString = validatedParams.entrySet().stream() + .map(entry -> URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8) + "=" + + URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8)) + .collect(Collectors.joining("&")); + sanitizedURL.append("?").append(queryString); + } + + return sanitizedURL.toString(); + } + +} diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeNextLinkValidatorTest.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeNextLinkValidatorTest.java new file mode 100644 index 0000000000..4212eb1f30 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeNextLinkValidatorTest.java @@ -0,0 +1,66 @@ +package org.opensearch.dataprepper.plugins.source.crowdstrike; + +import org.junit.jupiter.api.Test; +import java.net.MalformedURLException; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.CrowdStrikeNextLinkValidator.validateAndSanitizeURL; + +public class CrowdStrikeNextLinkValidatorTest { + + @Test + void testValidEncodedCrowdStrikeUrlPreserved() throws MalformedURLException { + String url = "https://api.crowdstrike.com//intel/combined/indicators/v1" + + "?filter=last_updated%3A%3E%3D1745519529%2Blast_updated%3A%3C1745523129%2B_marker%3A%3C%2717455225567d09efadf14547a1aee2bc25cabc525e%27" + + "&limit=10000"; + + 
String sanitized = validateAndSanitizeURL(url); + + assertTrue(sanitized.contains("filter="), "Filter parameter should be retained"); + assertTrue(sanitized.contains("last_updated%3A%3E%3D1745519529"), "Start last_updated should be present"); + assertTrue(sanitized.contains("last_updated%3A%3C1745523129"), "End last_updated should be present"); + assertTrue(sanitized.contains("_marker%3A%3C%2717455225567d09efadf14547a1aee2bc25cabc525e%27"), "_marker should be present"); + assertTrue(sanitized.contains("limit=10000"), "Limit should be present"); + } + + @Test + void testLimitOutOfRangeExcluded() throws MalformedURLException { + String url = "https://api.crowdstrike.com/intel/combined/indicators/v1?limit=1000000"; + String sanitized = validateAndSanitizeURL(url); + assertFalse(sanitized.contains("limit=")); + } + + @Test + void testXSSInjectionBlocked() throws MalformedURLException { + String url = "https://api.crowdstrike.com/intel/combined/indicators/v1?filter=last_updated:>=1745000000+_marker:<%27%27"; + String sanitized = validateAndSanitizeURL(url); + assertFalse(sanitized.contains("