Skip to content

Commit ddd0ce0

Browse files
authored
M365 Crawler Metric, Buffer, and Unit Test Updates (opensearch-project#6142)
Signed-off-by: Alex Christensen <alchrisk@amazon.com> Co-authored-by: Alex Christensen <alchrisk@amazon.com>
1 parent d4adc40 commit ddd0ce0

6 files changed

Lines changed: 269 additions & 17 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929
annotationProcessor 'org.projectlombok:lombok:1.18.30'
3030

3131
testImplementation project(':data-prepper-test:test-common')
32+
testRuntimeOnly 'org.slf4j:slf4j-simple:2.0.7'
3233
testImplementation testLibs.awaitility
3334

3435
implementation(libs.spring.context) {

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,9 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
106106

107107
try {
108108
String nextPageUri = null;
109-
List<Record<Event>> records = new ArrayList<>();
110109

111110
do {
111+
List<Record<Event>> records = new ArrayList<>();
112112
AuditLogsResponse response =
113113
service.searchAuditLogs(logType, startTime, endTime, nextPageUri);
114114

@@ -138,18 +138,22 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
138138
}
139139
}
140140

141+
// Write Records to the buffer after processing a page of data
142+
bufferWriteLatencyTimer.record(() -> {
143+
try {
144+
writeRecordsWithRetry(records, buffer, acknowledgementSet);
145+
} catch (Exception e) {
146+
bufferWriteFailuresCounter.increment();
147+
throw e;
148+
}
149+
});
150+
141151
nextPageUri = response.getNextPageUri();
142152
} while (nextPageUri != null);
143153

144-
bufferWriteLatencyTimer.record(() -> {
145-
try {
146-
writeRecordsWithRetry(records, buffer, acknowledgementSet);
147-
} catch (Exception e) {
148-
bufferWriteFailuresCounter.increment();
149-
throw e;
150-
}
151-
});
152-
154+
if (configuration.isAcknowledgments()) {
155+
acknowledgementSet.complete();
156+
}
153157
} catch (Exception e) {
154158
log.error(NOISY, "Failed to process partition for log type {} from {} to {}",
155159
logType, startTime, endTime, e);
@@ -212,7 +216,6 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
212216
if (configuration.isAcknowledgments()) {
213217
records.forEach(record -> acknowledgementSet.add(record.getData()));
214218
buffer.writeAll(records, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis());
215-
acknowledgementSet.complete();
216219
} else {
217220
buffer.writeAll(records, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis());
218221
}

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.opensearch.dataprepper.plugins.source.microsoft_office365;
1111

1212
import io.micrometer.core.instrument.Counter;
13+
import io.micrometer.core.instrument.DistributionSummary;
1314
import io.micrometer.core.instrument.Timer;
1415
import lombok.extern.slf4j.Slf4j;
1516
import org.opensearch.dataprepper.metrics.PluginMetrics;
@@ -26,6 +27,7 @@
2627
import org.springframework.web.client.RestTemplate;
2728

2829
import javax.inject.Named;
30+
import java.nio.charset.StandardCharsets;
2931
import java.time.Instant;
3032
import java.util.List;
3133
import java.util.Map;
@@ -41,10 +43,13 @@
4143
@Named
4244
public class Office365RestClient {
4345
private static final String AUDIT_LOG_FETCH_LATENCY = "auditLogFetchLatency";
44-
private static final String SEARCH_CALL_LATENCY = "searchCallLatency";
45-
private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested";
46+
private static final String AUDIT_LOG_RESPONSE_SIZE = "auditLogResponseSizeBytes";
4647
private static final String AUDIT_LOG_REQUESTS_FAILED = "auditLogRequestsFailed";
4748
private static final String AUDIT_LOG_REQUESTS_SUCCESS = "auditLogRequestsSuccess";
49+
private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested";
50+
private static final String SEARCH_CALL_LATENCY = "searchCallLatency";
51+
private static final String SEARCH_RESPONSE_SIZE = "searchResponseSizeBytes";
52+
private static final String SEARCH_REQUESTS_SUCCESS = "searchRequestsSuccess";
4853
private static final String SEARCH_REQUESTS_FAILED = "searchRequestsFailed";
4954
private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/";
5055

@@ -56,6 +61,9 @@ public class Office365RestClient {
5661
private final Counter auditLogRequestsFailedCounter;
5762
private final Counter auditLogRequestsSuccessCounter;
5863
private final Counter searchRequestsFailedCounter;
64+
private final Counter searchRequestsSuccessCounter;
65+
private final DistributionSummary auditLogResponseSizeSummary;
66+
private final DistributionSummary searchResponseSizeSummary;
5967

6068
public Office365RestClient(final Office365AuthenticationInterface authConfig,
6169
final PluginMetrics pluginMetrics) {
@@ -67,6 +75,9 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig,
6775
this.auditLogRequestsFailedCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_FAILED);
6876
this.auditLogRequestsSuccessCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_SUCCESS);
6977
this.searchRequestsFailedCounter = pluginMetrics.counter(SEARCH_REQUESTS_FAILED);
78+
this.searchRequestsSuccessCounter = pluginMetrics.counter(SEARCH_REQUESTS_SUCCESS);
79+
this.auditLogResponseSizeSummary = pluginMetrics.summary(AUDIT_LOG_RESPONSE_SIZE);
80+
this.searchResponseSizeSummary = pluginMetrics.summary(SEARCH_RESPONSE_SIZE);
7081
}
7182

7283
/**
@@ -170,6 +181,8 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
170181
new HttpEntity<>(headers),
171182
new ParameterizedTypeReference<>() {}
172183
);
184+
// Record the search response size, taken from the response's Content-Length header.
185+
searchResponseSizeSummary.record(response.getHeaders().getContentLength());
173186

174187
// Extract NextPageUri from response headers
175188
List<String> nextPageHeaders = response.getHeaders().get("NextPageUri");
@@ -180,6 +193,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
180193
log.debug("Next page URI found: {}", nextPageUri);
181194
}
182195

196+
searchRequestsSuccessCounter.increment();
183197
return new AuditLogsResponse(response.getBody(), nextPageUri);
184198
},
185199
authConfig::renewCredentials,
@@ -210,12 +224,20 @@ public String getAuditLog(String contentUri) {
210224
try {
211225
String response = RetryHandler.executeWithRetry(() -> {
212226
headers.setBearerAuth(authConfig.getAccessToken());
213-
return restTemplate.exchange(
227+
ResponseEntity<String> responseEntity = restTemplate.exchange(
214228
contentUri,
215229
HttpMethod.GET,
216230
new HttpEntity<>(headers),
217231
String.class
218-
).getBody();
232+
);
233+
234+
// Record the audit log response size as the UTF-8 byte length of the response body
235+
String responseBody = responseEntity.getBody();
236+
if (responseBody != null) {
237+
auditLogResponseSizeSummary.record(responseBody.getBytes(StandardCharsets.UTF_8).length);
238+
}
239+
240+
return responseBody;
219241
}, authConfig::renewCredentials, auditLogRequestsFailedCounter);
220242
auditLogRequestsSuccessCounter.increment();
221243
return response;

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@
99

1010
package org.opensearch.dataprepper.plugins.source.microsoft_office365;
1111

12+
import io.micrometer.core.instrument.Counter;
13+
import io.micrometer.core.instrument.DistributionSummary;
14+
import io.micrometer.core.instrument.Timer;
1215
import org.junit.jupiter.api.BeforeEach;
1316
import org.junit.jupiter.api.Test;
1417
import org.junit.jupiter.api.extension.ExtendWith;
15-
import io.micrometer.core.instrument.Counter;
18+
1619
import org.mockito.ArgumentCaptor;
1720
import org.mockito.Mock;
1821
import org.mockito.junit.jupiter.MockitoExtension;
@@ -347,4 +350,147 @@ void testGetAuditLogFailureCounterIncrementsOnEachRetry() throws Exception {
347350
// Verify counter.increment() was called exactly 6 times (once for each retry attempt)
348351
verify(mockCounter, times(6)).increment();
349352
}
353+
354+
@Test
355+
void testMetricsInitialization() {
356+
// Test metrics initialization during construction. This approach is used for metrics that are called
357+
// inside RetryHandler.executeWithRetry() static method calls, which would require complex static mocking
358+
// to test for invocation. Testing initialization ensures the metrics infrastructure is properly set up.
359+
360+
// Mock all required timers and counters for Office365RestClient constructor
361+
PluginMetrics mockPluginMetrics = org.mockito.Mockito.mock(PluginMetrics.class);
362+
Timer mockAuditLogFetchLatencyTimer = org.mockito.Mockito.mock(Timer.class);
363+
Timer mockSearchCallLatencyTimer = org.mockito.Mockito.mock(Timer.class);
364+
Counter mockAuditLogsRequestedCounter = org.mockito.Mockito.mock(Counter.class);
365+
Counter mockAuditLogRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
366+
Counter mockAuditLogRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class);
367+
Counter mockSearchRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
368+
Counter mockSearchRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class);
369+
DistributionSummary mockAuditLogRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);
370+
DistributionSummary mockSearchRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);
371+
372+
when(mockPluginMetrics.timer("auditLogFetchLatency")).thenReturn(mockAuditLogFetchLatencyTimer);
373+
when(mockPluginMetrics.timer("searchCallLatency")).thenReturn(mockSearchCallLatencyTimer);
374+
when(mockPluginMetrics.counter("auditLogsRequested")).thenReturn(mockAuditLogsRequestedCounter);
375+
when(mockPluginMetrics.counter("auditLogRequestsFailed")).thenReturn(mockAuditLogRequestsFailedCounter);
376+
when(mockPluginMetrics.counter("auditLogRequestsSuccess")).thenReturn(mockAuditLogRequestsSuccessCounter);
377+
when(mockPluginMetrics.counter("searchRequestsFailed")).thenReturn(mockSearchRequestsFailedCounter);
378+
when(mockPluginMetrics.counter("searchRequestsSuccess")).thenReturn(mockSearchRequestsSuccessCounter);
379+
when(mockPluginMetrics.summary("auditLogResponseSizeBytes")).thenReturn(mockAuditLogRequestSizeSummary);
380+
when(mockPluginMetrics.summary("searchResponseSizeBytes")).thenReturn(mockSearchRequestSizeSummary);
381+
382+
// Create Office365RestClient with mocked metrics
383+
Office365RestClient testClient = new Office365RestClient(authConfig, mockPluginMetrics);
384+
385+
// Verify all metrics were requested during construction
386+
verify(mockPluginMetrics).timer("auditLogFetchLatency");
387+
verify(mockPluginMetrics).timer("searchCallLatency");
388+
verify(mockPluginMetrics).counter("auditLogsRequested");
389+
verify(mockPluginMetrics).counter("auditLogRequestsFailed");
390+
verify(mockPluginMetrics).counter("auditLogRequestsSuccess");
391+
verify(mockPluginMetrics).counter("searchRequestsFailed");
392+
verify(mockPluginMetrics).counter("searchRequestsSuccess");
393+
verify(mockPluginMetrics).summary("auditLogResponseSizeBytes");
394+
verify(mockPluginMetrics).summary("searchResponseSizeBytes");
395+
}
396+
397+
@Test
398+
void testGetAuditLogMetricsInvocation() throws NoSuchFieldException, IllegalAccessException {
399+
// Test metrics for getAuditLog() method - both success and failure scenarios
400+
401+
// Create mock metrics and inject them
402+
Counter mockAuditLogsRequestedCounter = org.mockito.Mockito.mock(Counter.class);
403+
Counter mockAuditLogRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
404+
Timer mockAuditLogFetchLatencyTimer = org.mockito.Mockito.mock(Timer.class);
405+
DistributionSummary mockAuditLogRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);
406+
407+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogsRequestedCounter", mockAuditLogsRequestedCounter);
408+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogRequestsFailedCounter", mockAuditLogRequestsFailedCounter);
409+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogFetchLatencyTimer", mockAuditLogFetchLatencyTimer);
410+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "auditLogResponseSizeSummary", mockAuditLogRequestSizeSummary);
411+
412+
// Mock timer.record() to execute the lambda
413+
when(mockAuditLogFetchLatencyTimer.record(any(java.util.function.Supplier.class))).thenAnswer(invocation -> {
414+
java.util.function.Supplier<?> supplier = invocation.getArgument(0);
415+
return supplier.get();
416+
});
417+
418+
String contentUri = "https://manage.office.com/api/v1.0/test-tenant/activity/feed/audit/123";
419+
420+
// Test success scenario
421+
String mockAuditLog = "{\"id\":\"123\",\"contentType\":\"Audit.AzureActiveDirectory\"}";
422+
ResponseEntity<String> mockResponse = new ResponseEntity<>(mockAuditLog, HttpStatus.OK);
423+
when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class)))
424+
.thenReturn(mockResponse);
425+
426+
office365RestClient.getAuditLog(contentUri);
427+
428+
// Verify success metrics
429+
verify(mockAuditLogsRequestedCounter).increment(); // Called directly before RetryHandler
430+
verify(mockAuditLogFetchLatencyTimer).record(any(java.util.function.Supplier.class)); // Timer wrapper
431+
verify(mockAuditLogRequestSizeSummary).record(mockAuditLog.getBytes(java.nio.charset.StandardCharsets.UTF_8).length); // Size metric inside RetryHandler
432+
433+
// Test failure scenario
434+
when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class)))
435+
.thenThrow(new HttpClientErrorException(HttpStatus.INTERNAL_SERVER_ERROR));
436+
437+
assertThrows(RuntimeException.class, () -> office365RestClient.getAuditLog(contentUri));
438+
439+
// Verify failure metrics
440+
verify(mockAuditLogsRequestedCounter, times(2)).increment(); // Called again before retry
441+
verify(mockAuditLogRequestsFailedCounter, times(6)).increment(); // Called 6 times (once for each retry attempt)
442+
}
443+
444+
@Test
445+
void testSearchAuditLogsMetricsInvocation() throws NoSuchFieldException, IllegalAccessException {
446+
// Test metrics for searchAuditLogs() method - both success and failure scenarios
447+
448+
// Create mock metrics and inject them
449+
Counter mockSearchRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class);
450+
Timer mockSearchCallLatencyTimer = org.mockito.Mockito.mock(Timer.class);
451+
DistributionSummary mockSearchRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class);
452+
453+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchRequestsFailedCounter", mockSearchRequestsFailedCounter);
454+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchCallLatencyTimer", mockSearchCallLatencyTimer);
455+
ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "searchResponseSizeSummary", mockSearchRequestSizeSummary);
456+
457+
// Mock timer.record() to execute the lambda
458+
when(mockSearchCallLatencyTimer.record(any(java.util.function.Supplier.class))).thenAnswer(invocation -> {
459+
java.util.function.Supplier<?> supplier = invocation.getArgument(0);
460+
return supplier.get();
461+
});
462+
463+
// Test success scenario
464+
List<Map<String, Object>> mockResults = Collections.singletonList(new HashMap<>());
465+
HttpHeaders responseHeaders = new HttpHeaders();
466+
responseHeaders.setContentLength(1024L); // Mock content length
467+
ResponseEntity<List<Map<String, Object>>> mockResponse = new ResponseEntity<>(mockResults, responseHeaders, HttpStatus.OK);
468+
when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class)))
469+
.thenReturn(mockResponse);
470+
471+
office365RestClient.searchAuditLogs(
472+
"Audit.AzureActiveDirectory",
473+
Instant.now().minus(1, ChronoUnit.HOURS),
474+
Instant.now(),
475+
null
476+
);
477+
478+
// Verify success metrics
479+
verify(mockSearchCallLatencyTimer).record(any(java.util.function.Supplier.class)); // Timer wrapper
480+
verify(mockSearchRequestSizeSummary).record(1024L); // Size metric inside RetryHandler
481+
482+
// Test failure scenario
483+
when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class)))
484+
.thenThrow(new HttpClientErrorException(HttpStatus.INTERNAL_SERVER_ERROR));
485+
486+
assertThrows(RuntimeException.class, () -> office365RestClient.searchAuditLogs(
487+
"Audit.AzureActiveDirectory",
488+
Instant.now().minus(1, ChronoUnit.HOURS),
489+
Instant.now(),
490+
null
491+
));
492+
493+
// Verify failure metrics
494+
verify(mockSearchRequestsFailedCounter, times(6)).increment(); // Called 6 times (once for each retry attempt)
495+
}
350496
}

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ public class TokenPaginationCrawler implements Crawler<PaginationCrawlerWorkerPr
3333
private static final int batchSize = 50;
3434
private static final String PAGINATION_WORKER_PARTITIONS_CREATED = "paginationWorkerPartitionsCreated";
3535
private static final String INVALID_PAGINATION_ITEMS = "invalidPaginationItems";
36+
private static final String WORKER_PARTITION_WAIT_TIME = "WorkerPartitionWaitTime";
37+
private static final String WORKER_PARTITION_PROCESS_LATENCY = "WorkerPartitionProcessLatency";
3638
private final Timer crawlingTimer;
39+
private final Timer partitionWaitTimeTimer;
40+
private final Timer partitionProcessLatencyTimer;
3741
private final CrawlerClient client;
3842
private final Counter parititionsCreatedCounter;
3943
private final Counter invalidPaginationItemsCounter;
@@ -43,6 +47,8 @@ public TokenPaginationCrawler(CrawlerClient client,
4347
this.client = client;
4448
this.crawlingTimer = pluginMetrics.timer("crawlingTime");
4549
this.parititionsCreatedCounter = pluginMetrics.counter(PAGINATION_WORKER_PARTITIONS_CREATED);
50+
this.partitionWaitTimeTimer = pluginMetrics.timer(WORKER_PARTITION_WAIT_TIME);
51+
this.partitionProcessLatencyTimer = pluginMetrics.timer(WORKER_PARTITION_PROCESS_LATENCY);
4652
this.invalidPaginationItemsCounter = pluginMetrics.counter(INVALID_PAGINATION_ITEMS);
4753

4854
}
@@ -92,7 +98,8 @@ public Instant crawl(LeaderPartition leaderPartition,
9298
}
9399

94100
public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
95-
client.executePartition(state, buffer, acknowledgementSet);
101+
partitionWaitTimeTimer.record(Duration.between(state.getExportStartTime(), Instant.now()));
102+
partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet));
96103
}
97104

98105
private void updateLeaderProgressState(LeaderPartition leaderPartition, String updatedToken, EnhancedSourceCoordinator coordinator) {

0 commit comments

Comments
 (0)