From c3f0349dd8c3ab8b1ba546431b2ce4cdada2bee2 Mon Sep 17 00:00:00 2001 From: Vecheka Chhourn Date: Tue, 11 Nov 2025 00:05:53 +0000 Subject: [PATCH] Standardize Exception handling in source plugins Signed-off-by: Vecheka Chhourn --- .../Office365CrawlerClient.java | 61 +++--- .../Office365RestClient.java | 10 +- .../exception/Office365Exception.java | 31 --- .../service/Office365Service.java | 8 +- .../Office365CrawlerClientTest.java | 206 ++++++------------ .../Office365RestClientTest.java | 28 ++- .../service/Office365ServiceTest.java | 68 +++++- .../scheduler/WorkerScheduler.java | 29 ++- .../exception/SaaSCrawlerException.java | 34 +++ .../scheduler/WorkerSchedulerTest.java | 76 +++++++ .../exception/SaaSCrawlerExceptionTest.java | 80 +++++++ 11 files changed, 401 insertions(+), 230 deletions(-) delete mode 100644 data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/exception/Office365Exception.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/exception/SaaSCrawlerException.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/exception/SaaSCrawlerExceptionTest.java diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java index 198584bc26..34f371f6da 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java +++ 
b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java @@ -24,7 +24,7 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; +import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException; import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; @@ -51,14 +51,16 @@ @Slf4j @Named public class Office365CrawlerClient implements CrawlerClient { + + public static final String NON_RETRYABLE_ERRORS = "nonRetryableErrors"; + public static final String RETRYABLE_ERRORS = "retryableErrors"; + private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency"; private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts"; private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess"; private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess"; private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts"; private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures"; - private static final String NON_RETRYABLE_ERRORS = "nonRetryableErrors"; - private static final String RETRYABLE_ERRORS = "retryableErrors"; private static final int BUFFER_TIMEOUT_IN_SECONDS = 10; private static final String CONTENT_ID = "contentId"; private static final String CONTENT_URI = "contentUri"; @@ -129,23 +131,11 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state if (record != null) { 
records.add(record); } - } catch (Office365Exception e) { - + } catch (SaaSCrawlerException e) { + boolean isRetryable = e.isRetryable(); log.error(NOISY, "{} error processing audit log: {}", - e.isRetryable() ? "Retryable" : "Non-retryable", logId, e); - if (e.isRetryable()) { - retryableErrorsCounter.increment(); - throw new RuntimeException("Retryable error processing audit log: " + logId, e); - } else { - nonRetryableErrorsCounter.increment(); - // TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record - log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", logId, e); - } - } catch (Exception e) { - // Unexpected errors are treated as retryable to be safe - retryableErrorsCounter.increment(); - log.error(NOISY, "Unexpected error processing audit log: {}", logId, e); - throw new RuntimeException("Unexpected error processing audit log: " + logId, e); + isRetryable ? "Retryable" : "Non-retryable", logId, e); + throw new SaaSCrawlerException("Error processing audit log: " + logId, e, isRetryable); } } } @@ -170,19 +160,30 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state log.error(NOISY, "Failed to process partition for log type {} from {} to {}", logType, startTime, endTime, e); requestErrorsCounter.increment(); - throw e; + if (e instanceof SaaSCrawlerException) { + SaaSCrawlerException saasException = (SaaSCrawlerException) e; + if (saasException.isRetryable()) { + retryableErrorsCounter.increment(); + } else { + nonRetryableErrorsCounter.increment(); + } + throw e; + } + // any other exceptions = non-retryable + nonRetryableErrorsCounter.increment(); + throw new SaaSCrawlerException("Failed to process partition", e, false); } } - private Record processAuditLog(Map metadata) throws Office365Exception { + private Record processAuditLog(Map metadata) throws SaaSCrawlerException { String contentUri = (String) metadata.get(CONTENT_URI); if (contentUri == 
null) { - throw new Office365Exception("Missing contentUri in metadata", false); + throw new SaaSCrawlerException("Missing contentUri in metadata", false); } String logContent = service.getAuditLog(contentUri); if (logContent == null) { - throw new Office365Exception("Received null log content for URI: " + contentUri, false); + throw new SaaSCrawlerException("Received null log content for URI: " + contentUri, false); } String logId = (String) metadata.get(CONTENT_ID); @@ -200,7 +201,7 @@ private Record processAuditLog(Map metadata) throws Offic String contentType = (String) data.get("Workload"); if (contentType == null) { - throw new Office365Exception("Missing Workload field in audit log: " + logId, false); + throw new SaaSCrawlerException("Missing Workload field in audit log: " + logId, false); } Event event = JacksonEvent.builder() @@ -211,7 +212,7 @@ private Record processAuditLog(Map metadata) throws Offic return new Record<>(event); } catch (JsonProcessingException e) { // JSON parsing errors are non-retryable as they indicate malformed data - throw new Office365Exception("Failed to parse audit log: " + logId, e, false); + throw new SaaSCrawlerException("Failed to parse audit log: " + logId, e, false); } } @@ -244,7 +245,8 @@ private void writeRecordsWithRetry(final List> records, retryCount++; if (retryCount >= maxRetries) { bufferWriteFailuresCounter.increment(); - throw new RuntimeException("Failed to write to buffer after " + maxRetries + " attempts", e); + // allows all writeToBuffer exceptions to be retryable to keep current behaviour of immediate retry by WorkerScheduler + throw new SaaSCrawlerException("Failed to write to buffer after " + maxRetries + " attempts", e, true); } bufferWriteRetryAttemptsCounter.increment(); @@ -253,16 +255,13 @@ private void writeRecordsWithRetry(final List> records, try { Thread.sleep(currentBackoff); - // TODO: Update worker partition state to prevent timeout - // Ideally, we want to call the saveWorkerPartitionState 
and extend the lease like so - // coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); - throw new RuntimeException("Buffer write retry interrupted", ie); + throw new SaaSCrawlerException("Buffer write retry interrupted", ie, true); } } catch (Exception e) { bufferWriteFailuresCounter.increment(); - throw new RuntimeException("Error writing to buffer", e); + throw new SaaSCrawlerException("Error writing to buffer", e, true); } } } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java index 9a1e5e39d8..c58462cf4c 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java @@ -15,7 +15,7 @@ import lombok.extern.slf4j.Slf4j; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; +import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException; import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpEntity; @@ -148,7 +148,7 @@ public void startSubscriptions() { } catch (Exception e) { 
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap); log.error(NOISY, "Failed to initialize subscriptions", e); - throw new RuntimeException("Failed to initialize subscriptions: " + e.getMessage(), e); + throw new SaaSCrawlerException("Failed to initialize subscriptions: " + e.getMessage(), e, true); } } @@ -212,7 +212,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType, } catch (Exception e) { publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap); log.error(NOISY, "Error while fetching audit logs for content type {}", contentType, e); - throw new RuntimeException("Failed to fetch audit logs", e); + throw new SaaSCrawlerException("Failed to fetch audit logs", e, true); } }); } @@ -226,7 +226,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType, */ public String getAuditLog(String contentUri) { if (!contentUri.startsWith(MANAGEMENT_API_BASE_URL)) { - throw new Office365Exception("ContentUri must be from Office365 Management API: " + contentUri, false); + throw new SaaSCrawlerException("ContentUri must be from Office365 Management API: " + contentUri, false); } auditLogsRequestedCounter.increment(); final HttpHeaders headers = new HttpHeaders(); @@ -256,7 +256,7 @@ public String getAuditLog(String contentUri) { } catch (Exception e) { publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap); log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e); - throw new RuntimeException("Failed to fetch audit log", e); + throw new SaaSCrawlerException("Failed to fetch audit log", e, true); } }); } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/exception/Office365Exception.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/exception/Office365Exception.java deleted file mode 
100644 index 7612e34e2f..0000000000 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/exception/Office365Exception.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.dataprepper.plugins.source.microsoft_office365.exception; - -/** - * Exception class for Office 365 source plugin errors. - */ -public class Office365Exception extends RuntimeException { - private final boolean retryable; - - public Office365Exception(String message, boolean retryable) { - super(message); - this.retryable = retryable; - } - - public Office365Exception(String message, Throwable cause, boolean retryable) { - super(message, cause); - this.retryable = retryable; - } - - public boolean isRetryable() { - return retryable; - } -} diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java index d4e3ce5168..21c3f4aeeb 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java @@ -14,7 +14,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient; 
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; +import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException; import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; import javax.inject.Named; @@ -52,10 +52,10 @@ public AuditLogsResponse searchAuditLogs(final String logType, final Instant endTime, final String nextPageUri) { if (startTime == null || endTime == null) { - throw new IllegalArgumentException("startTime and endTime must not be null"); + throw new SaaSCrawlerException("startTime and endTime must not be null", false); } if (logType == null) { - throw new IllegalArgumentException("logType must not be null"); + throw new SaaSCrawlerException("logType must not be null", false); } try { // If pagination URI exists, use it directly @@ -78,7 +78,7 @@ public AuditLogsResponse searchAuditLogs(final String logType, return response; } catch (Exception e) { windowRetryCounter.increment(); - throw new Office365Exception( + throw new SaaSCrawlerException( String.format("Failed to fetch logs for time window %s to %s for log type %s.", startTime, endTime, logType), e, true); } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java index 3509bcd161..737b42e9f1 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java +++ 
b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java @@ -28,9 +28,9 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -59,6 +60,8 @@ import static org.mockito.Mockito.never; import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQUEST_ERRORS; +import static org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365CrawlerClient.NON_RETRYABLE_ERRORS; +import static org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365CrawlerClient.RETRYABLE_ERRORS; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -181,9 +184,14 @@ void testExecutePartitionWithJsonProcessingError() throws Exception { return null; }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); - client.executePartition(state, buffer, acknowledgementSet); + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, + () 
-> client.executePartition(state, buffer, acknowledgementSet)); + + assertEquals("Error processing audit log: ID1", exception.getMessage()); + assertFalse(exception.isRetryable()); + assertTrue(exception.getCause() instanceof SaaSCrawlerException); + assertEquals("Failed to parse audit log: ID1", exception.getCause().getMessage()); - verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt()); verify(mockRequestErrorsCounter, never()).increment(); } @@ -251,15 +259,20 @@ void testBufferWriteTimeout() throws Exception { .when(buffer) .writeAll(any(), anyInt()); - RuntimeException exception = assertThrows(RuntimeException.class, + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> client.executePartition(state, buffer, acknowledgementSet)); assertEquals("Error writing to buffer", exception.getMessage()); + assertTrue(exception.isRetryable()); verify(buffer).writeAll(any(), anyInt()); } @Test void testNonRetryableError() throws Exception { + // Mock the non-retryable errors counter + Counter mockNonRetryableErrorsCounter = mock(Counter.class); + when(pluginMetrics.counter(NON_RETRYABLE_ERRORS)).thenReturn(mockNonRetryableErrorsCounter); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); AuditLogsResponse response = new AuditLogsResponse( @@ -282,71 +295,23 @@ void testNonRetryableError() throws Exception { return null; }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); - client.executePartition(state, buffer, acknowledgementSet); + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, + () -> client.executePartition(state, buffer, acknowledgementSet)); - verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt()); - } + assertEquals("Error processing audit log: ID1", exception.getMessage()); + assertFalse(exception.isRetryable()); + assertTrue(exception.getCause() instanceof SaaSCrawlerException); + assertEquals("Received null log content for URI: 
uri1", exception.getCause().getMessage()); - @Test - void testMissingWorkloadField() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); - - AuditLogsResponse response = new AuditLogsResponse( - Arrays.asList(Map.of( - "contentId", "ID1", - "contentUri", "uri1" - )), null); - - when(service.searchAuditLogs( - anyString(), - any(Instant.class), - any(Instant.class), - any() - )).thenReturn(response); - - when(service.getAuditLog(anyString())).thenReturn("{\"Operation\":\"Test\"}"); - - doAnswer(invocation -> { - Runnable runnable = invocation.getArgument(0); - runnable.run(); - return null; - }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); - - client.executePartition(state, buffer, acknowledgementSet); - - verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt()); - } - - @Test - void testExecutePartitionWithSearchAuditLogsError() throws Exception { - // Mock the total failures counter before creating the client - Counter mockRequestErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter); - - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); - - // Mock searchAuditLogs to throw exception - when(service.searchAuditLogs( - eq("Exchange"), // Match the value from setUp() - any(Instant.class), - any(Instant.class), - isNull() - )).thenThrow(new RuntimeException("Search audit logs failed")); - - // Execute and verify exception - RuntimeException exception = assertThrows(RuntimeException.class, - () -> client.executePartition(state, buffer, acknowledgementSet)); - - // Verify exception message and counter increment - assertEquals("Search audit logs failed", exception.getMessage()); - verify(mockRequestErrorsCounter).increment(); + verify(buffer, never()).writeAll(argThat(list -> list.isEmpty()), anyInt()); + verify(mockNonRetryableErrorsCounter).increment(); } @Test void 
testRetryableErrorCounterIncrement() throws Exception { // Mock the retryable errors counter Counter mockRetryableErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter("retryableErrors")).thenReturn(mockRetryableErrorsCounter); + when(pluginMetrics.counter(RETRYABLE_ERRORS)).thenReturn(mockRetryableErrorsCounter); Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); @@ -363,9 +328,9 @@ void testRetryableErrorCounterIncrement() throws Exception { any() )).thenReturn(response); - // Mock service.getAuditLog to throw a retryable Office365Exception + // Mock service.getAuditLog to throw a retryable SaaSCrawlerException when(service.getAuditLog(anyString())) - .thenThrow(new Office365Exception("Retryable error", true)); + .thenThrow(new SaaSCrawlerException("Retryable error", true)); doAnswer(invocation -> { Runnable runnable = invocation.getArgument(0); @@ -379,15 +344,11 @@ void testRetryableErrorCounterIncrement() throws Exception { // Verify retryable error counter was incremented verify(mockRetryableErrorsCounter).increment(); - assertEquals("Retryable error processing audit log: ID1", exception.getMessage()); + assertEquals("Error processing audit log: ID1", exception.getMessage()); } @Test - void testNonRetryableErrorCounterIncrement() throws Exception { - // Mock the non-retryable errors counter - Counter mockNonRetryableErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter("nonRetryableErrors")).thenReturn(mockNonRetryableErrorsCounter); - + void testMissingWorkloadField() throws Exception { Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); AuditLogsResponse response = new AuditLogsResponse( @@ -403,9 +364,7 @@ void testNonRetryableErrorCounterIncrement() throws Exception { any() )).thenReturn(response); - // Mock service.getAuditLog to throw a non-retryable Office365Exception - when(service.getAuditLog(anyString())) - .thenThrow(new 
Office365Exception("Non-retryable error", false)); + when(service.getAuditLog(anyString())).thenReturn("{\"Operation\":\"Test\"}"); doAnswer(invocation -> { Runnable runnable = invocation.getArgument(0); @@ -413,100 +372,73 @@ void testNonRetryableErrorCounterIncrement() throws Exception { return null; }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); - // Execute - non-retryable errors should not throw exception, just drop the record - client.executePartition(state, buffer, acknowledgementSet); + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, + () -> client.executePartition(state, buffer, acknowledgementSet)); - // Verify non-retryable error counter was incremented - verify(mockNonRetryableErrorsCounter).increment(); - // Verify buffer was called with empty list (record was dropped) - verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt()); + assertEquals("Error processing audit log: ID1", exception.getMessage()); + assertFalse(exception.isRetryable()); + assertTrue(exception.getCause() instanceof SaaSCrawlerException); + assertEquals("Missing Workload field in audit log: ID1", exception.getCause().getMessage()); + + verify(buffer, never()).writeAll(argThat(list -> list.isEmpty()), anyInt()); } @Test - void testUnexpectedErrorTreatedAsRetryable() throws Exception { - // Mock the retryable errors counter + void testExecutePartitionWithSearchAuditLogsError() throws Exception { + Counter mockRequestErrorsCounter = mock(Counter.class); Counter mockRetryableErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter("retryableErrors")).thenReturn(mockRetryableErrorsCounter); + when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter); + when(pluginMetrics.counter(RETRYABLE_ERRORS)).thenReturn(mockRetryableErrorsCounter); Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); - AuditLogsResponse response = new AuditLogsResponse( - 
Arrays.asList(Map.of( - "contentId", "ID1", - "contentUri", "uri1" - )), null); - + // Mock searchAuditLogs to throw exception when(service.searchAuditLogs( - anyString(), + eq("Exchange"), // Match the value from setUp() any(Instant.class), any(Instant.class), - any() - )).thenReturn(response); - - // Mock service.getAuditLog to throw an unexpected RuntimeException (not Office365Exception) - when(service.getAuditLog(anyString())) - .thenThrow(new RuntimeException("Unexpected error")); - - doAnswer(invocation -> { - Runnable runnable = invocation.getArgument(0); - runnable.run(); - return null; - }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + isNull() + )).thenThrow(new SaaSCrawlerException("Search audit logs failed", true)); - // Execute and expect RuntimeException to be thrown - RuntimeException exception = assertThrows(RuntimeException.class, + // Execute and verify exception + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> client.executePartition(state, buffer, acknowledgementSet)); - // Verify retryable error counter was incremented (unexpected errors are treated as retryable) + // Verify exception message and counter increment + assertEquals("Search audit logs failed", exception.getMessage()); + assertTrue(exception.isRetryable()); + verify(mockRequestErrorsCounter).increment(); verify(mockRetryableErrorsCounter).increment(); - assertEquals("Unexpected error processing audit log: ID1", exception.getMessage()); } @Test - void testMultipleRecordsWithMixedErrors() throws Exception { - // Mock both error counters - Counter mockRetryableErrorsCounter = mock(Counter.class); + void testExecutePartitionWithNonSaaSCrawlerException() throws Exception { + // Create the counter mock before creating the client + Counter mockRequestErrorsCounter = mock(Counter.class); Counter mockNonRetryableErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter("retryableErrors")).thenReturn(mockRetryableErrorsCounter); - 
when(pluginMetrics.counter("nonRetryableErrors")).thenReturn(mockNonRetryableErrorsCounter); + when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter); + when(pluginMetrics.counter(NON_RETRYABLE_ERRORS)).thenReturn(mockNonRetryableErrorsCounter); + // Create client after counter is mocked Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); - AuditLogsResponse response = new AuditLogsResponse( - Arrays.asList( - Map.of("contentId", "ID1", "contentUri", "uri1"), - Map.of("contentId", "ID2", "contentUri", "uri2"), - Map.of("contentId", "ID3", "contentUri", "uri3") - ), null); - + // Simulate a non-SaaSCrawlerException (like RuntimeException) when(service.searchAuditLogs( anyString(), any(Instant.class), any(Instant.class), any() - )).thenReturn(response); - - // Mock different responses for different URIs - when(service.getAuditLog("uri1")) - .thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}"); // Success - when(service.getAuditLog("uri2")) - .thenThrow(new Office365Exception("Non-retryable error", false)); // Non-retryable - when(service.getAuditLog("uri3")) - .thenThrow(new Office365Exception("Retryable error", true)); // Retryable - - doAnswer(invocation -> { - Runnable runnable = invocation.getArgument(0); - runnable.run(); - return null; - }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + )).thenThrow(new RuntimeException("Unexpected error")); - // Execute and expect RuntimeException due to retryable error - RuntimeException exception = assertThrows(RuntimeException.class, + // Execute and verify exception + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> client.executePartition(state, buffer, acknowledgementSet)); - // Verify both error counters were incremented - verify(mockNonRetryableErrorsCounter).increment(); // For ID2 - verify(mockRetryableErrorsCounter).increment(); // For ID3 - assertEquals("Retryable error processing audit 
log: ID3", exception.getMessage()); + // Verify: + assertEquals("Failed to process partition", exception.getMessage()); + assertFalse(exception.isRetryable()); + assertTrue(exception.getCause() instanceof RuntimeException); + verify(mockRequestErrorsCounter).increment(); + verify(mockNonRetryableErrorsCounter).increment(); } } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java index 7fbcb025fc..a4a993c7f1 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java @@ -22,6 +22,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface; import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; +import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException; import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpEntity; @@ -115,10 +116,11 @@ void testStartSubscriptionsOtherError() { when(restTemplate.exchange(anyString(), eq(HttpMethod.POST), any(), eq(String.class))) .thenThrow(otherException); - RuntimeException exception = assertThrows(RuntimeException.class, + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> 
office365RestClient.startSubscriptions()); assertEquals("Failed to initialize subscriptions: 400 Bad Request", exception.getMessage()); + assertTrue(exception.isRetryable()); } @Test @@ -210,7 +212,7 @@ void testSearchAuditLogsFailure() { )).thenThrow(new HttpClientErrorException(HttpStatus.INTERNAL_SERVER_ERROR)); // Verify that the exception is propagated - RuntimeException exception = assertThrows(RuntimeException.class, + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> office365RestClient.searchAuditLogs( "Audit.AzureActiveDirectory", startTime, @@ -218,6 +220,7 @@ void testSearchAuditLogsFailure() { null )); assertEquals("Failed to fetch audit logs", exception.getMessage()); + assertTrue(exception.isRetryable()); } @Test @@ -250,9 +253,10 @@ void testGetAuditLogFailure() { )).thenThrow(new HttpClientErrorException(HttpStatus.INTERNAL_SERVER_ERROR)); // Verify that the exception is propagated - RuntimeException exception = assertThrows(RuntimeException.class, + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> office365RestClient.getAuditLog(contentUri)); assertEquals("Failed to fetch audit log", exception.getMessage()); + assertTrue(exception.isRetryable()); } @Test @@ -318,7 +322,7 @@ void testSearchAuditLogsFailureCounterIncrementsOnEachRetry() throws Exception { any(ParameterizedTypeReference.class) )).thenThrow(new HttpClientErrorException(HttpStatus.TOO_MANY_REQUESTS)); - assertThrows(RuntimeException.class, () -> + assertThrows(SaaSCrawlerException.class, () -> office365RestClient.searchAuditLogs( "Audit.AzureActiveDirectory", startTime, @@ -347,7 +351,7 @@ void testGetAuditLogFailureCounterIncrementsOnEachRetry() throws Exception { eq(String.class) )).thenThrow(new HttpClientErrorException(HttpStatus.TOO_MANY_REQUESTS)); - assertThrows(RuntimeException.class, () -> + assertThrows(SaaSCrawlerException.class, () -> office365RestClient.getAuditLog(contentUri) ); @@ -441,7 +445,7 @@ void 
testGetAuditLogMetricsInvocation() throws NoSuchFieldException, IllegalAcce when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class))) .thenThrow(new HttpClientErrorException(HttpStatus.INTERNAL_SERVER_ERROR)); - assertThrows(RuntimeException.class, () -> office365RestClient.getAuditLog(contentUri)); + assertThrows(SaaSCrawlerException.class, () -> office365RestClient.getAuditLog(contentUri)); // Verify failure metrics verify(mockAuditLogsRequestedCounter, times(2)).increment(); // Called again before retry @@ -532,9 +536,10 @@ void testPublishErrorTypeMetricCounterForGetAuditLog(HttpStatus status, boolean )).thenThrow(new HttpClientErrorException(status)); // Execute and verify exception - RuntimeException exception = assertThrows(RuntimeException.class, + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> office365RestClient.getAuditLog(contentUri)); assertEquals("Failed to fetch audit log", exception.getMessage()); + assertTrue(exception.isRetryable()); // Verify counter increment if (shouldIncrementCounter) { @@ -562,9 +567,10 @@ void testPublishErrorTypeMetricCounterWithUnmappedError() throws NoSuchFieldExce )).thenThrow(new HttpClientErrorException(HttpStatus.BAD_GATEWAY)); // Execute and verify exception - RuntimeException exception = assertThrows(RuntimeException.class, + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> office365RestClient.getAuditLog(contentUri)); assertEquals("Failed to fetch audit log", exception.getMessage()); + assertTrue(exception.isRetryable()); // Verify no counters were incremented assertTrue(mockErrorTypeMetricCounterMap.isEmpty()); @@ -610,7 +616,7 @@ void testPublishErrorTypeMetricCounterForSearchAuditLogs(HttpStatus status, bool )).thenThrow(new HttpClientErrorException(status)); // Execute and verify exception - RuntimeException exception = assertThrows(RuntimeException.class, + SaaSCrawlerException exception = 
assertThrows(SaaSCrawlerException.class, () -> office365RestClient.searchAuditLogs( contentType, startTime, @@ -618,6 +624,7 @@ void testPublishErrorTypeMetricCounterForSearchAuditLogs(HttpStatus status, bool null )); assertEquals("Failed to fetch audit logs", exception.getMessage()); + assertTrue(exception.isRetryable()); // Verify counter increment if (shouldIncrementCounter) { @@ -662,7 +669,7 @@ void testPublishErrorTypeMetricCounterForStartSubscriptions(HttpStatus status, b )).thenThrow(new HttpClientErrorException(status)); // Execute and verify exception - RuntimeException exception = assertThrows(RuntimeException.class, + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> office365RestClient.startSubscriptions()); if (status == HttpStatus.FORBIDDEN) { assertEquals("Failed to initialize subscriptions: Access forbidden: 403 FORBIDDEN", @@ -671,6 +678,7 @@ void testPublishErrorTypeMetricCounterForStartSubscriptions(HttpStatus status, b assertEquals("Failed to initialize subscriptions: " + status.toString(), exception.getMessage()); } + assertTrue(exception.isRetryable()); if (shouldIncrementCounter) { verify(mockCounter).increment(); diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java index d9c6dbf86a..ba64d22820 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java @@ -17,7 +17,7 @@ import 
org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient; import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; +import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException; import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; import java.time.Duration; @@ -31,6 +31,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -183,13 +185,14 @@ void testSearchAuditLogsError() { any(), any(), any(), any() )).thenThrow(new RuntimeException("API Error")); - Office365Exception exception = assertThrows( - Office365Exception.class, + SaaSCrawlerException exception = assertThrows( + SaaSCrawlerException.class, () -> office365Service.searchAuditLogs(logType, startTime, endTime, null) ); assertEquals("Failed to fetch logs for time window " + startTime + " to " + endTime + " for log type " + logType + ".", exception.getMessage()); + assertTrue(exception.isRetryable()); } @Test @@ -208,6 +211,65 @@ void testGetAuditLog() { verify(office365RestClient).getAuditLog("test-id"); } + @Test + void testSearchAuditLogsWithNullStartTime() { + // Given + Instant endTime = Instant.now(); + String logType = "Exchange"; + + // When/Then + SaaSCrawlerException exception = assertThrows( + SaaSCrawlerException.class, + () -> office365Service.searchAuditLogs(logType, null, endTime, null) + ); + 
assertEquals("startTime and endTime must not be null", exception.getMessage()); + assertFalse(exception.isRetryable()); + } + + @Test + void testSearchAuditLogsWithNullEndTime() { + // Given + Instant startTime = Instant.now().minus(Duration.ofHours(1)); + String logType = "Exchange"; + + // When/Then + SaaSCrawlerException exception = assertThrows( + SaaSCrawlerException.class, + () -> office365Service.searchAuditLogs(logType, startTime, null, null) + ); + assertEquals("startTime and endTime must not be null", exception.getMessage()); + assertFalse(exception.isRetryable()); + } + + @Test + void testSearchAuditLogsWithBothTimesNull() { + // Given + String logType = "Exchange"; + + // When/Then + SaaSCrawlerException exception = assertThrows( + SaaSCrawlerException.class, + () -> office365Service.searchAuditLogs(logType, null, null, null) + ); + assertEquals("startTime and endTime must not be null", exception.getMessage()); + assertFalse(exception.isRetryable()); + } + + @Test + void testSearchAuditLogsWithNullLogType() { + // Given + Instant startTime = Instant.now().minus(Duration.ofHours(1)); + Instant endTime = Instant.now(); + + // When/Then + SaaSCrawlerException exception = assertThrows( + SaaSCrawlerException.class, + () -> office365Service.searchAuditLogs(null, startTime, endTime, null) + ); + assertEquals("logType must not be null", exception.getMessage()); + assertFalse(exception.isRetryable()); + } + private Map createTestItem(String contentId, Instant contentCreated) { Map item = new HashMap<>(); item.put("contentId", contentId); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java index 01a1900ea3..bab62eb299 100644 --- 
a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig; import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasWorkerProgressState; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; + import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,8 +29,9 @@ public class WorkerScheduler implements Runnable { public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses"; public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures"; - private static final String WORKER_PARTITIONS_COMPLETED = "workerPartitionsCompleted"; - private static final String WORKER_PARTITIONS_FAILED = "workerPartitionsFailed"; + public static final String WORKER_PARTITIONS_FAILED = "workerPartitionsFailed"; + public static final String WORKER_PARTITIONS_COMPLETED = "workerPartitionsCompleted"; + private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofSeconds(20); private static final Logger log = LoggerFactory.getLogger(WorkerScheduler.class); private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000; @@ -92,18 +94,27 @@ public void run() { } } } catch (Exception e) { - log.error("Error processing partition", e); - parititionsFailedCounter.increment(); - try { - Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS); - } catch (InterruptedException ex) { - log.warn("Thread interrupted while waiting to retry due to {}", ex.getMessage()); - } + // TODO: will be in a followup to 
handle retry strategy differently for non-retryable exceptions + backoffRetry(e); } } log.warn("SourceItemWorker Scheduler is interrupted, looks like shutdown has triggered"); } + /** + * Default backoff behaviour of the WorkerScheduler: counts and logs the failure, then sleeps for RETRY_BACKOFF_ON_EXCEPTION_MILLIS + * @param e - exception caught by the WorkerScheduler run loop + */ + private void backoffRetry(Exception e) { + this.parititionsFailedCounter.increment(); + log.error("Error processing partition", e); + try { + Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS); + } catch (InterruptedException ex) { + log.warn("Thread interrupted while waiting to retry due to {}", ex.getMessage()); + } + } + private void processPartition(EnhancedSourcePartition partition, Buffer> buffer) { // Implement your source extraction logic here // Update the partition state or commit the partition as needed diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/exception/SaaSCrawlerException.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/exception/SaaSCrawlerException.java new file mode 100644 index 0000000000..6deaa1fc33 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/exception/SaaSCrawlerException.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.source.source_crawler.exception; + +/** + * Exception class for source plugin exceptions.
+ * The following two criteria apply: + * + Any REST API call failure is considered retryable + * + Other exceptions (e.g. internal failures) are considered non-retryable, except failures when writing to the Buffer + */ +public class SaaSCrawlerException extends RuntimeException { + private final boolean retryable; + + public SaaSCrawlerException(String message, boolean retryable) { + super(message); + this.retryable = retryable; + } + + public SaaSCrawlerException(String message, Throwable cause, boolean retryable) { + super(message, cause); + this.retryable = retryable; + } + + public boolean isRetryable() { + return retryable; + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java index 546dcb8e99..60695b0f2f 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java @@ -23,6 +23,10 @@ import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.CrowdStrikeWorkerProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException; +import io.micrometer.core.instrument.Counter; + +import java.time.Instant; import java.util.Optional; import java.util.UUID; import
java.util.concurrent.ExecutorService; @@ -35,9 +39,18 @@ import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler.WORKER_PARTITIONS_FAILED; +import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler.WORKER_PARTITIONS_COMPLETED; +import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler.ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME; +import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler.ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME; @ExtendWith(MockitoExtension.class) public class WorkerSchedulerTest { @@ -259,4 +272,67 @@ void testCompletePartitionWithAcknowledgements() throws InterruptedException { // Verify that coordinator.completePartition was called (from the acknowledgement callback with true) verify(coordinator, times(1)).completePartition(eq(mockPartition)); } + + @Test + void testRetryableAndNonRetryableSaaSCrawlerExceptions() throws InterruptedException { + // Given + Counter mockFailedCounter = mock(Counter.class); + Counter mockCompletedCounter = mock(Counter.class); + Counter mockAckSuccessCounter = mock(Counter.class); + Counter mockAckFailureCounter = mock(Counter.class); + + when(pluginMetrics.counter(WORKER_PARTITIONS_FAILED)).thenReturn(mockFailedCounter); + when(pluginMetrics.counter(WORKER_PARTITIONS_COMPLETED)).thenReturn(mockCompletedCounter); + 
when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME)).thenReturn(mockAckSuccessCounter); + when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME)).thenReturn(mockAckFailureCounter);; + + WorkerScheduler workerScheduler = new WorkerScheduler(pluginName, buffer, + coordinator, sourceConfig, crawler, pluginMetrics, acknowledgementSetManager); + + // Mock partition and state + DimensionalTimeSliceWorkerProgressState mockProgressState = new DimensionalTimeSliceWorkerProgressState(); + mockProgressState.setPartitionCreationTime(Instant.now()); + SaasSourcePartition mockPartition = org.mockito.Mockito.mock(SaasSourcePartition.class); + when(mockPartition.getProgressState()).thenReturn(Optional.of(mockProgressState)); + + // Set up coordinator to return our mock partition + when(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)) + .thenReturn(Optional.of(mockPartition)) + .thenReturn(Optional.empty()); + + // Mock crawler to throw retryable exception + doThrow(new SaaSCrawlerException("Retryable error", true)) + .when(crawler).executePartition(any(), any(), any()); + + // When + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(workerScheduler); + + Thread.sleep(500); + executorService.shutdownNow(); + + // Then - Verify retryable exception handling + verify(coordinator, never()).saveProgressStateForPartition(any(), any()); + verify(coordinator, never()).giveUpPartition(any()); + verify(mockFailedCounter).increment(); + + // Reset mocks for non-retryable test + when(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)) + .thenReturn(Optional.of(mockPartition)) + .thenReturn(Optional.empty()); + doThrow(new SaaSCrawlerException("Non-retryable error", false)) + .when(crawler).executePartition(any(), any(), any()); + + // When - Run again with non-retryable exception + executorService = Executors.newSingleThreadExecutor(); + 
executorService.submit(workerScheduler); + + Thread.sleep(500); + executorService.shutdownNow(); + + // Then - Verify non-retryable exception handling + verify(coordinator, never()).saveProgressStateForPartition(any(), any()); + verify(coordinator, never()).giveUpPartition(any()); + verify(mockFailedCounter, times(2)).increment(); + } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/exception/SaaSCrawlerExceptionTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/exception/SaaSCrawlerExceptionTest.java new file mode 100644 index 0000000000..4054058814 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/exception/SaaSCrawlerExceptionTest.java @@ -0,0 +1,80 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.source_crawler.exception; + +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class SaaSCrawlerExceptionTest { + + @Test + void constructor_WithMessageAndRetryableFlag_SetsMessageAndFlag() { + // Given + String errorMessage = "Test error message"; + boolean retryable = true; + + // When + SaaSCrawlerException exception = new SaaSCrawlerException(errorMessage, retryable); + + // Then + assertEquals(errorMessage, exception.getMessage()); + assertTrue(exception.isRetryable()); + } + + @Test + void constructor_WithMessageAndRetryableFlagFalse_SetsMessageAndFlag() { + // Given + String errorMessage = "Test error message"; + boolean retryable = false; + + // When + SaaSCrawlerException exception = new SaaSCrawlerException(errorMessage, retryable); + + // Then + assertEquals(errorMessage, exception.getMessage()); + assertFalse(exception.isRetryable()); + } + + @Test + void constructor_WithMessageCauseAndRetryableFlag_SetsAllFields() { + // Given + String errorMessage = "Test error message"; + Throwable cause = new IllegalArgumentException("Test cause"); + boolean retryable = true; + + // When + SaaSCrawlerException exception = new SaaSCrawlerException(errorMessage, cause, retryable); + + // Then + assertEquals(errorMessage, exception.getMessage()); + assertEquals(cause, exception.getCause()); + assertTrue(exception.isRetryable()); + } + + @Test + void constructor_WithMessageCauseAndRetryableFlagFalse_SetsAllFields() { + // Given + String errorMessage = "Test error message"; + Throwable cause = new IllegalArgumentException("Test cause"); + boolean retryable = false; + + // When + SaaSCrawlerException exception = new SaaSCrawlerException(errorMessage, cause, retryable); + + // Then + assertEquals(errorMessage, exception.getMessage()); + 
assertEquals(cause, exception.getCause()); + assertFalse(exception.isRetryable()); + } + +}