Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class Office365CrawlerClient implements CrawlerClient<DimensionalTimeSlic
private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess";
private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts";
private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures";
private static final String NON_RETRYABLE_ERRORS = "nonRetryableErrors";
private static final String RETRYABLE_ERRORS = "retryableErrors";
private static final int BUFFER_TIMEOUT_IN_SECONDS = 10;
private static final String CONTENT_ID = "contentId";
private static final String CONTENT_URI = "contentUri";
Expand All @@ -70,6 +72,8 @@ public class Office365CrawlerClient implements CrawlerClient<DimensionalTimeSlic
private final Counter bufferWriteRetryAttemptsCounter;
private final Counter bufferWriteFailuresCounter;
private final Counter requestErrorsCounter;
private final Counter nonRetryableErrorsCounter;
private final Counter retryableErrorsCounter;
private ObjectMapper objectMapper;

public Office365CrawlerClient(final Office365Service service,
Expand All @@ -87,6 +91,8 @@ public Office365CrawlerClient(final Office365Service service,
this.bufferWriteRetryAttemptsCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_ATTEMPTS);
this.bufferWriteFailuresCounter = pluginMetrics.counter(BUFFER_WRITE_FAILURES);
this.requestErrorsCounter = pluginMetrics.counter(REQUEST_ERRORS);
this.nonRetryableErrorsCounter = pluginMetrics.counter(NON_RETRYABLE_ERRORS);
this.retryableErrorsCounter = pluginMetrics.counter(RETRYABLE_ERRORS);
}

@VisibleForTesting
Expand Down Expand Up @@ -128,13 +134,16 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
log.error(NOISY, "{} error processing audit log: {}",
e.isRetryable() ? "Retryable" : "Non-retryable", logId, e);
if (e.isRetryable()) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the increment of the retryable error counter missing here? Simply calling the handleExceptionMetrics method from here, without this if/else check, would be better.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is throwing a RuntimeException, it will get caught in the outer catch which contains the handleExceptionMetrics(e) call.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it doesn't work that way: since you are throwing a RuntimeException, it won't be an instance of Office365Exception.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but my understanding is that when the RuntimeException is thrown from within this if statement, it is caught by the outer catch, which contains the handleExceptionMetrics() call. Because the caught error is then a RuntimeException (no longer an Office365Exception, as you said), handleExceptionMetrics() would increment the retryableErrorsCounter.

retryableErrorsCounter.increment();
throw new RuntimeException("Retryable error processing audit log: " + logId, e);
} else {
nonRetryableErrorsCounter.increment();
// TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record
log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", logId, e);
}
} catch (Exception e) {
// Unexpected errors are treated as retryable to be safe
retryableErrorsCounter.increment();
log.error(NOISY, "Unexpected error processing audit log: {}", logId, e);
throw new RuntimeException("Unexpected error processing audit log: " + logId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class Office365RestClient {
private static final String AUDIT_LOG_RESPONSE_SIZE = "auditLogResponseSizeBytes";
private static final String AUDIT_LOG_REQUESTS_FAILED = "auditLogRequestsFailed";
private static final String AUDIT_LOG_REQUESTS_SUCCESS = "auditLogRequestsSuccess";
private static final String API_CALLS = "apiCalls";
private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested";
private static final String SEARCH_CALL_LATENCY = "searchCallLatency";
private static final String SEARCH_RESPONSE_SIZE = "searchResponseSizeBytes";
Expand All @@ -66,6 +67,7 @@ public class Office365RestClient {
private final Counter auditLogRequestsSuccessCounter;
private final Counter searchRequestsFailedCounter;
private final Counter searchRequestsSuccessCounter;
private final Counter apiCallsCounter;
private final DistributionSummary auditLogResponseSizeSummary;
private final DistributionSummary searchResponseSizeSummary;

Expand All @@ -82,6 +84,7 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig,
this.auditLogRequestsSuccessCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_SUCCESS);
this.searchRequestsFailedCounter = pluginMetrics.counter(SEARCH_REQUESTS_FAILED);
this.searchRequestsSuccessCounter = pluginMetrics.counter(SEARCH_REQUESTS_SUCCESS);
this.apiCallsCounter = pluginMetrics.counter(API_CALLS);
this.auditLogResponseSizeSummary = pluginMetrics.summary(AUDIT_LOG_RESPONSE_SIZE);
this.searchResponseSizeSummary = pluginMetrics.summary(SEARCH_RESPONSE_SIZE);

Expand Down Expand Up @@ -124,6 +127,7 @@ public void startSubscriptions() {
RetryHandler.executeWithRetry(() -> {
try {
headers.setBearerAuth(authConfig.getAccessToken());
apiCallsCounter.increment();
ResponseEntity<String> response = restTemplate.exchange(
url,
HttpMethod.POST,
Expand Down Expand Up @@ -179,6 +183,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
return RetryHandler.executeWithRetry(
() -> {
headers.setBearerAuth(authConfig.getAccessToken());
apiCallsCounter.increment();

ResponseEntity<List<Map<String, Object>>> response = restTemplate.exchange(
url,
Expand Down Expand Up @@ -230,6 +235,7 @@ public String getAuditLog(String contentUri) {
try {
String response = RetryHandler.executeWithRetry(() -> {
headers.setBearerAuth(authConfig.getAccessToken());
apiCallsCounter.increment();
ResponseEntity<String> responseEntity = restTemplate.exchange(
contentUri,
HttpMethod.GET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -340,4 +341,172 @@ void testExecutePartitionWithSearchAuditLogsError() throws Exception {
assertEquals("Search audit logs failed", exception.getMessage());
verify(mockRequestErrorsCounter).increment();
}

/**
 * A retryable {@link Office365Exception} raised while fetching an audit log must bump the
 * "retryableErrors" counter and surface to the caller as a {@link RuntimeException}.
 */
@Test
void testRetryableErrorCounterIncrement() throws Exception {
    // Register the counter stub first: the client looks its counters up in the constructor.
    final Counter retryableCounter = mock(Counter.class);
    when(pluginMetrics.counter("retryableErrors")).thenReturn(retryableCounter);
    final Office365CrawlerClient crawlerClient =
            new Office365CrawlerClient(service, sourceConfig, pluginMetrics);

    // Let the latency timer execute whatever Runnable it is asked to record.
    doAnswer(invocation -> {
        invocation.<Runnable>getArgument(0).run();
        return null;
    }).when(bufferWriteLatencyTimer).record(any(Runnable.class));

    // The search yields a single log entry ...
    final AuditLogsResponse searchResult = new AuditLogsResponse(
            Arrays.asList(Map.of("contentId", "ID1", "contentUri", "uri1")),
            null);
    when(service.searchAuditLogs(anyString(), any(Instant.class), any(Instant.class), any()))
            .thenReturn(searchResult);

    // ... whose retrieval fails with an error flagged as retryable.
    final Office365Exception retryableFailure = new Office365Exception("Retryable error", true);
    when(service.getAuditLog(anyString())).thenThrow(retryableFailure);

    final RuntimeException thrown = assertThrows(
            RuntimeException.class,
            () -> crawlerClient.executePartition(state, buffer, acknowledgementSet));

    // The metric is recorded and the failure carries the id of the offending log.
    verify(retryableCounter).increment();
    assertEquals("Retryable error processing audit log: ID1", thrown.getMessage());
}

/**
 * A non-retryable {@link Office365Exception} must bump the "nonRetryableErrors" counter,
 * must not propagate out of {@code executePartition}, and must leave the failed record out
 * of the buffer write.
 */
@Test
void testNonRetryableErrorCounterIncrement() throws Exception {
    // Register the counter stub first: the client looks its counters up in the constructor.
    final Counter nonRetryableCounter = mock(Counter.class);
    when(pluginMetrics.counter("nonRetryableErrors")).thenReturn(nonRetryableCounter);
    final Office365CrawlerClient crawlerClient =
            new Office365CrawlerClient(service, sourceConfig, pluginMetrics);

    // Let the latency timer execute whatever Runnable it is asked to record.
    doAnswer(invocation -> {
        invocation.<Runnable>getArgument(0).run();
        return null;
    }).when(bufferWriteLatencyTimer).record(any(Runnable.class));

    // The search yields a single log entry ...
    final AuditLogsResponse searchResult = new AuditLogsResponse(
            Arrays.asList(Map.of("contentId", "ID1", "contentUri", "uri1")),
            null);
    when(service.searchAuditLogs(anyString(), any(Instant.class), any(Instant.class), any()))
            .thenReturn(searchResult);

    // ... whose retrieval fails with an error flagged as non-retryable.
    final Office365Exception permanentFailure = new Office365Exception("Non-retryable error", false);
    when(service.getAuditLog(anyString())).thenThrow(permanentFailure);

    // No assertThrows here: the call is expected to complete normally.
    crawlerClient.executePartition(state, buffer, acknowledgementSet);

    verify(nonRetryableCounter).increment();
    // The only record was dropped, so the buffer receives an empty collection.
    verify(buffer).writeAll(argThat(records -> records.isEmpty()), anyInt());
}

/**
 * An exception that is not an {@link Office365Exception} is handled like a retryable error:
 * the "retryableErrors" counter is bumped and a {@link RuntimeException} with the
 * "Unexpected error" message is thrown.
 */
@Test
void testUnexpectedErrorTreatedAsRetryable() throws Exception {
    // Register the counter stub first: the client looks its counters up in the constructor.
    final Counter retryableCounter = mock(Counter.class);
    when(pluginMetrics.counter("retryableErrors")).thenReturn(retryableCounter);
    final Office365CrawlerClient crawlerClient =
            new Office365CrawlerClient(service, sourceConfig, pluginMetrics);

    // Let the latency timer execute whatever Runnable it is asked to record.
    doAnswer(invocation -> {
        invocation.<Runnable>getArgument(0).run();
        return null;
    }).when(bufferWriteLatencyTimer).record(any(Runnable.class));

    // The search yields a single log entry ...
    final AuditLogsResponse searchResult = new AuditLogsResponse(
            Arrays.asList(Map.of("contentId", "ID1", "contentUri", "uri1")),
            null);
    when(service.searchAuditLogs(anyString(), any(Instant.class), any(Instant.class), any()))
            .thenReturn(searchResult);

    // ... whose retrieval fails with a plain RuntimeException rather than an Office365Exception.
    final RuntimeException unexpectedFailure = new RuntimeException("Unexpected error");
    when(service.getAuditLog(anyString())).thenThrow(unexpectedFailure);

    final RuntimeException thrown = assertThrows(
            RuntimeException.class,
            () -> crawlerClient.executePartition(state, buffer, acknowledgementSet));

    // Counted as retryable and reported with the id of the offending log.
    verify(retryableCounter).increment();
    assertEquals("Unexpected error processing audit log: ID1", thrown.getMessage());
}

/**
 * Runs a partition with three log entries: one succeeds, one fails non-retryably, and one
 * fails retryably. Each error counter must be incremented once, and the retryable failure
 * must surface as a {@link RuntimeException} naming the third entry.
 */
@Test
void testMultipleRecordsWithMixedErrors() throws Exception {
    // Register both counter stubs first: the client looks its counters up in the constructor.
    final Counter retryableCounter = mock(Counter.class);
    final Counter nonRetryableCounter = mock(Counter.class);
    when(pluginMetrics.counter("nonRetryableErrors")).thenReturn(nonRetryableCounter);
    when(pluginMetrics.counter("retryableErrors")).thenReturn(retryableCounter);
    final Office365CrawlerClient crawlerClient =
            new Office365CrawlerClient(service, sourceConfig, pluginMetrics);

    // Let the latency timer execute whatever Runnable it is asked to record.
    doAnswer(invocation -> {
        invocation.<Runnable>getArgument(0).run();
        return null;
    }).when(bufferWriteLatencyTimer).record(any(Runnable.class));

    // The search yields three log entries, each with its own content URI.
    final AuditLogsResponse searchResult = new AuditLogsResponse(
            Arrays.asList(
                    Map.of("contentId", "ID1", "contentUri", "uri1"),
                    Map.of("contentId", "ID2", "contentUri", "uri2"),
                    Map.of("contentId", "ID3", "contentUri", "uri3")),
            null);
    when(service.searchAuditLogs(anyString(), any(Instant.class), any(Instant.class), any()))
            .thenReturn(searchResult);

    // uri1 -> success, uri2 -> non-retryable failure, uri3 -> retryable failure.
    final Office365Exception permanentFailure = new Office365Exception("Non-retryable error", false);
    final Office365Exception retryableFailure = new Office365Exception("Retryable error", true);
    when(service.getAuditLog("uri1")).thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}");
    when(service.getAuditLog("uri2")).thenThrow(permanentFailure);
    when(service.getAuditLog("uri3")).thenThrow(retryableFailure);

    final RuntimeException thrown = assertThrows(
            RuntimeException.class,
            () -> crawlerClient.executePartition(state, buffer, acknowledgementSet));

    // ID2 is counted as non-retryable, ID3 as retryable; ID3 is the one reported.
    verify(nonRetryableCounter).increment();
    verify(retryableCounter).increment();
    assertEquals("Retryable error processing audit log: ID3", thrown.getMessage());
}
}
Loading
Loading