Skip to content

Commit 047aaa8

Browse files
committed
M365 Crawler Metric, Buffer, and Unit Test Updates
Signed-off-by: Alex Christensen <alchrisk@amazon.com>
1 parent bb2dfa1 commit 047aaa8

7 files changed

Lines changed: 280 additions & 23 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929
annotationProcessor 'org.projectlombok:lombok:1.18.30'
3030

3131
testImplementation project(':data-prepper-test:test-common')
32+
testRuntimeOnly 'org.slf4j:slf4j-simple:2.0.7'
3233

3334
implementation(libs.spring.context) {
3435
exclude group: 'commons-logging', module: 'commons-logging'

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,9 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
106106

107107
try {
108108
String nextPageUri = null;
109-
List<Record<Event>> records = new ArrayList<>();
110109

111110
do {
111+
List<Record<Event>> records = new ArrayList<>();
112112
AuditLogsResponse response =
113113
service.searchAuditLogs(logType, startTime, endTime, nextPageUri);
114114

@@ -138,18 +138,24 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
138138
}
139139
}
140140

141+
// Write Records to the buffer after processing a page of data if there are records to write.
142+
if(!records.isEmpty()) {
143+
bufferWriteLatencyTimer.record(() -> {
144+
try {
145+
writeRecordsWithRetry(records, buffer, acknowledgementSet);
146+
} catch (Exception e) {
147+
bufferWriteFailuresCounter.increment();
148+
throw e;
149+
}
150+
});
151+
}
152+
141153
nextPageUri = response.getNextPageUri();
142154
} while (nextPageUri != null);
143155

144-
bufferWriteLatencyTimer.record(() -> {
145-
try {
146-
writeRecordsWithRetry(records, buffer, acknowledgementSet);
147-
} catch (Exception e) {
148-
bufferWriteFailuresCounter.increment();
149-
throw e;
150-
}
151-
});
152-
156+
if (configuration.isAcknowledgments()) {
157+
acknowledgementSet.complete();
158+
}
153159
} catch (Exception e) {
154160
log.error(NOISY, "Failed to process partition for log type {} from {} to {}",
155161
logType, startTime, endTime, e);
@@ -212,7 +218,6 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
212218
if (configuration.isAcknowledgments()) {
213219
records.forEach(record -> acknowledgementSet.add(record.getData()));
214220
buffer.writeAll(records, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis());
215-
acknowledgementSet.complete();
216221
} else {
217222
buffer.writeAll(records, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis());
218223
}
@@ -250,4 +255,4 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
250255
}
251256
}
252257
}
253-
}
258+
}

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.opensearch.dataprepper.plugins.source.microsoft_office365;
1111

1212
import io.micrometer.core.instrument.Counter;
13+
import io.micrometer.core.instrument.DistributionSummary;
1314
import io.micrometer.core.instrument.Timer;
1415
import lombok.extern.slf4j.Slf4j;
1516
import org.opensearch.dataprepper.metrics.PluginMetrics;
@@ -26,6 +27,8 @@
2627
import org.springframework.web.client.RestTemplate;
2728

2829
import javax.inject.Named;
30+
31+
import java.nio.charset.StandardCharsets;
2932
import java.time.Instant;
3033
import java.util.List;
3134
import java.util.Map;
@@ -41,10 +44,13 @@
4144
@Named
4245
public class Office365RestClient {
4346
private static final String AUDIT_LOG_FETCH_LATENCY = "auditLogFetchLatency";
44-
private static final String SEARCH_CALL_LATENCY = "searchCallLatency";
45-
private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested";
47+
private static final String AUDIT_LOG_RESPONSE_SIZE = "auditLogResponseSizeBytes";
4648
private static final String AUDIT_LOG_REQUESTS_FAILED = "auditLogRequestsFailed";
4749
private static final String AUDIT_LOG_REQUESTS_SUCCESS = "auditLogRequestsSuccess";
50+
private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested";
51+
private static final String SEARCH_CALL_LATENCY = "searchCallLatency";
52+
private static final String SEARCH_RESPONSE_SIZE = "searchResponseSizeBytes";
53+
private static final String SEARCH_REQUESTS_SUCCESS = "searchRequestsSuccess";
4854
private static final String SEARCH_REQUESTS_FAILED = "searchRequestsFailed";
4955
private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/";
5056

@@ -56,6 +62,9 @@ public class Office365RestClient {
5662
private final Counter auditLogRequestsFailedCounter;
5763
private final Counter auditLogRequestsSuccessCounter;
5864
private final Counter searchRequestsFailedCounter;
65+
private final Counter searchRequestsSuccessCounter;
66+
private final DistributionSummary auditLogResponseSizeSummary;
67+
private final DistributionSummary searchResponseSizeSummary;
5968

6069
public Office365RestClient(final Office365AuthenticationInterface authConfig,
6170
final PluginMetrics pluginMetrics) {
@@ -67,6 +76,9 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig,
6776
this.auditLogRequestsFailedCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_FAILED);
6877
this.auditLogRequestsSuccessCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_SUCCESS);
6978
this.searchRequestsFailedCounter = pluginMetrics.counter(SEARCH_REQUESTS_FAILED);
79+
this.searchRequestsSuccessCounter = pluginMetrics.counter(SEARCH_REQUESTS_SUCCESS);
80+
this.auditLogResponseSizeSummary = pluginMetrics.summary(AUDIT_LOG_RESPONSE_SIZE);
81+
this.searchResponseSizeSummary = pluginMetrics.summary(SEARCH_RESPONSE_SIZE);
7082
}
7183

7284
/**
@@ -170,6 +182,8 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
170182
new HttpEntity<>(headers),
171183
new ParameterizedTypeReference<>() {}
172184
);
185+
// Record the search response size as reported by the response's Content-Length header.
186+
searchResponseSizeSummary.record(response.getHeaders().getContentLength());
173187

174188
// Extract NextPageUri from response headers
175189
List<String> nextPageHeaders = response.getHeaders().get("NextPageUri");
@@ -180,6 +194,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
180194
log.debug("Next page URI found: {}", nextPageUri);
181195
}
182196

197+
searchRequestsSuccessCounter.increment();
183198
return new AuditLogsResponse(response.getBody(), nextPageUri);
184199
},
185200
authConfig::renewCredentials
@@ -210,12 +225,21 @@ public String getAuditLog(String contentUri) {
210225
try {
211226
String response = RetryHandler.executeWithRetry(() -> {
212227
headers.setBearerAuth(authConfig.getAccessToken());
213-
return restTemplate.exchange(
214-
contentUri,
215-
HttpMethod.GET,
216-
new HttpEntity<>(headers),
217-
String.class
218-
).getBody();
228+
229+
ResponseEntity<String> responseEntity = restTemplate.exchange(
230+
contentUri,
231+
HttpMethod.GET,
232+
new HttpEntity<>(headers),
233+
String.class
234+
);
235+
236+
// Record the audit log response size, measured from the UTF-8 encoded response body
237+
String responseBody = responseEntity.getBody();
238+
if (responseBody != null) {
239+
auditLogResponseSizeSummary.record(responseBody.getBytes(StandardCharsets.UTF_8).length);
240+
}
241+
242+
return responseBody;
219243
}, authConfig::renewCredentials);
220244
auditLogRequestsSuccessCounter.increment();
221245
return response;

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,4 +307,4 @@ void testMissingWorkloadField() throws Exception {
307307

308308
verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt());
309309
}
310-
}
310+
}

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99

1010
package org.opensearch.dataprepper.plugins.source.microsoft_office365;
1111

12+
import io.micrometer.core.instrument.Counter;
13+
import io.micrometer.core.instrument.DistributionSummary;
14+
import io.micrometer.core.instrument.Timer;
1215
import org.junit.jupiter.api.BeforeEach;
1316
import org.junit.jupiter.api.Test;
1417
import org.junit.jupiter.api.extension.ExtendWith;
@@ -291,4 +294,147 @@ void testTokenRenewal() {
291294
assertEquals("Bearer token-0", requestTokens.get(0), "First request should use token-0");
292295
assertEquals("Bearer token-1", requestTokens.get(1), "Second request should use token-1");
293296
}
294-
}
297+
298+
@Test
299+
void testMetricsInitialization() {
300+
// Test metrics initialization during construction. This approach is used for metrics that are called
301+
// inside RetryHandler.executeWithRetry() static method calls, which would require complex static mocking
302+
// to test for invocation. Testing initialization ensures the metrics infrastructure is properly set up.
303+
304+
// Mock all required timers and counters for Office365RestClient constructor
305+
PluginMetrics mockPluginMetrics = org.mockito.Mockito.mock(PluginMetrics.class);
306+
Timer mockAuditLogFetchLatencyTimer = org.mockito.Mockito.mock(Timer.class);
307+
Timer mockSearchCallLatencyTimer = org.mockito.Mockito.mock(Timer.class);
308+
Counter mockAuditLogsRequestedCounter = org.mockito.Mockito.mock(Counter.class);
309+
Counter mockAuditLogRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
310+
Counter mockAuditLogRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class);
311+
Counter mockSearchRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
312+
Counter mockSearchRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class);
313+
DistributionSummary mockAuditLogRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);
314+
DistributionSummary mockSearchRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);
315+
316+
when(mockPluginMetrics.timer("auditLogFetchLatency")).thenReturn(mockAuditLogFetchLatencyTimer);
317+
when(mockPluginMetrics.timer("searchCallLatency")).thenReturn(mockSearchCallLatencyTimer);
318+
when(mockPluginMetrics.counter("auditLogsRequested")).thenReturn(mockAuditLogsRequestedCounter);
319+
when(mockPluginMetrics.counter("auditLogRequestsFailed")).thenReturn(mockAuditLogRequestsFailedCounter);
320+
when(mockPluginMetrics.counter("auditLogRequestsSuccess")).thenReturn(mockAuditLogRequestsSuccessCounter);
321+
when(mockPluginMetrics.counter("searchRequestsFailed")).thenReturn(mockSearchRequestsFailedCounter);
322+
when(mockPluginMetrics.counter("searchRequestsSuccess")).thenReturn(mockSearchRequestsSuccessCounter);
323+
when(mockPluginMetrics.summary("auditLogRequestSizeBytes")).thenReturn(mockAuditLogRequestSizeSummary);
324+
when(mockPluginMetrics.summary("searchRequestSizeBytes")).thenReturn(mockSearchRequestSizeSummary);
325+
326+
// Create Office365RestClient with mocked metrics
327+
Office365RestClient testClient = new Office365RestClient(authConfig, mockPluginMetrics);
328+
329+
// Verify all metrics were requested during construction
330+
verify(mockPluginMetrics).timer("auditLogFetchLatency");
331+
verify(mockPluginMetrics).timer("searchCallLatency");
332+
verify(mockPluginMetrics).counter("auditLogsRequested");
333+
verify(mockPluginMetrics).counter("auditLogRequestsFailed");
334+
verify(mockPluginMetrics).counter("auditLogRequestsSuccess");
335+
verify(mockPluginMetrics).counter("searchRequestsFailed");
336+
verify(mockPluginMetrics).counter("searchRequestsSuccess");
337+
verify(mockPluginMetrics).summary("auditLogRequestSizeBytes");
338+
verify(mockPluginMetrics).summary("searchRequestSizeBytes");
339+
}
340+
341+
@Test
342+
void testGetAuditLogMetricsInvocation() throws NoSuchFieldException, IllegalAccessException {
343+
// Test metrics for getAuditLog() method - both success and failure scenarios
344+
345+
// Create mock metrics and inject them
346+
Counter mockAuditLogsRequestedCounter = org.mockito.Mockito.mock(Counter.class);
347+
Counter mockAuditLogRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
348+
Timer mockAuditLogFetchLatencyTimer = org.mockito.Mockito.mock(Timer.class);
349+
DistributionSummary mockAuditLogRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);
350+
351+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogsRequestedCounter", mockAuditLogsRequestedCounter);
352+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogRequestsFailedCounter", mockAuditLogRequestsFailedCounter);
353+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogFetchLatencyTimer", mockAuditLogFetchLatencyTimer);
354+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogRequestSizeSummary", mockAuditLogRequestSizeSummary);
355+
356+
// Mock timer.record() to execute the lambda
357+
when(mockAuditLogFetchLatencyTimer.record(any(java.util.function.Supplier.class))).thenAnswer(invocation -> {
358+
java.util.function.Supplier<?> supplier = invocation.getArgument(0);
359+
return supplier.get();
360+
});
361+
362+
String contentUri = "https://manage.office.com/api/v1.0/test-tenant/activity/feed/audit/123";
363+
364+
// Test success scenario
365+
String mockAuditLog = "{\"id\":\"123\",\"contentType\":\"Audit.AzureActiveDirectory\"}";
366+
ResponseEntity<String> mockResponse = new ResponseEntity<>(mockAuditLog, HttpStatus.OK);
367+
when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class)))
368+
.thenReturn(mockResponse);
369+
370+
office365RestClient.getAuditLog(contentUri);
371+
372+
// Verify success metrics
373+
verify(mockAuditLogsRequestedCounter).increment(); // Called directly before RetryHandler
374+
verify(mockAuditLogFetchLatencyTimer).record(any(java.util.function.Supplier.class)); // Timer wrapper
375+
verify(mockAuditLogRequestSizeSummary).record(mockAuditLog.getBytes(java.nio.charset.StandardCharsets.UTF_8).length); // Size metric inside RetryHandler
376+
377+
// Test failure scenario
378+
when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class)))
379+
.thenThrow(new HttpClientErrorException(HttpStatus.INTERNAL_SERVER_ERROR));
380+
381+
assertThrows(RuntimeException.class, () -> office365RestClient.getAuditLog(contentUri));
382+
383+
// Verify failure metrics
384+
verify(mockAuditLogsRequestedCounter, times(2)).increment(); // Called again before retry
385+
verify(mockAuditLogRequestsFailedCounter).increment(); // Called in catch block outside RetryHandler
386+
}
387+
388+
@Test
389+
void testSearchAuditLogsMetricsInvocation() throws NoSuchFieldException, IllegalAccessException {
390+
// Test metrics for searchAuditLogs() method - both success and failure scenarios
391+
392+
// Create mock metrics and inject them
393+
Counter mockSearchRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
394+
Timer mockSearchCallLatencyTimer = org.mockito.Mockito.mock(Timer.class);
395+
DistributionSummary mockSearchRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);
396+
397+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchRequestsFailedCounter", mockSearchRequestsFailedCounter);
398+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchCallLatencyTimer", mockSearchCallLatencyTimer);
399+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchRequestSizeSummary", mockSearchRequestSizeSummary);
400+
401+
// Mock timer.record() to execute the lambda
402+
when(mockSearchCallLatencyTimer.record(any(java.util.function.Supplier.class))).thenAnswer(invocation -> {
403+
java.util.function.Supplier<?> supplier = invocation.getArgument(0);
404+
return supplier.get();
405+
});
406+
407+
// Test success scenario
408+
List<Map<String, Object>> mockResults = Collections.singletonList(new HashMap<>());
409+
HttpHeaders responseHeaders = new HttpHeaders();
410+
responseHeaders.setContentLength(1024L); // Mock content length
411+
ResponseEntity<List<Map<String, Object>>> mockResponse = new ResponseEntity<>(mockResults, responseHeaders, HttpStatus.OK);
412+
when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class)))
413+
.thenReturn(mockResponse);
414+
415+
office365RestClient.searchAuditLogs(
416+
"Audit.AzureActiveDirectory",
417+
Instant.now().minus(1, ChronoUnit.HOURS),
418+
Instant.now(),
419+
null
420+
);
421+
422+
// Verify success metrics
423+
verify(mockSearchCallLatencyTimer).record(any(java.util.function.Supplier.class)); // Timer wrapper
424+
verify(mockSearchRequestSizeSummary).record(1024L); // Size metric inside RetryHandler
425+
426+
// Test failure scenario
427+
when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class)))
428+
.thenThrow(new HttpClientErrorException(HttpStatus.INTERNAL_SERVER_ERROR));
429+
430+
assertThrows(RuntimeException.class, () -> office365RestClient.searchAuditLogs(
431+
"Audit.AzureActiveDirectory",
432+
Instant.now().minus(1, ChronoUnit.HOURS),
433+
Instant.now(),
434+
null
435+
));
436+
437+
// Verify failure metrics
438+
verify(mockSearchRequestsFailedCounter).increment(); // Called in catch block outside RetryHandler
439+
}
440+
}

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ public class TokenPaginationCrawler implements Crawler<PaginationCrawlerWorkerPr
3333
private static final int batchSize = 50;
3434
private static final String PAGINATION_WORKER_PARTITIONS_CREATED = "paginationWorkerPartitionsCreated";
3535
private static final String INVALID_PAGINATION_ITEMS = "invalidPaginationItems";
36+
private static final String WORKER_PARTITION_WAIT_TIME = "WorkerPartitionWaitTime";
37+
private static final String WORKER_PARTITION_PROCESS_LATENCY = "WorkerPartitionProcessLatency";
3638
private final Timer crawlingTimer;
39+
private final Timer partitionWaitTimeTimer;
40+
private final Timer partitionProcessLatencyTimer;
3741
private final CrawlerClient client;
3842
private final Counter parititionsCreatedCounter;
3943
private final Counter invalidPaginationItemsCounter;
@@ -43,6 +47,8 @@ public TokenPaginationCrawler(CrawlerClient client,
4347
this.client = client;
4448
this.crawlingTimer = pluginMetrics.timer("crawlingTime");
4549
this.parititionsCreatedCounter = pluginMetrics.counter(PAGINATION_WORKER_PARTITIONS_CREATED);
50+
this.partitionWaitTimeTimer = pluginMetrics.timer(WORKER_PARTITION_WAIT_TIME);
51+
this.partitionProcessLatencyTimer = pluginMetrics.timer(WORKER_PARTITION_PROCESS_LATENCY);
4652
this.invalidPaginationItemsCounter = pluginMetrics.counter(INVALID_PAGINATION_ITEMS);
4753

4854
}
@@ -92,7 +98,8 @@ public Instant crawl(LeaderPartition leaderPartition,
9298
}
9399

94100
public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
95-
client.executePartition(state, buffer, acknowledgementSet);
101+
partitionWaitTimeTimer.record(Duration.between(state.getExportStartTime(), Instant.now()));
102+
partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet));
96103
}
97104

98105
private void updateLeaderProgressState(LeaderPartition leaderPartition, String updatedToken, EnhancedSourceCoordinator coordinator) {

0 commit comments

Comments
 (0)