Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
plugins {
id 'java'
}

dependencies {

implementation project(path: ':data-prepper-plugins:saas-source-plugins:source-crawler')
implementation project(path: ':data-prepper-api')
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:common')

implementation libs.commons.io
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'javax.inject:javax.inject:1'


implementation 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'

testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4'
testImplementation project(path: ':data-prepper-test-common')
testImplementation 'ch.qos.logback:logback-classic:1.4.12'
testImplementation 'org.slf4j:slf4j-api:2.0.7'
implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation(libs.spring.web)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import javax.inject.Named;
import java.time.Instant;
import java.util.Iterator;

/**
* This class represents a CrowdStrike client.
*/
@Named
public class CrowdStrikeClient implements CrawlerClient {

@Override
public Iterator<ItemInfo> listItems(Instant lastPollTime) {
return null;
}

@Override
public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike;


import com.google.common.annotations.Beta;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.source.crowdstrike.rest.CrowdStrikeAuthClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.CrawlerApplicationContextMarker;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourcePlugin;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.PLUGIN_NAME;


/**
* CrowdStrike connector entry point.
* 🚧 Work in progress — under active development.
* Not ready for production use.
*/
@Beta
@DataPrepperPlugin(name = PLUGIN_NAME,
pluginType = Source.class,
pluginConfigurationType = CrowdStrikeSourceConfig.class,
packagesToScan = {CrawlerApplicationContextMarker.class, CrowdStrikeSource.class}
)
public class CrowdStrikeSource extends CrawlerSourcePlugin {

private static final Logger log = LoggerFactory.getLogger(CrowdStrikeSource.class);
private final CrowdStrikeSourceConfig sourceConfig;
private final CrowdStrikeAuthClient authClient;

@DataPrepperPluginConstructor
public CrowdStrikeSource(final CrowdStrikeSourceConfig sourceConfig,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final CrowdStrikeAuthClient authClient,
Crawler crawler, PluginExecutorServiceProvider executorServiceProvider) {
super(PLUGIN_NAME, pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider);
log.info("Creating CrowdStrike Source Plugin");
this.sourceConfig = sourceConfig;
this.authClient = authClient;
}

@Override
public void start(Buffer<Record<Event>> buffer) {
log.info("Starting CrowdStrike Source Plugin...");
authClient.initCredentials();
// super.start(buffer);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll uncomment this code in follow up PR when CrowdStrikeService and Client classes are implemented. cc: @san81 @engechas

}

@Override
public void stop() {
log.info("Stopping CrowdStrike Source Plugin...");
super.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import lombok.Getter;
import org.opensearch.dataprepper.plugins.source.crowdstrike.configuration.AuthenticationConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;

/**
* Configuration class for the CrowdStrike source plugin.
*/
@Getter
public class CrowdStrikeSourceConfig implements CrawlerSourceConfig {

private static final int DEFAULT_NUMBER_OF_WORKERS = 5;

@JsonProperty("authentication")
@Valid
protected AuthenticationConfig authenticationConfig;

@JsonProperty("acknowledgments")
private boolean acknowledgments = false;

@JsonProperty("workers")
@Min(1)
@Max(50)
@Valid
private int numWorkers = DEFAULT_NUMBER_OF_WORKERS;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike.configuration;

import jakarta.validation.constraints.AssertTrue;
import lombok.Getter;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Configuration class for authentication with the CrowdStrike API.
*/
@Getter
public class AuthenticationConfig {

@JsonProperty("client_id")
private String clientId;

@JsonProperty("client_secret")
private String clientSecret;

@AssertTrue(message = "Client Id and Client Secret are both required for Authentication")
public boolean isValidConfig() {
return clientId != null && clientSecret != null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike.rest;

import lombok.Getter;
import org.opensearch.dataprepper.plugins.source.crowdstrike.CrowdStrikeSourceConfig;
import org.opensearch.dataprepper.plugins.source.crowdstrike.configuration.AuthenticationConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;
import java.time.Instant;
import java.util.Map;
import javax.inject.Named;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;


/**
* Client to manage authentication with the CrowdStrike API.
* Responsible for acquiring and refreshing Bearer tokens to access
* CrowdStrike services.
*/
@Named
public class CrowdStrikeAuthClient {

@Getter
private String bearerToken;
@Getter
private Instant expireTime;
private final String clientId;
private final String clientSecret;
RestTemplate restTemplate = new RestTemplate();
private static final Logger log = LoggerFactory.getLogger(CrowdStrikeAuthClient.class);
private static final String OAUTH_TOKEN_URL = "https://api.crowdstrike.com/oauth2/token";
private static final String ACCESS_TOKEN = "access_token";
private static final String EXPIRE_IN = "expires_in";



public CrowdStrikeAuthClient(final CrowdStrikeSourceConfig sourceConfig) {
AuthenticationConfig authConfig = sourceConfig.getAuthenticationConfig();
this.clientId = authConfig.getClientId();
this.clientSecret = authConfig.getClientSecret();
}


/**
* Initializes the credentials by obtaining an authentication token.
*/
public void initCredentials() {
log.info("Getting CrowdStrike Authentication Token");
getAuthToken();
}

/**
* Retrieves a new authentication token from the CrowdStrike API.
* The token is stored in the {@code bearerToken} field, and its expiration time is updated.
*
* @throws RuntimeException if the token cannot be retrieved.
*/
private void getAuthToken() {
log.info(NOISY, "You are trying to access token");

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When token expired, you probably don't want every worker to renew it. Think about handling that when you wire this method into the renewal flow

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack !! I will include refresh mechanism in next PR.

HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
headers.setBasicAuth(this.clientId, this.clientSecret);
HttpEntity<String> request = new HttpEntity<>(headers);
try {
ResponseEntity<Map> response = restTemplate.postForEntity(OAUTH_TOKEN_URL, request, Map.class);
Map tokenData = response.getBody();
this.bearerToken = (String) tokenData.get(ACCESS_TOKEN);
this.expireTime = Instant.now().plusSeconds((Integer) tokenData.get(EXPIRE_IN));
log.info("Access token acquired successfully");
} catch (HttpClientErrorException ex) {
this.expireTime = Instant.ofEpochMilli(0);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think about the retry strategy here in your next pr

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack !! I will include retry mechanism in next PR.

HttpStatus statusCode = ex.getStatusCode();
log.error("Failed to acquire access token. Status code: {}, Error Message: {}",
statusCode, ex.getMessage());
throw new RuntimeException("Error while requesting token:" + ex.getMessage(), ex);
}
}

public boolean isTokenExpired() {
return this.bearerToken == null || Instant.now().isAfter(this.expireTime);
}

/**
* Refreshes the bearer token by retrieving a new one from CrowdStrike.
*/
public void refreshToken() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike.utils;

/**
* The type Constants.
*/
public class Constants {

public static final String PLUGIN_NAME = "crowdstrike";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike;

import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class CrowdStrikeClientTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.opensearch.dataprepper.plugins.source.crowdstrike;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;


public class CrowdStrikeSourceConfigTest {

private static final ObjectMapper objectMapper = new ObjectMapper();
private CrowdStrikeSourceConfig config;

private static final int DEFAULT_WORKERS = 5;

@BeforeEach
void setup() {
config = new CrowdStrikeSourceConfig();
}

@Test
void testDefaultValues() {
assertEquals(DEFAULT_WORKERS, config.getNumWorkers());
assertFalse(config.isAcknowledgments());
}

@Test
void testDeserializationWithValues() throws Exception {
Map<String, Object> yamlConfig = new HashMap<>();
Map<String, Object> authMap = new HashMap<>();
authMap.put("client_id", "dummy-client-id");
authMap.put("client_secret", "dummy-client-secret");
yamlConfig.put("authentication", authMap);
yamlConfig.put("acknowledgments", true);
yamlConfig.put("workers", 10);

String json = objectMapper.writeValueAsString(yamlConfig);
CrowdStrikeSourceConfig loadedConfig = objectMapper.readValue(json, CrowdStrikeSourceConfig.class);

assertNotNull(loadedConfig.getAuthenticationConfig());
assertTrue(loadedConfig.isAcknowledgments());
assertEquals(10, loadedConfig.getNumWorkers());
}

}
Loading