Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.AtlassianWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,7 +49,7 @@
* This class represents a Confluence client.
*/
@Named
public class ConfluenceClient implements CrawlerClient {
public class ConfluenceClient implements CrawlerClient<AtlassianWorkerProgressState> {

private static final Logger log = LoggerFactory.getLogger(ConfluenceClient.class);
private ObjectMapper objectMapper = new ObjectMapper();
Expand Down Expand Up @@ -81,7 +81,7 @@ public void injectObjectMapper(ObjectMapper objectMapper) {
}

@Override
public void executePartition(SaasWorkerProgressState state,
public void executePartition(AtlassianWorkerProgressState state,
Buffer<Record<Event>> buffer,
AcknowledgementSet acknowledgementSet) {
log.trace("Executing the partition: {} with {} ticket(s)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.opensearch.dataprepper.plugins.source.confluence.utils.ConfluenceConfigHelper;
import org.opensearch.dataprepper.plugins.source.source_crawler.CrawlerApplicationContextMarker;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourcePlugin;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.LeaderProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PaginationCrawler;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.AtlassianLeaderProgressState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.PLUGIN_NAME;


Expand Down Expand Up @@ -70,6 +72,11 @@ public void start(Buffer<Record<Event>> buffer) {
super.start(buffer);
}

@Override
protected LeaderProgressState createLeaderProgressState() {
return new AtlassianLeaderProgressState(Instant.EPOCH);
}

@Override
public void stop() {
log.info("Stopping Confluence Source Plugin");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;

import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.AtlassianWorkerProgressState;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -49,7 +48,7 @@ public class ConfluenceClientTest {
@Mock
private Buffer<Record<Event>> buffer;
@Mock
private SaasWorkerProgressState saasWorkerProgressState;
private AtlassianWorkerProgressState saasWorkerProgressState;
@Mock
private AcknowledgementSet acknowledgementSet;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'javax.inject:javax.inject:1'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'


implementation 'org.projectlombok:lombok:1.18.30'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,94 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeIndicatorResult;
import org.opensearch.dataprepper.plugins.source.crowdstrike.models.CrowdStrikeThreatIntelApiResponse;
import org.opensearch.dataprepper.plugins.source.crowdstrike.models.ThreatIndicator;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.CrowdStrikeWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Named;
import java.time.Instant;
import java.util.Iterator;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.NEXT_PAGE;

/**
* This class represents a CrowdStrike client.
*/
@Named
public class CrowdStrikeClient implements CrawlerClient {
public class CrowdStrikeClient implements CrawlerClient<CrowdStrikeWorkerProgressState> {
CrowdStrikeService crowdStrikeService;
private static final Logger log = LoggerFactory.getLogger(CrowdStrikeClient.class);
private final CrowdStrikeSourceConfig configuration;
private final int bufferWriteTimeoutInSeconds = 10;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be configurable?



public CrowdStrikeClient(final CrowdStrikeService crowdStrikeService,
final CrowdStrikeSourceConfig sourceConfig) {
log.info("Creating CrowdStrike Crawler");
this.crowdStrikeService = crowdStrikeService;
this.configuration = sourceConfig;
log.info("Created CrowdStrike Crawler");
}
@Override
public Iterator<ItemInfo> listItems(Instant lastPollTime) {
return null;
}

/**
* Executes the data ingestion process for a given time-based partition of CrowdStrike threat intelligence data.
* This method fetches threat indicators from the CrowdStrike API for the specified time range defined in
* CrowdStrikeWorkerProgressState. It handles pagination using the `next-page` link header and continues
* fetching data until all pages are exhausted. Each batch of threat indicators is converted into
* JacksonEvent records and written to the provided Buffer
*/
@Override
public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
public void executePartition(CrowdStrikeWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the parameters final

final Instant startTime = state.getStartTime();
final Instant endTime = state.getEndTime();
Optional<String> paginationLink = Optional.empty();

do {
CrowdStrikeThreatIntelApiResponse response = crowdStrikeService.getThreatIndicators(startTime, endTime, paginationLink);
CrowdStrikeIndicatorResult result = response.getBody();
List<ThreatIndicator> indicators = result.getResults();
if (indicators == null || indicators.isEmpty()) {
log.info("No threat indicators found for the time window {} to {}", startTime, endTime);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's empty should we continue to next page at all?

} else {
writeIndicatorsToBuffer(indicators, buffer);
}
paginationLink = response.getFirstHeaderValue(NEXT_PAGE);
} while (paginationLink.isPresent());

if (configuration.isAcknowledgments()) {
acknowledgementSet.complete();
}
}

private void writeIndicatorsToBuffer(List<ThreatIndicator> indicators, Buffer<Record<Event>> buffer) {
List<Record<Event>> records = indicators.parallelStream()
.map(indicator -> (Event) JacksonEvent.builder()
.withEventType(EventType.DOCUMENT.toString())
.withData(indicator)
.build())
.map(Record::new)
.collect(Collectors.toList());

try {
buffer.writeAll(records, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
} catch (Exception e) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its better to catch the expected exception rather than generalising it. This might mask the actual actual code error as service failure.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WriteAll method doesn't throw a specific exception, that's why I had to catch Exception. Here is the method signature. void writeAll(Collection<T> records, int timeoutInMillis) throws Exception;

log.error("Failed to write {} indicators to buffer", records.size(), e);
throw new RuntimeException("Buffer write failed for CrowdStrike indicators", e);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Convert this to a custom exception rather than throwing runtime exception

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can address it in follow-up PR, I see all the other clients also have same implementation I can check with @san81 and create a specific exception for all Saas crawlers.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.source.crowdstrike.rest.CrowdStrikeAuthClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.CrawlerApplicationContextMarker;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourcePlugin;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.LeaderProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.TimeSliceCrawler;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.CrowdStrikeLeaderProgressState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.PLUGIN_NAME;


Expand All @@ -45,7 +47,7 @@ public CrowdStrikeSource(final CrowdStrikeSourceConfig sourceConfig,
final PluginFactory pluginFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final CrowdStrikeAuthClient authClient,
Crawler crawler, PluginExecutorServiceProvider executorServiceProvider) {
TimeSliceCrawler crawler, PluginExecutorServiceProvider executorServiceProvider) {
super(PLUGIN_NAME, pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider);
log.info("Creating CrowdStrike Source Plugin");
this.sourceConfig = sourceConfig;
Expand All @@ -56,9 +58,15 @@ public CrowdStrikeSource(final CrowdStrikeSourceConfig sourceConfig,
public void start(Buffer<Record<Event>> buffer) {
log.info("Starting CrowdStrike Source Plugin...");
authClient.initCredentials();
// super.start(buffer);
super.start(buffer);
}

@Override
protected LeaderProgressState createLeaderProgressState() {
return new CrowdStrikeLeaderProgressState(Instant.now(), sourceConfig.getLookBackDays());
}


@Override
public void stop() {
log.info("Stopping CrowdStrike Source Plugin...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
public class CrowdStrikeSourceConfig implements CrawlerSourceConfig {

private static final int DEFAULT_NUMBER_OF_WORKERS = 5;
private static final int DEFAULT_NUMBER_OF_LOOK_BACK_DAYS = 0;

@JsonProperty("authentication")
@Valid
Expand All @@ -29,4 +30,10 @@ public class CrowdStrikeSourceConfig implements CrawlerSourceConfig {
@Valid
private int numberOfWorkers = DEFAULT_NUMBER_OF_WORKERS;

@JsonProperty("look_back_days")
@Min(0)
@Max(90)
@Valid
private int lookBackDays = DEFAULT_NUMBER_OF_LOOK_BACK_DAYS;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can this be final?


}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Represents the response returned from a CrowdStrike API call.
Expand All @@ -26,8 +27,8 @@ public List<String> getHeader(String headerName) {
}

// Convenience method to get the first value of a specific header
public String getFirstHeaderValue(String headerName) {
public Optional<String> getFirstHeaderValue(String headerName) {
List<String> values = getHeader(headerName);
return CollectionUtils.isEmpty(values) ? null : values.get(0);
return CollectionUtils.isEmpty(values) ? Optional.empty(): Optional.of(values.get(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.time.Instant;

/**
* Represents a threat intelligence indicator from CrowdStrike's API.
Expand Down Expand Up @@ -34,7 +35,7 @@ public class ThreatIndicator {
* The epoch timestamp of the creation date of IOC.
*/
@JsonProperty("published_date")
private long publishedDate = 0L;
private Instant publishedDate;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can all these variables be final?


@JsonProperty("malicious_confidence")
private String maliciousConfidence = null;
Expand All @@ -43,6 +44,6 @@ public class ThreatIndicator {
* The epoch timestamp of last updated date of the IOC.
*/
@JsonProperty("last_updated")
private long lastUpdated = 0L;
private Instant lastUpdated;

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ public class Constants {
public static final String LAST_UPDATED = "last_updated";
public static final String MARKER_KEY = "_marker";
public static final int BATCH_SIZE = 10000;
public static final String NEXT_PAGE = "Next-Page";
}
Loading