-
Notifications
You must be signed in to change notification settings - Fork 332
CrowdStrike client and coordinator implementation #5678
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,27 +3,94 @@ | |
| import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; | ||
| import org.opensearch.dataprepper.model.buffer.Buffer; | ||
| import org.opensearch.dataprepper.model.event.Event; | ||
| import org.opensearch.dataprepper.model.event.EventType; | ||
| import org.opensearch.dataprepper.model.event.JacksonEvent; | ||
| import org.opensearch.dataprepper.model.record.Record; | ||
| import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeIndicatorResult; | ||
| import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeThreatIntelApiResponse; | ||
| import org.opensearch.dataprepper.plugins.source.crowdstrike.models.ThreatIndicator; | ||
| import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient; | ||
| import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState; | ||
| import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.CrowdStrikeWorkerProgressState; | ||
| import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| import javax.inject.Named; | ||
| import java.time.Instant; | ||
| import java.util.Iterator; | ||
| import java.time.Duration; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.NEXT_PAGE; | ||
|
|
||
| /** | ||
| * This class represents a CrowdStrike client. | ||
| */ | ||
| @Named | ||
| public class CrowdStrikeClient implements CrawlerClient { | ||
| public class CrowdStrikeClient implements CrawlerClient<CrowdStrikeWorkerProgressState> { | ||
| CrowdStrikeService crowdStrikeService; | ||
| private static final Logger log = LoggerFactory.getLogger(CrowdStrikeClient.class); | ||
| private final CrowdStrikeSourceConfig configuration; | ||
| private final int bufferWriteTimeoutInSeconds = 10; | ||
|
|
||
|
|
||
| public CrowdStrikeClient(final CrowdStrikeService crowdStrikeService, | ||
| final CrowdStrikeSourceConfig sourceConfig) { | ||
| log.info("Creating CrowdStrike Crawler"); | ||
| this.crowdStrikeService = crowdStrikeService; | ||
| this.configuration = sourceConfig; | ||
| log.info("Created CrowdStrike Crawler"); | ||
| } | ||
| @Override | ||
| public Iterator<ItemInfo> listItems(Instant lastPollTime) { | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * Executes the data ingestion process for a given time-based partition of CrowdStrike threat intelligence data. | ||
| * This method fetches threat indicators from the CrowdStrike API for the specified time range defined in | ||
| * CrowdStrikeWorkerProgressState. It handles pagination using the `next-page` link header and continues | ||
| * fetching data until all pages are exhausted. Each batch of threat indicators is converted into | ||
| * JacksonEvent records and written to the provided Buffer | ||
| */ | ||
| @Override | ||
| public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) { | ||
| public void executePartition(CrowdStrikeWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make the parameters final |
||
| final Instant startTime = state.getStartTime(); | ||
| final Instant endTime = state.getEndTime(); | ||
| Optional<String> paginationLink = Optional.empty(); | ||
|
|
||
| do { | ||
| CrowdStrikeThreatIntelApiResponse response = crowdStrikeService.getThreatIndicators(startTime, endTime, paginationLink); | ||
| CrowdStrikeIndicatorResult result = response.getBody(); | ||
| List<ThreatIndicator> indicators = result.getResults(); | ||
| if (indicators == null || indicators.isEmpty()) { | ||
| log.info("No threat indicators found for the time window {} to {}", startTime, endTime); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if it's empty should we continue to next page at all? |
||
| } else { | ||
| writeIndicatorsToBuffer(indicators, buffer); | ||
| } | ||
| paginationLink = response.getFirstHeaderValue(NEXT_PAGE); | ||
| } while (paginationLink.isPresent()); | ||
|
|
||
| if (configuration.isAcknowledgments()) { | ||
| acknowledgementSet.complete(); | ||
| } | ||
| } | ||
|
|
||
| private void writeIndicatorsToBuffer(List<ThreatIndicator> indicators, Buffer<Record<Event>> buffer) { | ||
| List<Record<Event>> records = indicators.parallelStream() | ||
| .map(indicator -> (Event) JacksonEvent.builder() | ||
| .withEventType(EventType.DOCUMENT.toString()) | ||
| .withData(indicator) | ||
| .build()) | ||
| .map(Record::new) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| try { | ||
| buffer.writeAll(records, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis()); | ||
| } catch (Exception e) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its better to catch the expected exception rather than generalising it. This might mask the actual actual code error as service failure.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WriteAll method doesn't throw a specific exception, that's why I had to catch Exception. Here is the method signature. |
||
| log.error("Failed to write {} indicators to buffer", records.size(), e); | ||
| throw new RuntimeException("Buffer write failed for CrowdStrike indicators", e); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Convert this to a custom exception rather than throwing runtime exception
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can address it in follow-up PR, I see all the other clients also have same implementation I can check with @san81 and create a specific exception for all Saas crawlers. |
||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| public class CrowdStrikeSourceConfig implements CrawlerSourceConfig { | ||
|
|
||
| private static final int DEFAULT_NUMBER_OF_WORKERS = 5; | ||
| private static final int DEFAULT_NUMBER_OF_LOOK_BACK_DAYS = 0; | ||
|
|
||
| @JsonProperty("authentication") | ||
| @Valid | ||
|
|
@@ -29,4 +30,10 @@ public class CrowdStrikeSourceConfig implements CrawlerSourceConfig { | |
| @Valid | ||
| private int numberOfWorkers = DEFAULT_NUMBER_OF_WORKERS; | ||
|
|
||
| @JsonProperty("look_back_days") | ||
| @Min(0) | ||
| @Max(90) | ||
| @Valid | ||
| private int lookBackDays = DEFAULT_NUMBER_OF_LOOK_BACK_DAYS; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can this be final? |
||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import lombok.Data; | ||
| import java.time.Instant; | ||
|
|
||
| /** | ||
| * Represents a threat intelligence indicator from CrowdStrike's API. | ||
|
|
@@ -34,7 +35,7 @@ public class ThreatIndicator { | |
| * The epoch timestamp of the creation date of IOC. | ||
| */ | ||
| @JsonProperty("published_date") | ||
| private long publishedDate = 0L; | ||
| private Instant publishedDate; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can all these variables be final? |
||
|
|
||
| @JsonProperty("malicious_confidence") | ||
| private String maliciousConfidence = null; | ||
|
|
@@ -43,6 +44,6 @@ public class ThreatIndicator { | |
| * The epoch timestamp of last updated date of the IOC. | ||
| */ | ||
| @JsonProperty("last_updated") | ||
| private long lastUpdated = 0L; | ||
| private Instant lastUpdated; | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be configurable?