From bdcb41e1bd37809ec68c7a8034eddcfa81975c55 Mon Sep 17 00:00:00 2001
From: nsgupta1
Date: Thu, 24 Apr 2025 16:45:55 -0700
Subject: [PATCH 1/5] CrowdStrike API call and retry mechanism
Signed-off-by: nsgupta1
---
.../atlassian/AtlassianSourceConfig.java | 5 +
.../crowdstrike/CrowdStrikeService.java | 89 ++++++++++++
.../crowdstrike/CrowdStrikeSourceConfig.java | 2 +-
.../models/CrowdStrikeApiResponse.java | 29 ++++
.../models/CrowdStrikeIndicatorResult.java | 18 +++
.../crowdstrike/models/ThreatIndicator.java | 59 ++++++++
.../rest/CrowdStrikeAuthClient.java | 70 +++++++---
.../rest/CrowdStrikeRestClient.java | 99 ++++++++++++++
.../source/crowdstrike/utils/Constants.java | 14 +-
.../utils/CrowdStrikeNextLinkValidator.java | 122 +++++++++++++++++
.../CrowdStrikeNextLinkValidatorTest.java | 66 +++++++++
.../crowdstrike/CrowdStrikeServiceTest.java | 112 +++++++++++++++
.../CrowdStrikeSourceConfigTest.java | 4 +-
.../models/CrowdStrikeApiResponseTest.java | 80 +++++++++++
.../CrowdStrikeIndicatorResultTest.java | 60 ++++++++
.../models/ThreatIndicatorTest.java | 69 ++++++++++
.../rest/CrowdStrikeAuthClientTest.java | 100 +++++++++++++-
.../rest/CrowdStrikeRestClientTest.java | 129 ++++++++++++++++++
.../base/CrawlerSourceConfig.java | 7 +
.../base/CrawlerSourcePlugin.java | 3 +-
20 files changed, 1106 insertions(+), 31 deletions(-)
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeService.java
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeApiResponse.java
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeIndicatorResult.java
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/ThreatIndicator.java
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeRestClient.java
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/utils/CrowdStrikeNextLinkValidator.java
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeNextLinkValidatorTest.java
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeServiceTest.java
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeApiResponseTest.java
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeIndicatorResultTest.java
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/ThreatIndicatorTest.java
create mode 100644 data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeRestClientTest.java
diff --git a/data-prepper-plugins/saas-source-plugins/atlassian-commons/src/main/java/org/opensearch/dataprepper/plugins/source/atlassian/AtlassianSourceConfig.java b/data-prepper-plugins/saas-source-plugins/atlassian-commons/src/main/java/org/opensearch/dataprepper/plugins/source/atlassian/AtlassianSourceConfig.java
index a6a4fe7e38..e6a94eab6f 100644
--- a/data-prepper-plugins/saas-source-plugins/atlassian-commons/src/main/java/org/opensearch/dataprepper/plugins/source/atlassian/AtlassianSourceConfig.java
+++ b/data-prepper-plugins/saas-source-plugins/atlassian-commons/src/main/java/org/opensearch/dataprepper/plugins/source/atlassian/AtlassianSourceConfig.java
@@ -50,4 +50,9 @@ public String getAuthType() {
}
public abstract String getOauth2UrlContext();
+
+ @Override
+ public int getNumberOfWorkers() {
+ return DEFAULT_NUMBER_OF_WORKERS;
+ }
}
diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeService.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeService.java
new file mode 100644
index 0000000000..55dc576d1f
--- /dev/null
+++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeService.java
@@ -0,0 +1,89 @@
+package org.opensearch.dataprepper.plugins.source.crowdstrike;
+import io.micrometer.core.instrument.Timer;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeApiResponse;
+import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeIndicatorResult;
+import org.opensearch.dataprepper.plugins.source.crowdstrike.rest.CrowdStrikeRestClient;
+import org.opensearch.dataprepper.plugins.source.crowdstrike.utils.CrowdStrikeNextLinkValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.util.UriComponentsBuilder;
+import javax.inject.Named;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.BATCH_SIZE;
+import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.FILTER_KEY;
+import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.LAST_UPDATED;
+import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.LIMIT_KEY;
+
+/**
+ * This service manages the interaction with CrowdStrike's API endpoints, handles pagination
+ * and provides methods for fetching indicators from CrowdStrike.
+ */
+@Named
+public class CrowdStrikeService {
+
+ private final CrowdStrikeRestClient crowdStrikeRestClient;
+ private static final Logger log = LoggerFactory.getLogger(CrowdStrikeService.class);
+ private static final String BASE_URL = "https://api.crowdstrike.com/";
+ private static final String COMBINED_URL = "https://api.crowdstrike.com/intel/combined/indicators/v1";
+ private final Timer searchCallLatencyTimer;
+
+
+ public CrowdStrikeService(CrowdStrikeRestClient crowdStrikeRestClient, PluginMetrics pluginMetrics) {
+ this.crowdStrikeRestClient = crowdStrikeRestClient;
+ this.searchCallLatencyTimer = pluginMetrics.timer("searchCallLatencyTimer");
+ }
+
+ /**
+ * Retrieves all indicator data from CrowdStrike in a paginated fashion.
+ * @param startTime The start timestamp (inclusive) for filtering indicators.
+ * @param endTime The end timestamp (exclusive) for filtering indicators.
+ * @param paginationLink An optional pagination URL suffix (used when fetching next pages).
+ * @return A {@link CrowdStrikeApiResponse} containing response body and headers.
+ */
+ public CrowdStrikeApiResponse getAllContent(Long startTime, Long endTime, String paginationLink) {
+ URI uri = buildCrowdStrikeUri(startTime, endTime, paginationLink);
+
+ return searchCallLatencyTimer.record(() -> {
+ try {
+ log.debug("Calling CrowdStrike API with URI: {}", uri);
+ ResponseEntity responseEntity = crowdStrikeRestClient.invokeGetApi(uri, CrowdStrikeIndicatorResult.class);
+
+ CrowdStrikeApiResponse response = new CrowdStrikeApiResponse();
+ response.setBody(responseEntity.getBody());
+ response.setHeaders(responseEntity.getHeaders());
+ return response;
+ } catch (Exception e) {
+ log.error("Error fetching CrowdStrike content from URI: {}", uri, e);
+ throw new RuntimeException("CrowdStrike API call failed", e);
+ }
+ });
+ }
+
+ protected URI buildCrowdStrikeUri(Long startTime, Long endTime, String paginationLink) {
+ try {
+ if (paginationLink != null) {
+ String urlString = BASE_URL + paginationLink;
+ urlString = CrowdStrikeNextLinkValidator.validateAndSanitizeURL(urlString);
+ return new URI(urlString);
+ } else {
+ // Manually construct and encode the query string
+ String filter1 = URLEncoder.encode(LAST_UPDATED + ":>=" + startTime, StandardCharsets.UTF_8);
+ String filter2 = URLEncoder.encode(LAST_UPDATED + ":<" + endTime, StandardCharsets.UTF_8);
+ String encodedFilter = filter1 + "%2B" + filter2; // Use literal '+' // ensure literal '+'
+
+ UriComponentsBuilder builder = UriComponentsBuilder
+ .fromHttpUrl(COMBINED_URL)
+ .queryParam(FILTER_KEY, encodedFilter)
+ .queryParam(LIMIT_KEY, BATCH_SIZE);
+
+ return builder.build(true).toUri();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to construct CrowdStrike request URI", e);
+ }
+ }
+}
diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeSourceConfig.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeSourceConfig.java
index d8aee0fdd6..03989e1472 100644
--- a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeSourceConfig.java
+++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeSourceConfig.java
@@ -27,6 +27,6 @@ public class CrowdStrikeSourceConfig implements CrawlerSourceConfig {
@Min(1)
@Max(50)
@Valid
- private int numWorkers = DEFAULT_NUMBER_OF_WORKERS;
+ private int numberOfWorkers = DEFAULT_NUMBER_OF_WORKERS;
}
diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeApiResponse.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeApiResponse.java
new file mode 100644
index 0000000000..48caf3a6cf
--- /dev/null
+++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeApiResponse.java
@@ -0,0 +1,29 @@
+package org.opensearch.dataprepper.plugins.source.crowdstrike.models;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.util.CollectionUtils;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the response returned from a CrowdStrike API call.
+ */
+@Getter @Setter
+ public class CrowdStrikeApiResponse {
+
+ private CrowdStrikeIndicatorResult body;
+ private Map> headers;
+
+ // Convenience method to get a specific header
+ public List getHeader(String headerName) {
+ return headers.getOrDefault(headerName, Collections.emptyList());
+ }
+
+ // Convenience method to get the first value of a specific header
+ public String getFirstHeaderValue(String headerName) {
+ List values = getHeader(headerName);
+ return CollectionUtils.isEmpty(values) ? null : values.get(0);
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeIndicatorResult.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeIndicatorResult.java
new file mode 100644
index 0000000000..60c62f79f1
--- /dev/null
+++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeIndicatorResult.java
@@ -0,0 +1,18 @@
+package org.opensearch.dataprepper.plugins.source.crowdstrike.models;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+import java.util.List;
+
+/**
+ * The result of Falcon query search.
+ */
+@Getter
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CrowdStrikeIndicatorResult {
+
+ @JsonProperty("resources")
+ private List results = null;
+
+}
diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/ThreatIndicator.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/ThreatIndicator.java
new file mode 100644
index 0000000000..23983ef258
--- /dev/null
+++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/ThreatIndicator.java
@@ -0,0 +1,59 @@
+package org.opensearch.dataprepper.plugins.source.crowdstrike.models;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Represents a threat intelligence indicator from CrowdStrike's API.
+ * This class encapsulates information about potential security threats,
+ * including indicators of compromise (IoCs) and associated metadata.
+ *
+ * A threat indicator may include various attributes such as:
+ *
+ * - Type of the indicator (IP, Domain, Hash, etc.)
+ * - Value of the indicator
+ * - Confidence score
+ * - Associated timestamps
+ *
+ *
+ */
+@Setter
+@Getter
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ThreatIndicator {
+ /**
+ * The ID of the IOC.
+ */
+ @JsonProperty("id")
+ private String id = null;
+
+ /**
+ * The type of the IOC.
+ */
+ @JsonProperty("type")
+ private String type = null;
+
+ /**
+ * The value of the IOC.
+ */
+ @JsonProperty("indicator")
+ private String indicator = null;
+
+ /**
+ * The epoch timestamp of the creation date of IOC.
+ */
+ @JsonProperty("published_date")
+ private long publishedDate = 0L;
+
+ @JsonProperty("malicious_confidence")
+ private String maliciousConfidence = null;
+
+ /**
+ * The epoch timestamp of last updated date of the IOC.
+ */
+ @JsonProperty("last_updated")
+ private long lastUpdated = 0L;
+
+}
diff --git a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeAuthClient.java b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeAuthClient.java
index f48efe5b13..3e1f56fa25 100644
--- a/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeAuthClient.java
+++ b/data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeAuthClient.java
@@ -3,6 +3,7 @@
import lombok.Getter;
import org.opensearch.dataprepper.plugins.source.crowdstrike.CrowdStrikeSourceConfig;
import org.opensearch.dataprepper.plugins.source.crowdstrike.configuration.AuthenticationConfig;
+import org.opensearch.dataprepper.plugins.source.source_crawler.exception.UnauthorizedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
@@ -16,6 +17,9 @@
import java.util.Map;
import javax.inject.Named;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
+import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.MAX_RETRIES;
+import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.RETRY_ATTEMPT_SLEEP_TIME;
+import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.SLEEP_TIME_MULTIPLIER;
/**
@@ -37,7 +41,7 @@ public class CrowdStrikeAuthClient {
private static final String OAUTH_TOKEN_URL = "https://api.crowdstrike.com/oauth2/token";
private static final String ACCESS_TOKEN = "access_token";
private static final String EXPIRE_IN = "expires_in";
-
+ private final Object tokenRenewLock = new Object();
public CrowdStrikeAuthClient(final CrowdStrikeSourceConfig sourceConfig) {
@@ -48,48 +52,74 @@ public CrowdStrikeAuthClient(final CrowdStrikeSourceConfig sourceConfig) {
/**
- * Initializes the credentials by obtaining an authentication token.
+ * Initializes the credentials by getting an authentication token.
*/
public void initCredentials() {
log.info("Getting CrowdStrike Authentication Token");
getAuthToken();
}
+
/**
* Retrieves a new authentication token from the CrowdStrike API.
* The token is stored in the {@code bearerToken} field, and its expiration time is updated.
*
- * @throws RuntimeException if the token cannot be retrieved.
+ * @throws UnauthorizedException Runtime exception if the token cannot be retrieved.
*/
- private void getAuthToken() {
+ protected void getAuthToken() {
log.info(NOISY, "You are trying to access token");
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
headers.setBasicAuth(this.clientId, this.clientSecret);
HttpEntity request = new HttpEntity<>(headers);
- try {
- ResponseEntity