Skip to content

Commit c808208

Browse files
committed
Crawler Metric, Buffer, and Unit Test Updates
Signed-off-by: Alex Christensen <alchrisk@amazon.com>
1 parent bb2dfa1 commit c808208

7 files changed

Lines changed: 329 additions & 79 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929
annotationProcessor 'org.projectlombok:lombok:1.18.30'
3030

3131
testImplementation project(':data-prepper-test:test-common')
32+
testRuntimeOnly 'org.slf4j:slf4j-simple:2.0.7'
3233

3334
implementation(libs.spring.context) {
3435
exclude group: 'commons-logging', module: 'commons-logging'

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java

Lines changed: 51 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
106106

107107
try {
108108
String nextPageUri = null;
109-
List<Record<Event>> records = new ArrayList<>();
110109

111110
do {
112111
AuditLogsResponse response =
@@ -118,7 +117,8 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
118117
try {
119118
Record<Event> record = processAuditLog(metadata);
120119
if (record != null) {
121-
records.add(record);
120+
// Write each record individually as it's processed
121+
writeRecordWithRetry(record, buffer, acknowledgementSet);
122122
}
123123
} catch (Office365Exception e) {
124124

@@ -141,15 +141,6 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
141141
nextPageUri = response.getNextPageUri();
142142
} while (nextPageUri != null);
143143

144-
bufferWriteLatencyTimer.record(() -> {
145-
try {
146-
writeRecordsWithRetry(records, buffer, acknowledgementSet);
147-
} catch (Exception e) {
148-
bufferWriteFailuresCounter.increment();
149-
throw e;
150-
}
151-
});
152-
153144
} catch (Exception e) {
154145
log.error(NOISY, "Failed to process partition for log type {} from {} to {}",
155146
logType, startTime, endTime, e);
@@ -198,56 +189,63 @@ private Record<Event> processAuditLog(Map<String, Object> metadata) throws Offic
198189
}
199190
}
200191

201-
private void writeRecordsWithRetry(final List<Record<Event>> records,
202-
final Buffer<Record<Event>> buffer,
203-
final AcknowledgementSet acknowledgementSet) {
204-
bufferWriteAttemptsCounter.increment();
205-
int retryCount = 0;
206-
int currentBackoff = 1000; // Start with 1 second
207-
final int maxBackoff = 30000; // Max 30 seconds
208-
final int maxRetries = 5;
209-
210-
while (true) {
192+
private void writeRecordWithRetry(final Record<Event> record,
193+
final Buffer<Record<Event>> buffer,
194+
final AcknowledgementSet acknowledgementSet) {
195+
bufferWriteLatencyTimer.record(() -> {
211196
try {
212-
if (configuration.isAcknowledgments()) {
213-
records.forEach(record -> acknowledgementSet.add(record.getData()));
214-
buffer.writeAll(records, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis());
215-
acknowledgementSet.complete();
216-
} else {
217-
buffer.writeAll(records, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis());
218-
}
197+
bufferWriteAttemptsCounter.increment();
198+
int retryCount = 0;
199+
int currentBackoff = 1000; // Start with 1 second
200+
final int maxBackoff = 30000; // Max 30 seconds
201+
final int maxRetries = 5;
202+
203+
while (true) {
204+
try {
205+
if (configuration.isAcknowledgments()) {
206+
acknowledgementSet.add(record.getData());
207+
buffer.write(record, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis());
208+
acknowledgementSet.complete();
209+
} else {
210+
buffer.write(record, (int) Duration.ofSeconds(BUFFER_TIMEOUT_IN_SECONDS).toMillis());
211+
}
219212

220-
if (retryCount > 0) {
221-
bufferWriteRetrySuccessCounter.increment();
222-
} else {
223-
bufferWriteSuccessCounter.increment();
224-
}
225-
return;
213+
if (retryCount > 0) {
214+
bufferWriteRetrySuccessCounter.increment();
215+
} else {
216+
bufferWriteSuccessCounter.increment();
217+
}
218+
return;
226219

227-
} catch (TimeoutException e) {
228-
retryCount++;
229-
if (retryCount >= maxRetries) {
230-
bufferWriteFailuresCounter.increment();
231-
throw new RuntimeException("Failed to write to buffer after " + maxRetries + " attempts", e);
232-
}
220+
} catch (TimeoutException e) {
221+
retryCount++;
222+
if (retryCount >= maxRetries) {
223+
bufferWriteFailuresCounter.increment();
224+
throw new RuntimeException("Failed to write to buffer after " + maxRetries + " attempts", e);
225+
}
233226

234-
bufferWriteRetryAttemptsCounter.increment();
235-
currentBackoff = Math.min((int)(currentBackoff * 2.0), maxBackoff);
236-
log.info("Buffer full, backing off for {} ms before retry", currentBackoff);
237-
238-
try {
239-
Thread.sleep(currentBackoff);
240-
// TODO: Update worker partition state to prevent timeout
241-
// Ideally, we want to call the saveWorkerPartitionState and extend the lease like so
242-
// coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
243-
} catch (InterruptedException ie) {
244-
Thread.currentThread().interrupt();
245-
throw new RuntimeException("Buffer write retry interrupted", ie);
227+
bufferWriteRetryAttemptsCounter.increment();
228+
currentBackoff = Math.min((int)(currentBackoff * 2.0), maxBackoff);
229+
log.info("Buffer full, backing off for {} ms before retry", currentBackoff);
230+
231+
try {
232+
Thread.sleep(currentBackoff);
233+
// TODO: Update worker partition state to prevent timeout
234+
// Ideally, we want to call the saveWorkerPartitionState and extend the lease like so
235+
// coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
236+
} catch (InterruptedException ie) {
237+
Thread.currentThread().interrupt();
238+
throw new RuntimeException("Buffer write retry interrupted", ie);
239+
}
240+
} catch (Exception e) {
241+
bufferWriteFailuresCounter.increment();
242+
throw new RuntimeException("Error writing to buffer", e);
243+
}
246244
}
247245
} catch (Exception e) {
248246
bufferWriteFailuresCounter.increment();
249247
throw new RuntimeException("Error writing to buffer", e);
250248
}
251-
}
249+
});
252250
}
253251
}

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.opensearch.dataprepper.plugins.source.microsoft_office365;
1111

1212
import io.micrometer.core.instrument.Counter;
13+
import io.micrometer.core.instrument.DistributionSummary;
1314
import io.micrometer.core.instrument.Timer;
1415
import lombok.extern.slf4j.Slf4j;
1516
import org.opensearch.dataprepper.metrics.PluginMetrics;
@@ -26,6 +27,8 @@
2627
import org.springframework.web.client.RestTemplate;
2728

2829
import javax.inject.Named;
30+
31+
import java.nio.charset.StandardCharsets;
2932
import java.time.Instant;
3033
import java.util.List;
3134
import java.util.Map;
@@ -41,10 +44,13 @@
4144
@Named
4245
public class Office365RestClient {
4346
private static final String AUDIT_LOG_FETCH_LATENCY = "auditLogFetchLatency";
44-
private static final String SEARCH_CALL_LATENCY = "searchCallLatency";
45-
private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested";
47+
private static final String AUDIT_LOG_REQUEST_SIZE = "auditLogRequestSizeBytes";
4648
private static final String AUDIT_LOG_REQUESTS_FAILED = "auditLogRequestsFailed";
4749
private static final String AUDIT_LOG_REQUESTS_SUCCESS = "auditLogRequestsSuccess";
50+
private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested";
51+
private static final String SEARCH_CALL_LATENCY = "searchCallLatency";
52+
private static final String SEARCH_REQUEST_SIZE = "searchRequestSizeBytes";
53+
private static final String SEARCH_REQUESTS_SUCCESS = "searchRequestsSuccess";
4854
private static final String SEARCH_REQUESTS_FAILED = "searchRequestsFailed";
4955
private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/";
5056

@@ -56,6 +62,9 @@ public class Office365RestClient {
5662
private final Counter auditLogRequestsFailedCounter;
5763
private final Counter auditLogRequestsSuccessCounter;
5864
private final Counter searchRequestsFailedCounter;
65+
private final Counter searchRequestsSuccessCounter;
66+
private final DistributionSummary auditLogRequestSizeSummary;
67+
private final DistributionSummary searchRequestSizeSummary;
5968

6069
public Office365RestClient(final Office365AuthenticationInterface authConfig,
6170
final PluginMetrics pluginMetrics) {
@@ -67,6 +76,9 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig,
6776
this.auditLogRequestsFailedCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_FAILED);
6877
this.auditLogRequestsSuccessCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_SUCCESS);
6978
this.searchRequestsFailedCounter = pluginMetrics.counter(SEARCH_REQUESTS_FAILED);
79+
this.searchRequestsSuccessCounter = pluginMetrics.counter(SEARCH_REQUESTS_SUCCESS);
80+
this.auditLogRequestSizeSummary = pluginMetrics.summary(AUDIT_LOG_REQUEST_SIZE);
81+
this.searchRequestSizeSummary = pluginMetrics.summary(SEARCH_REQUEST_SIZE);
7082
}
7183

7284
/**
@@ -170,6 +182,8 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
170182
new HttpEntity<>(headers),
171183
new ParameterizedTypeReference<>() {}
172184
);
185+
// Record search request size.
186+
searchRequestSizeSummary.record(response.getHeaders().getContentLength());
173187

174188
// Extract NextPageUri from response headers
175189
List<String> nextPageHeaders = response.getHeaders().get("NextPageUri");
@@ -180,6 +194,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
180194
log.debug("Next page URI found: {}", nextPageUri);
181195
}
182196

197+
searchRequestsSuccessCounter.increment();
183198
return new AuditLogsResponse(response.getBody(), nextPageUri);
184199
},
185200
authConfig::renewCredentials
@@ -210,12 +225,21 @@ public String getAuditLog(String contentUri) {
210225
try {
211226
String response = RetryHandler.executeWithRetry(() -> {
212227
headers.setBearerAuth(authConfig.getAccessToken());
213-
return restTemplate.exchange(
214-
contentUri,
215-
HttpMethod.GET,
216-
new HttpEntity<>(headers),
217-
String.class
218-
).getBody();
228+
229+
ResponseEntity<String> responseEntity = restTemplate.exchange(
230+
contentUri,
231+
HttpMethod.GET,
232+
new HttpEntity<>(headers),
233+
String.class
234+
);
235+
236+
// Record audit log request size from response body
237+
String responseBody = responseEntity.getBody();
238+
if (responseBody != null) {
239+
auditLogRequestSizeSummary.record(responseBody.getBytes(StandardCharsets.UTF_8).length);
240+
}
241+
242+
return responseBody;
219243
}, authConfig::renewCredentials);
220244
auditLogRequestsSuccessCounter.increment();
221245
return response;

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -130,19 +130,15 @@ void testExecutePartition() throws Exception {
130130
return null;
131131
}).when(bufferWriteLatencyTimer).record(any(Runnable.class));
132132

133-
ArgumentCaptor<Collection<Record<Event>>> recordsCaptor = ArgumentCaptor.forClass((Class) Collection.class);
133+
ArgumentCaptor<Record<Event>> recordCaptor = ArgumentCaptor.forClass(Record.class);
134134

135135
client.executePartition(state, buffer, acknowledgementSet);
136136

137-
verify(buffer).writeAll(recordsCaptor.capture(), anyInt());
138-
Collection<Record<Event>> capturedRecords = recordsCaptor.getValue();
137+
verify(buffer).write(recordCaptor.capture(), anyInt());
138+
Record<Event> capturedRecord = recordCaptor.getValue();
139139

140-
assertFalse(capturedRecords.isEmpty());
141-
assertEquals(1, capturedRecords.size());
142-
for (Record<Event> record : capturedRecords) {
143-
assertNotNull(record.getData());
144-
assertEquals("Exchange", record.getData().getMetadata().getAttribute("contentType"));
145-
}
140+
assertNotNull(capturedRecord.getData());
141+
assertEquals("Exchange", capturedRecord.getData().getMetadata().getAttribute("contentType"));
146142
}
147143

148144
@Test
@@ -175,7 +171,8 @@ void testExecutePartitionWithJsonProcessingError() throws Exception {
175171

176172
client.executePartition(state, buffer, acknowledgementSet);
177173

178-
verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt());
174+
// With individual record processing, no records should be written due to JSON processing error
175+
verify(buffer, org.mockito.Mockito.never()).write(any(Record.class), anyInt());
179176
}
180177

181178
@Test
@@ -209,7 +206,7 @@ void testBufferWriteWithAcknowledgements() throws Exception {
209206

210207
verify(acknowledgementSet).add(any(Event.class));
211208
verify(acknowledgementSet).complete();
212-
verify(buffer).writeAll(any(), anyInt());
209+
verify(buffer).write(any(Record.class), anyInt());
213210
}
214211

215212
@Test
@@ -240,13 +237,14 @@ void testBufferWriteTimeout() throws Exception {
240237

241238
doThrow(new RuntimeException("Error writing to buffer"))
242239
.when(buffer)
243-
.writeAll(any(), anyInt());
240+
.write(any(Record.class), anyInt());
244241

245242
RuntimeException exception = assertThrows(RuntimeException.class,
246243
() -> client.executePartition(state, buffer, acknowledgementSet));
247244

248-
assertEquals("Error writing to buffer", exception.getMessage());
249-
verify(buffer).writeAll(any(), anyInt());
245+
// With individual record processing, the exception gets wrapped with the log ID
246+
assertEquals("Unexpected error processing audit log: ID1", exception.getMessage());
247+
verify(buffer).write(any(Record.class), anyInt());
250248
}
251249

252250
@Test
@@ -275,7 +273,8 @@ void testNonRetryableError() throws Exception {
275273

276274
client.executePartition(state, buffer, acknowledgementSet);
277275

278-
verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt());
276+
// With individual record processing, no records should be written due to null content
277+
verify(buffer, org.mockito.Mockito.never()).write(any(Record.class), anyInt());
279278
}
280279

281280
@Test
@@ -305,6 +304,7 @@ void testMissingWorkloadField() throws Exception {
305304

306305
client.executePartition(state, buffer, acknowledgementSet);
307306

308-
verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt());
307+
// With individual record processing, no records should be written due to missing Workload field
308+
verify(buffer, org.mockito.Mockito.never()).write(any(Record.class), anyInt());
309309
}
310310
}

0 commit comments

Comments
 (0)