Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ public String getAuthType() {
}

public abstract String getOauth2UrlContext();

@Override
public int getNumberOfWorkers() {
return DEFAULT_NUMBER_OF_WORKERS;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeThreatIntelApiResponse;
import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeIndicatorResult;
import org.opensearch.dataprepper.plugins.source.crowdstrike.rest.CrowdStrikeRestClient;
import org.opensearch.dataprepper.plugins.source.crowdstrike.utils.CrowdStrikeNextLinkValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.util.UriComponentsBuilder;
import javax.inject.Named;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Optional;

import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.BATCH_SIZE;
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.FILTER_KEY;
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.LAST_UPDATED;
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.LIMIT_KEY;

/**
* This service manages the interaction with CrowdStrike's API endpoints, handles pagination
* and provides methods for fetching indicators from CrowdStrike.
*/
@Named
public class CrowdStrikeService {

private final CrowdStrikeRestClient crowdStrikeRestClient;
private static final Logger log = LoggerFactory.getLogger(CrowdStrikeService.class);
private static final String BASE_URL = "https://api.crowdstrike.com/";
private static final String COMBINED_URL = "https://api.crowdstrike.com/intel/combined/indicators/v1";
private final Timer searchCallLatencyTimer;
private static final String GREATER_THAN_EQUALS = ">=";
private static final String LESS_THAN = "<";
private static final String ENCODED_PLUS_SIGN = "%2B";


public CrowdStrikeService(CrowdStrikeRestClient crowdStrikeRestClient, PluginMetrics pluginMetrics) {
this.crowdStrikeRestClient = crowdStrikeRestClient;
this.searchCallLatencyTimer = pluginMetrics.timer("searchCallLatencyTimer");
}

/**
* Retrieves all indicator data from CrowdStrike in a paginated fashion.
* @param startTime The start timestamp (inclusive) for filtering indicators.
* @param endTime The end timestamp (exclusive) for filtering indicators.
* @param paginationLink An optional pagination URL suffix (used when fetching next pages).
* @return A {@link CrowdStrikeThreatIntelApiResponse} containing response body and headers.
*/
public CrowdStrikeThreatIntelApiResponse getThreatIndicators(Instant startTime, Instant endTime, Optional<String> paginationLink) {
if (startTime == null || endTime == null) {
throw new IllegalArgumentException("startTime and endTime must not be null");
}
URI uri = buildCrowdStrikeUri(startTime, endTime, paginationLink);
return searchCallLatencyTimer.record(() -> {

log.debug("Calling CrowdStrike API with URI: {}", uri);
ResponseEntity<CrowdStrikeIndicatorResult> responseEntity = crowdStrikeRestClient.invokeGetApi(uri, CrowdStrikeIndicatorResult.class);

return new CrowdStrikeThreatIntelApiResponse(responseEntity.getBody(), responseEntity.getHeaders());
});
}

protected URI buildCrowdStrikeUri(Instant startTime, Instant endTime, Optional<String> paginationLink) {
try {
if (paginationLink.isPresent()) {
String urlString = BASE_URL + paginationLink.get();
urlString = CrowdStrikeNextLinkValidator.validateAndSanitizeURL(urlString);
return new URI(urlString);
} else {
// Manually construct and encode the query string
String startTimeFilter = URLEncoder.encode(LAST_UPDATED + ":" + GREATER_THAN_EQUALS + startTime.getEpochSecond(), StandardCharsets.UTF_8);
String endTimeFilter = URLEncoder.encode(LAST_UPDATED + ":" + LESS_THAN + endTime.getEpochSecond(), StandardCharsets.UTF_8);
String encodedFilter = startTimeFilter + ENCODED_PLUS_SIGN + endTimeFilter; // ensure literal '+' is encoded

UriComponentsBuilder builder = UriComponentsBuilder
.fromHttpUrl(COMBINED_URL)
.queryParam(FILTER_KEY, encodedFilter)
.queryParam(LIMIT_KEY, BATCH_SIZE);

return builder.build(true).toUri();
}
} catch (Exception e) {
throw new RuntimeException("Failed to construct CrowdStrike request URI", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ public class CrowdStrikeSourceConfig implements CrawlerSourceConfig {
@Min(1)
@Max(50)
@Valid
private int numWorkers = DEFAULT_NUMBER_OF_WORKERS;
private int numberOfWorkers = DEFAULT_NUMBER_OF_WORKERS;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike.models;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.util.List;

/**
* Represents the response from a CrowdStrike Falcon Query Language (FQL) search for threat indicators.
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class CrowdStrikeIndicatorResult {

@JsonProperty("resources")
private List<ThreatIndicator> results = null;


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike.models;

import lombok.Data;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Represents the response returned from a CrowdStrike API call.
*/
@Data
public class CrowdStrikeThreatIntelApiResponse {

private CrowdStrikeIndicatorResult body;
private Map<String, List<String>> headers;

public CrowdStrikeThreatIntelApiResponse(CrowdStrikeIndicatorResult body, Map<String, List<String>> headers) {
this.body = body;
this.headers = headers;
}

// Convenience method to get a specific header
public List<String> getHeader(String headerName) {
return headers.getOrDefault(headerName, Collections.emptyList());
}

// Convenience method to get the first value of a specific header
public String getFirstHeaderValue(String headerName) {
List<String> values = getHeader(headerName);
return CollectionUtils.isEmpty(values) ? null : values.get(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike.models;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

/**
* Represents a threat intelligence indicator from CrowdStrike's API.
* This class encapsulates information about potential security threats,
* including indicators of compromise (IoCs) and associated metadata.
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class ThreatIndicator {
/**
* The ID of the IOC.
*/
@JsonProperty("id")
private String id = null;

/**
* The type of the IOC.
*/
@JsonProperty("type")
private String type = null;

/**
* The value of the IOC.
*/
@JsonProperty("indicator")
private String indicator = null;

/**
* The epoch timestamp of the creation date of IOC.
*/
@JsonProperty("published_date")
private long publishedDate = 0L;

@JsonProperty("malicious_confidence")
private String maliciousConfidence = null;

/**
* The epoch timestamp of last updated date of the IOC.
*/
@JsonProperty("last_updated")
private long lastUpdated = 0L;

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import lombok.Getter;
import org.opensearch.dataprepper.plugins.source.crowdstrike.CrowdStrikeSourceConfig;
import org.opensearch.dataprepper.plugins.source.crowdstrike.configuration.AuthenticationConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.UnauthorizedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
Expand All @@ -16,6 +17,9 @@
import java.util.Map;
import javax.inject.Named;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.MAX_RETRIES;
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.RETRY_ATTEMPT_SLEEP_TIME;
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.SLEEP_TIME_MULTIPLIER;


/**
Expand All @@ -37,7 +41,7 @@ public class CrowdStrikeAuthClient {
private static final String OAUTH_TOKEN_URL = "https://api.crowdstrike.com/oauth2/token";
private static final String ACCESS_TOKEN = "access_token";
private static final String EXPIRE_IN = "expires_in";

private final Object tokenRenewLock = new Object();


public CrowdStrikeAuthClient(final CrowdStrikeSourceConfig sourceConfig) {
Expand All @@ -48,48 +52,74 @@ public CrowdStrikeAuthClient(final CrowdStrikeSourceConfig sourceConfig) {


/**
* Initializes the credentials by obtaining an authentication token.
* Initializes the credentials by getting an authentication token.
*/
public void initCredentials() {
log.info("Getting CrowdStrike Authentication Token");
getAuthToken();
}


/**
* Retrieves a new authentication token from the CrowdStrike API.
* The token is stored in the {@code bearerToken} field, and its expiration time is updated.
*
* @throws RuntimeException if the token cannot be retrieved.
* @throws UnauthorizedException Runtime exception if the token cannot be retrieved.
*/
private void getAuthToken() {
log.info(NOISY, "You are trying to access token");
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
headers.setBasicAuth(this.clientId, this.clientSecret);
HttpEntity<String> request = new HttpEntity<>(headers);
try {
ResponseEntity<Map> response = restTemplate.postForEntity(OAUTH_TOKEN_URL, request, Map.class);
Map tokenData = response.getBody();
this.bearerToken = (String) tokenData.get(ACCESS_TOKEN);
this.expireTime = Instant.now().plusSeconds((Integer) tokenData.get(EXPIRE_IN));
log.info("Access token acquired successfully");
} catch (HttpClientErrorException ex) {
this.expireTime = Instant.ofEpochMilli(0);
HttpStatus statusCode = ex.getStatusCode();
log.error("Failed to acquire access token. Status code: {}, Error Message: {}",
statusCode, ex.getMessage());
throw new RuntimeException("Error while requesting token:" + ex.getMessage(), ex);
synchronized (tokenRenewLock) {
if (isTokenValid()) {
//Someone else must have already renewed it
return;
}
log.info(NOISY, "You are trying to access token");
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
headers.setBasicAuth(this.clientId, this.clientSecret);
HttpEntity<String> request = new HttpEntity<>(headers);
int retryCount = 0;
while (retryCount < MAX_RETRIES) {
try {
ResponseEntity<Map> response = restTemplate.postForEntity(OAUTH_TOKEN_URL, request, Map.class);
Map tokenData = response.getBody();
this.bearerToken = (String) tokenData.get(ACCESS_TOKEN);
this.expireTime = Instant.now().plusSeconds((Integer) tokenData.get(EXPIRE_IN));
log.info("Access token acquired successfully");
return;
} catch (HttpClientErrorException ex) {
this.expireTime = Instant.ofEpochMilli(0);
HttpStatus statusCode = ex.getStatusCode();
String statusMessage = ex.getMessage();
log.error("Failed to acquire access token. Status code: {}, Error Message: {}",
statusCode, statusMessage);
try {
Thread.sleep((long) RETRY_ATTEMPT_SLEEP_TIME.get(retryCount) * SLEEP_TIME_MULTIPLIER);
} catch (InterruptedException e) {
throw new RuntimeException("Sleep in the retry attempt got interrupted", e);
}
}
retryCount++;
}
String errorMessage = String.format("Failed to acquire access token even after %s retry attempts", MAX_RETRIES);
log.error(errorMessage);
throw new UnauthorizedException(errorMessage);
}
}

public boolean isTokenExpired() {
return this.bearerToken == null || Instant.now().isAfter(this.expireTime);
protected boolean isTokenValid() {
Instant currentTime = Instant.now();
return this.bearerToken != null && currentTime.isBefore(this.expireTime);
}

/**
* Refreshes the bearer token by retrieving a new one from CrowdStrike.
*/
public void refreshToken() {

if (isTokenValid()) {
//There is still time to renew, or someone else must have already renewed it
return;
}
log.info("Renewing authentication token for CrowdStrike Connector.");
getAuthToken();
}
}
}
Loading
Loading