Skip to content

Commit cf2f975

Browse files
committed
Addressing review comments
Signed-off-by: ngsupta1 <guptaneha.e@gmail.com>
1 parent bdcb41e commit cf2f975

9 files changed

Lines changed: 135 additions & 149 deletions

File tree

data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeService.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
import java.net.URI;
1414
import java.net.URLEncoder;
1515
import java.nio.charset.StandardCharsets;
16+
import java.time.Instant;
17+
import java.util.Optional;
18+
1619
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.BATCH_SIZE;
1720
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.FILTER_KEY;
1821
import static org.opensearch.dataprepper.plugins.source.crowdstrike.utils.Constants.LAST_UPDATED;
@@ -44,35 +47,30 @@ public CrowdStrikeService(CrowdStrikeRestClient crowdStrikeRestClient, PluginMet
4447
* @param paginationLink An optional pagination URL suffix (used when fetching next pages).
4548
* @return A {@link CrowdStrikeApiResponse} containing response body and headers.
4649
*/
47-
public CrowdStrikeApiResponse getAllContent(Long startTime, Long endTime, String paginationLink) {
50+
public CrowdStrikeApiResponse getThreatIndicators(Instant startTime, Instant endTime, Optional<String> paginationLink) {
51+
if (startTime == null || endTime == null) {
52+
throw new IllegalArgumentException("startTime and endTime must not be null");
53+
}
4854
URI uri = buildCrowdStrikeUri(startTime, endTime, paginationLink);
49-
5055
return searchCallLatencyTimer.record(() -> {
51-
try {
52-
log.debug("Calling CrowdStrike API with URI: {}", uri);
53-
ResponseEntity<CrowdStrikeIndicatorResult> responseEntity = crowdStrikeRestClient.invokeGetApi(uri, CrowdStrikeIndicatorResult.class);
5456

55-
CrowdStrikeApiResponse response = new CrowdStrikeApiResponse();
56-
response.setBody(responseEntity.getBody());
57-
response.setHeaders(responseEntity.getHeaders());
58-
return response;
59-
} catch (Exception e) {
60-
log.error("Error fetching CrowdStrike content from URI: {}", uri, e);
61-
throw new RuntimeException("CrowdStrike API call failed", e);
62-
}
57+
log.debug("Calling CrowdStrike API with URI: {}", uri);
58+
ResponseEntity<CrowdStrikeIndicatorResult> responseEntity = crowdStrikeRestClient.invokeGetApi(uri, CrowdStrikeIndicatorResult.class);
59+
60+
return new CrowdStrikeApiResponse(responseEntity.getBody(), responseEntity.getHeaders());
6361
});
6462
}
6563

66-
protected URI buildCrowdStrikeUri(Long startTime, Long endTime, String paginationLink) {
64+
protected URI buildCrowdStrikeUri(Instant startTime, Instant endTime, Optional<String> paginationLink) {
6765
try {
68-
if (paginationLink != null) {
69-
String urlString = BASE_URL + paginationLink;
66+
if (paginationLink.isPresent()) {
67+
String urlString = BASE_URL + paginationLink.get();
7068
urlString = CrowdStrikeNextLinkValidator.validateAndSanitizeURL(urlString);
7169
return new URI(urlString);
7270
} else {
7371
// Manually construct and encode the query string
74-
String filter1 = URLEncoder.encode(LAST_UPDATED + ":>=" + startTime, StandardCharsets.UTF_8);
75-
String filter2 = URLEncoder.encode(LAST_UPDATED + ":<" + endTime, StandardCharsets.UTF_8);
72+
String filter1 = URLEncoder.encode(LAST_UPDATED + ":>=" + startTime.getEpochSecond(), StandardCharsets.UTF_8);
73+
String filter2 = URLEncoder.encode(LAST_UPDATED + ":<" + endTime.getEpochSecond(), StandardCharsets.UTF_8);
7674
String encodedFilter = filter1 + "%2B" + filter2; // Use literal '+' // ensure literal '+'
7775

7876
UriComponentsBuilder builder = UriComponentsBuilder
@@ -86,4 +84,4 @@ protected URI buildCrowdStrikeUri(Long startTime, Long endTime, String paginatio
8684
throw new RuntimeException("Failed to construct CrowdStrike request URI", e);
8785
}
8886
}
89-
}
87+
}

data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeApiResponse.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ public class CrowdStrikeApiResponse {
1616
private CrowdStrikeIndicatorResult body;
1717
private Map<String, List<String>> headers;
1818

19+
public CrowdStrikeApiResponse(CrowdStrikeIndicatorResult body, Map<String, List<String>> headers) {
20+
this.body = body;
21+
this.headers = headers;
22+
}
23+
1924
// Convenience method to get a specific header
2025
public List<String> getHeader(String headerName) {
2126
return headers.getOrDefault(headerName, Collections.emptyList());

data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/CrowdStrikeIndicatorResult.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,20 @@
33
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
44
import com.fasterxml.jackson.annotation.JsonProperty;
55
import lombok.Getter;
6+
import lombok.Setter;
7+
68
import java.util.List;
79

810
/**
911
* The result of Falcon query search.
1012
*/
1113
@Getter
14+
@Setter
1215
@JsonIgnoreProperties(ignoreUnknown = true)
1316
public class CrowdStrikeIndicatorResult {
1417

1518
@JsonProperty("resources")
1619
private List<ThreatIndicator> results = null;
1720

21+
1822
}

data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/models/ThreatIndicator.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,6 @@
99
* Represents a threat intelligence indicator from CrowdStrike's API.
1010
* This class encapsulates information about potential security threats,
1111
* including indicators of compromise (IoCs) and associated metadata.
12-
*
13-
* <p>A threat indicator may include various attributes such as:
14-
* <ul>
15-
* <li>Type of the indicator (IP, Domain, Hash, etc.)</li>
16-
* <li>Value of the indicator</li>
17-
* <li>Confidence score</li>
18-
* <li>Associated timestamps</li>
19-
* </ul>
20-
* </p>
2112
*/
2213
@Setter
2314
@Getter

data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/main/java/org/opensearch/dataprepper/plugins/source/crowdstrike/rest/CrowdStrikeAuthClient.java

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -67,37 +67,43 @@ public void initCredentials() {
6767
* @throws UnauthorizedException Runtime exception if the token cannot be retrieved.
6868
*/
6969
protected void getAuthToken() {
70-
log.info(NOISY, "You are trying to access token");
71-
HttpHeaders headers = new HttpHeaders();
72-
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
73-
headers.setBasicAuth(this.clientId, this.clientSecret);
74-
HttpEntity<String> request = new HttpEntity<>(headers);
75-
int retryCount = 0;
76-
while(retryCount < MAX_RETRIES) {
77-
try {
78-
ResponseEntity<Map> response = restTemplate.postForEntity(OAUTH_TOKEN_URL, request, Map.class);
79-
Map tokenData = response.getBody();
80-
this.bearerToken = (String) tokenData.get(ACCESS_TOKEN);
81-
this.expireTime = Instant.now().plusSeconds((Integer) tokenData.get(EXPIRE_IN));
82-
log.info("Access token acquired successfully");
70+
synchronized (tokenRenewLock) {
71+
if (isTokenValid()) {
72+
//Someone else must have already renewed it
8373
return;
84-
} catch (HttpClientErrorException ex) {
85-
this.expireTime = Instant.ofEpochMilli(0);
86-
HttpStatus statusCode = ex.getStatusCode();
87-
String statusMessage = ex.getMessage();
88-
log.error("Failed to acquire access token. Status code: {}, Error Message: {}",
89-
statusCode, statusMessage);
74+
}
75+
log.info(NOISY, "You are trying to access token");
76+
HttpHeaders headers = new HttpHeaders();
77+
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
78+
headers.setBasicAuth(this.clientId, this.clientSecret);
79+
HttpEntity<String> request = new HttpEntity<>(headers);
80+
int retryCount = 0;
81+
while (retryCount < MAX_RETRIES) {
9082
try {
91-
Thread.sleep((long) RETRY_ATTEMPT_SLEEP_TIME.get(retryCount) * SLEEP_TIME_MULTIPLIER);
92-
} catch (InterruptedException e) {
93-
throw new RuntimeException("Sleep in the retry attempt got interrupted", e);
83+
ResponseEntity<Map> response = restTemplate.postForEntity(OAUTH_TOKEN_URL, request, Map.class);
84+
Map tokenData = response.getBody();
85+
this.bearerToken = (String) tokenData.get(ACCESS_TOKEN);
86+
this.expireTime = Instant.now().plusSeconds((Integer) tokenData.get(EXPIRE_IN));
87+
log.info("Access token acquired successfully");
88+
return;
89+
} catch (HttpClientErrorException ex) {
90+
this.expireTime = Instant.ofEpochMilli(0);
91+
HttpStatus statusCode = ex.getStatusCode();
92+
String statusMessage = ex.getMessage();
93+
log.error("Failed to acquire access token. Status code: {}, Error Message: {}",
94+
statusCode, statusMessage);
95+
try {
96+
Thread.sleep((long) RETRY_ATTEMPT_SLEEP_TIME.get(retryCount) * SLEEP_TIME_MULTIPLIER);
97+
} catch (InterruptedException e) {
98+
throw new RuntimeException("Sleep in the retry attempt got interrupted", e);
99+
}
94100
}
101+
retryCount++;
95102
}
96-
retryCount++;
103+
String errorMessage = String.format("Failed to acquire access token even after %s retry attempts", MAX_RETRIES);
104+
log.error(errorMessage);
105+
throw new UnauthorizedException(errorMessage);
97106
}
98-
String errorMessage = String.format("Failed to acquire access token even after %s retry attempts", MAX_RETRIES);
99-
log.error(errorMessage);
100-
throw new UnauthorizedException(errorMessage);
101107
}
102108

103109
protected boolean isTokenValid() {
@@ -113,13 +119,7 @@ public void refreshToken() {
113119
//There is still time to renew, or someone else must have already renewed it
114120
return;
115121
}
116-
synchronized (tokenRenewLock) {
117-
if (isTokenValid()) {
118-
//Someone else must have already renewed it
119-
return;
120-
}
121-
log.info("Renewing authentication token for CrowdStrike Connector.");
122-
getAuthToken();
123-
}
122+
log.info("Renewing authentication token for CrowdStrike Connector.");
123+
getAuthToken();
124124
}
125125
}

data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeNextLinkValidatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public class CrowdStrikeNextLinkValidatorTest {
1111

1212
@Test
1313
void testValidEncodedCrowdStrikeUrlPreserved() throws MalformedURLException {
14-
String url = "https://api.crowdstrike.com//intel/combined/indicators/v1" +
14+
String url = "https://api.crowdstrike.com/intel/combined/indicators/v1" +
1515
"?filter=last_updated%3A%3E%3D1745519529%2Blast_updated%3A%3C1745523129%2B_marker%3A%3C%2717455225567d09efadf14547a1aee2bc25cabc525e%27" +
1616
"&limit=10000";
1717

data-prepper-plugins/saas-source-plugins/crowdstrike-source/src/test/java/org/opensearch/dataprepper/plugins/source/crowdstrike/CrowdStrikeServiceTest.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,16 @@
1111
import org.springframework.http.HttpHeaders;
1212
import org.springframework.http.ResponseEntity;
1313
import java.net.URI;
14+
import java.time.Duration;
15+
import java.time.Instant;
16+
import java.util.Optional;
1417
import java.util.function.Supplier;
1518
import static org.junit.jupiter.api.Assertions.assertEquals;
1619
import static org.junit.jupiter.api.Assertions.assertNotNull;
1720
import static org.junit.jupiter.api.Assertions.assertThrows;
1821
import static org.junit.jupiter.api.Assertions.assertTrue;
1922
import static org.mockito.ArgumentMatchers.any;
2023
import static org.mockito.ArgumentMatchers.eq;
21-
import static org.mockito.Mockito.lenient;
2224
import static org.mockito.Mockito.mock;
2325
import static org.mockito.Mockito.times;
2426
import static org.mockito.Mockito.verify;
@@ -32,24 +34,29 @@ class CrowdStrikeServiceTest {
3234
private CrowdStrikeRestClient restClient;
3335
private PluginMetrics pluginMetrics;
3436
private CrowdStrikeService service;
37+
Instant startTime = Instant.now().minus(Duration.ofHours(1));
38+
Instant endTime = Instant.now();
39+
Timer mockTimer;
3540

3641
@BeforeEach
3742
void setup() {
3843
restClient = mock(CrowdStrikeRestClient.class);
3944
pluginMetrics = mock(PluginMetrics.class);
40-
Timer mockTimer = mock(Timer.class);
45+
mockTimer = mock(Timer.class);
46+
4147
when(pluginMetrics.timer(any())).thenReturn(mockTimer);
42-
// Timer records calls synchronously for test
43-
lenient().when(mockTimer.record(Mockito.<Supplier<Object>>any()))
48+
49+
service = new CrowdStrikeService(restClient, pluginMetrics);
50+
51+
when(mockTimer.record(Mockito.<Supplier<Object>>any()))
4452
.thenAnswer(invocation -> {
4553
Supplier<Object> supplier = invocation.getArgument(0);
4654
return supplier.get();
4755
});
48-
service = new CrowdStrikeService(restClient, pluginMetrics);
4956
}
5057

5158
@Test
52-
void testGetAllContentWithValidTimeRange() {
59+
void testGetThreatIndicatorsWithValidTimeRange() {
5360
CrowdStrikeIndicatorResult result = new CrowdStrikeIndicatorResult();
5461
ResponseEntity<CrowdStrikeIndicatorResult> responseEntity = ResponseEntity.ok()
5562
.headers(new HttpHeaders())
@@ -58,15 +65,15 @@ void testGetAllContentWithValidTimeRange() {
5865
when(restClient.invokeGetApi(any(), eq(CrowdStrikeIndicatorResult.class)))
5966
.thenReturn(responseEntity);
6067

61-
CrowdStrikeApiResponse response = service.getAllContent(1745000000L, 1745003600L, null);
68+
CrowdStrikeApiResponse response = service.getThreatIndicators(startTime, endTime, Optional.empty());
6269

6370
assertNotNull(response.getBody());
6471
assertNotNull(response.getHeaders());
6572
verify(restClient, times(1)).invokeGetApi(any(), eq(CrowdStrikeIndicatorResult.class));
6673
}
6774

6875
@Test
69-
void testGetAllContentWithPaginationLink() throws Exception {
76+
void testGetThreatIndicatorsWithPaginationLink() throws Exception {
7077
String paginationLink = "intel/combined/indicators/v1?filter=last_updated%3A%3E%3D1745%2B_marker%3A%3C%27123%27";
7178
URI sanitizedUri = new URI("https://api.crowdstrike.com/" + paginationLink);
7279
CrowdStrikeIndicatorResult result = new CrowdStrikeIndicatorResult();
@@ -78,7 +85,7 @@ void testGetAllContentWithPaginationLink() throws Exception {
7885
when(restClient.invokeGetApi(eq(sanitizedUri), eq(CrowdStrikeIndicatorResult.class)))
7986
.thenReturn(responseEntity);
8087

81-
CrowdStrikeApiResponse response = service.getAllContent(null, null, paginationLink);
88+
CrowdStrikeApiResponse response = service.getThreatIndicators(startTime, endTime, Optional.of(paginationLink));
8289
assertNotNull(response.getBody());
8390
verify(restClient).invokeGetApi(eq(sanitizedUri), eq(CrowdStrikeIndicatorResult.class));
8491
}
@@ -89,24 +96,39 @@ void testRestClientThrowsException() {
8996
.thenThrow(new RuntimeException("API failure"));
9097

9198
RuntimeException ex = assertThrows(RuntimeException.class, () ->
92-
service.getAllContent(1745000000L, 1745003600L, null));
99+
service.getThreatIndicators(startTime, endTime, Optional.empty()));
93100

94-
assertTrue(ex.getMessage().contains("CrowdStrike API call failed"));
101+
assertTrue(ex.getMessage().contains("API failure"));
95102
}
96103

97104
@Test
98105
void testBuildUriFailsGracefully() {
99106
CrowdStrikeService faultyService = new CrowdStrikeService(restClient, pluginMetrics) {
100107
@Override
101-
protected URI buildCrowdStrikeUri(Long startTime, Long endTime, String paginationLink) {
108+
protected URI buildCrowdStrikeUri(Instant startTime, Instant endTime, Optional<String> paginationLink) {
102109
throw new RuntimeException("URI construction failed");
103110
}
104111
};
105112

106113
RuntimeException ex = assertThrows(RuntimeException.class, () ->
107-
faultyService.getAllContent(1745000000L, 1745003600L, null));
114+
faultyService.getThreatIndicators(startTime, endTime, Optional.empty()));
108115

109116
assertEquals("URI construction failed", ex.getMessage());
110117
}
111118

112-
}
119+
@Test
120+
void testSearchCallLatencyTimerIsRecorded() {
121+
CrowdStrikeIndicatorResult result = new CrowdStrikeIndicatorResult();
122+
ResponseEntity<CrowdStrikeIndicatorResult> responseEntity = ResponseEntity.ok()
123+
.headers(new HttpHeaders())
124+
.body(result);
125+
126+
when(restClient.invokeGetApi(any(), eq(CrowdStrikeIndicatorResult.class)))
127+
.thenReturn(responseEntity);
128+
129+
service.getThreatIndicators(startTime, endTime, Optional.empty());
130+
131+
verify(mockTimer, times(1)).record(any(Supplier.class));
132+
}
133+
134+
}

0 commit comments

Comments
 (0)