Skip to content

CrowdStrike client and coordinator implementation#5678

Merged
san81 merged 3 commits into
opensearch-project:mainfrom
nsgupta1:main
May 6, 2025
Merged

CrowdStrike client and coordinator implementation#5678
san81 merged 3 commits into
opensearch-project:mainfrom
nsgupta1:main

Conversation

@nsgupta1
Copy link
Copy Markdown
Contributor

@nsgupta1 nsgupta1 commented May 5, 2025

Description

  • CrowdStrike Client implementation
  • TimeSlice Crawler implementation
  • CrowdStrike specific leader and worker implementation

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • [Y] New functionality includes testing.
  • [N] New functionality has a documentation issue. Please link to it in this PR.
  • [ Y] New functionality has javadoc added
  • [Y] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: nsgupta1 <nsgupta1@users.noreply.github.com>
createPartitionForCrawling(leaderPartition, coordinator, latestModifiedTime);
long crawlTimeMillis = System.currentTimeMillis() - startTime;
log.debug("Crawling completed in {} ms", crawlTimeMillis);
crawlingTimer.record(crawlTimeMillis, TimeUnit.MILLISECONDS);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leader Crawling in this case doesn't involve any network calls, tracking this time may not be much helpful. May be tracking the number of partitions helpful here? Something to think about.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we are tracking the partitionCounter, I will remove Crawling time.

for(int i = remainingDays; i > 0; i--) {
Instant startDate = todayUtc.minus(Duration.ofDays(i));
createWorkerPartition(startDate, startDate.plus(Duration.ofDays(1)), coordinator);
updateLeaderProgressState(leaderPartition, i-1, leaderProgressState.getLastPollTime(), coordinator);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be updatingLeaderProgressState just once at the end of the loop is also fine I guess as there are not network calls and possibility of failures inside the loop are super minimal.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change this.

Copy link
Copy Markdown
Collaborator

@san81 san81 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just added 2 suggestions. Nice work 👍

@nsgupta1
Copy link
Copy Markdown
Contributor Author

nsgupta1 commented May 5, 2025

Just added 2 suggestions. Nice work 👍

Thank you for quick review

Signed-off-by: nsgupta1 <nsgupta1@users.noreply.github.com>
*/
@Override
public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
public void executePartition(CrowdStrikeWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the parameters final


try {
buffer.writeAll(records, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
} catch (Exception e) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its better to catch the expected exception rather than generalising it. This might mask the actual actual code error as service failure.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WriteAll method doesn't throw a specific exception, that's why I had to catch Exception. Here is the method signature. void writeAll(Collection<T> records, int timeoutInMillis) throws Exception;

buffer.writeAll(records, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
} catch (Exception e) {
log.error("Failed to write {} indicators to buffer", records.size(), e);
throw new RuntimeException("Buffer write failed for CrowdStrike indicators", e);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Convert this to a custom exception rather than throwing runtime exception

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can address it in follow-up PR, I see all the other clients also have same implementation I can check with @san81 and create a specific exception for all Saas crawlers.

.registerModule(new JavaTimeModule());
private final LeaderProgressState state;

public LeaderPartition() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not used anywhere?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it is not used anymore


@Named
public class PaginationCrawler implements Crawler {
public class PaginationCrawler implements Crawler<AtlassianWorkerProgressState> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this crawler specific to Atlassian/Jira? Why use a Atlassian specific state for base crawler/s?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes PaginationCrawler is specific to Atlassian and TimeSlice is specific to CrowdStrikeWorker.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we rename PaginationCrawler to AtlassianPaginationCrawler?

Signed-off-by: nsgupta1 <nsgupta1@users.noreply.github.com>
CrowdStrikeIndicatorResult result = response.getBody();
List<ThreatIndicator> indicators = result.getResults();
if (indicators == null || indicators.isEmpty()) {
log.info("No threat indicators found for the time window {} to {}", startTime, endTime);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's empty should we continue to next page at all?

@Min(0)
@Max(90)
@Valid
private int lookBackDays = DEFAULT_NUMBER_OF_LOOK_BACK_DAYS;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can this be final?

*/
@JsonProperty("published_date")
private long publishedDate = 0L;
private Instant publishedDate;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can all these variables be final?

@JsonSubTypes.Type(value = AtlassianLeaderProgressState.class),
@JsonSubTypes.Type(value = CrowdStrikeLeaderProgressState.class)
})
public interface LeaderProgressState {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it not store information about failures too?

plz add java docs on what is "pollTime"

}

/**
* Main crawling logic for CrowdStrike using time-based partitioning.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz add code comments/java docs for each method

double startCount = partitionsCreatedCounter.count();
createPartitionForCrawling(leaderPartition, coordinator, latestModifiedTime);
double partitionsInThisCrawl = partitionsCreatedCounter.count() - startCount;
log.info("Total partitions created in this crawl: {}", partitionsInThisCrawl);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz have log indicate. it is crowdstrikeClient's partition count

private Instant endTime;

@JsonProperty("marker")
private String marker;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add java doc on what is a marker

CrowdStrikeService crowdStrikeService;
private static final Logger log = LoggerFactory.getLogger(CrowdStrikeClient.class);
private final CrowdStrikeSourceConfig configuration;
private final int bufferWriteTimeoutInSeconds = 10;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be configurable?

@JsonProperty("remaining_days")
private int remainingDays;

public CrowdStrikeLeaderProgressState(@JsonProperty("last_poll_time") final Instant lastPollTime, @JsonProperty("remaining_days") int remainingDays) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: do we need the @JsonProperty annotation on both the constructor parameters and the member variables? Jackson should be able to deserialize with just a no-args constructor + setters on the member variables

@nsgupta1
Copy link
Copy Markdown
Contributor Author

nsgupta1 commented May 6, 2025

Thanks @engechas and @eirsep for reviews, since none of the comments are blockers I will address these in follow up PR.

@san81 san81 merged commit 75f7bc7 into opensearch-project:main May 6, 2025
45 of 47 checks passed
alparish pushed a commit to alparish/data-prepper that referenced this pull request May 22, 2025
…#5678)

* CrowdStrike client and coordinator implementation

Signed-off-by: nsgupta1 <nsgupta1@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants