From a89b3d7c81256577c1f386a4a292cc30f2527afc Mon Sep 17 00:00:00 2001 From: Brett Zeligson Date: Tue, 4 Nov 2025 12:58:09 -0500 Subject: [PATCH 1/5] Add Retryable/Non-Retryable Exception + API Calls Metrics for O365 Signed-off-by: Brett Zeligson --- .../Office365CrawlerClient.java | 24 ++- .../Office365RestClient.java | 6 + .../Office365CrawlerClientTest.java | 169 ++++++++++++++++++ .../Office365RestClientTest.java | 123 +++++++++++++ .../source_crawler/utils/MetricsHelper.java | 2 +- 5 files changed, 321 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java index 2766bc2f00..e4ab8b476a 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java @@ -42,7 +42,7 @@ import java.util.concurrent.TimeoutException; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; -import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQEUEST_ERRORS; +import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQUEST_ERRORS; /** * Implementation of CrawlerClient for Office 365 audit logs. @@ -57,6 +57,8 @@ public class Office365CrawlerClient implements CrawlerClient processAuditLog(Map metadata) throws Office365Exception { String contentUri = (String) metadata.get(CONTENT_URI); if (contentUri == null) { diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java index 3e8c317728..53c7536cab 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java @@ -50,6 +50,7 @@ public class Office365RestClient { private static final String AUDIT_LOG_RESPONSE_SIZE = "auditLogResponseSizeBytes"; private static final String AUDIT_LOG_REQUESTS_FAILED = "auditLogRequestsFailed"; private static final String AUDIT_LOG_REQUESTS_SUCCESS = "auditLogRequestsSuccess"; + private static final String API_CALLS = "apiCalls"; private static final String AUDIT_LOGS_REQUESTED = "auditLogsRequested"; private static final String SEARCH_CALL_LATENCY = "searchCallLatency"; private static final String SEARCH_RESPONSE_SIZE = "searchResponseSizeBytes"; @@ -67,6 +68,7 @@ public class Office365RestClient { private final Counter auditLogRequestsSuccessCounter; private final Counter searchRequestsFailedCounter; private final Counter searchRequestsSuccessCounter; + private final Counter apiCallsCounter; private final DistributionSummary auditLogResponseSizeSummary; private final DistributionSummary searchResponseSizeSummary; @@ -83,6 +85,7 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig, this.auditLogRequestsSuccessCounter = pluginMetrics.counter(AUDIT_LOG_REQUESTS_SUCCESS); this.searchRequestsFailedCounter = pluginMetrics.counter(SEARCH_REQUESTS_FAILED); this.searchRequestsSuccessCounter = pluginMetrics.counter(SEARCH_REQUESTS_SUCCESS); + this.apiCallsCounter = pluginMetrics.counter(API_CALLS); this.auditLogResponseSizeSummary = pluginMetrics.summary(AUDIT_LOG_RESPONSE_SIZE); this.searchResponseSizeSummary = pluginMetrics.summary(SEARCH_RESPONSE_SIZE); @@ -125,6 +128,7 @@ public void startSubscriptions() { RetryHandler.executeWithRetry(() -> { try { headers.setBearerAuth(authConfig.getAccessToken()); + apiCallsCounter.increment(); ResponseEntity response = restTemplate.exchange( url, HttpMethod.POST, @@ -189,6 +193,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType, return RetryHandler.executeWithRetry( () -> { headers.setBearerAuth(authConfig.getAccessToken()); + apiCallsCounter.increment(); ResponseEntity>> response = restTemplate.exchange( url, @@ -248,6 +253,7 @@ public String getAuditLog(String contentUri) { try { String response = RetryHandler.executeWithRetry(() -> { headers.setBearerAuth(authConfig.getAccessToken()); + apiCallsCounter.increment(); ResponseEntity responseEntity = restTemplate.exchange( contentUri, HttpMethod.GET, diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java index 68f5f12b64..157d091490 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java @@ -30,6 +30,7 @@ import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -338,4 +339,172 @@ void testExecutePartitionWithSearchAuditLogsError() throws Exception { assertEquals("Search audit logs failed", exception.getMessage()); verify(mockRequestErrorsCounter).increment(); } + + @Test + void testRetryableErrorCounterIncrement() throws Exception { + // Mock the retryable errors counter + Counter mockRetryableErrorsCounter = mock(Counter.class); + when(pluginMetrics.counter("retryableErrors")).thenReturn(mockRetryableErrorsCounter); + + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); + + // Mock service.getAuditLog to throw a retryable Office365Exception + when(service.getAuditLog(anyString())) + .thenThrow(new Office365Exception("Retryable error", true)); + + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + + // Execute and expect RuntimeException to be thrown due to retryable error + RuntimeException exception = assertThrows(RuntimeException.class, + () -> client.executePartition(state, buffer, acknowledgementSet)); + + // Verify retryable error counter was incremented + verify(mockRetryableErrorsCounter).increment(); + assertEquals("Retryable error processing audit log: ID1", exception.getMessage()); + } + + @Test + void testNonRetryableErrorCounterIncrement() throws Exception { + // Mock the non-retryable errors counter + Counter mockNonRetryableErrorsCounter = mock(Counter.class); + when(pluginMetrics.counter("nonRetryableErrors")).thenReturn(mockNonRetryableErrorsCounter); + + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); + + // Mock service.getAuditLog to throw a non-retryable Office365Exception + when(service.getAuditLog(anyString())) + .thenThrow(new Office365Exception("Non-retryable error", false)); + + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + + // Execute - non-retryable errors should not throw exception, just drop the record + client.executePartition(state, buffer, acknowledgementSet); + + // Verify non-retryable error counter was incremented + verify(mockNonRetryableErrorsCounter).increment(); + // Verify buffer was called with empty list (record was dropped) + verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt()); + } + + @Test + void testUnexpectedErrorTreatedAsRetryable() throws Exception { + // Mock the retryable errors counter + Counter mockRetryableErrorsCounter = mock(Counter.class); + when(pluginMetrics.counter("retryableErrors")).thenReturn(mockRetryableErrorsCounter); + + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); + + // Mock service.getAuditLog to throw an unexpected RuntimeException (not Office365Exception) + when(service.getAuditLog(anyString())) + .thenThrow(new RuntimeException("Unexpected error")); + + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + + // Execute and expect RuntimeException to be thrown + RuntimeException exception = assertThrows(RuntimeException.class, + () -> client.executePartition(state, buffer, acknowledgementSet)); + + // Verify retryable error counter was incremented (unexpected errors are treated as retryable) + verify(mockRetryableErrorsCounter).increment(); + assertEquals("Unexpected error processing audit log: ID1", exception.getMessage()); + } + + @Test + void testMultipleRecordsWithMixedErrors() throws Exception { + // Mock both error counters + Counter mockRetryableErrorsCounter = mock(Counter.class); + Counter mockNonRetryableErrorsCounter = mock(Counter.class); + when(pluginMetrics.counter("retryableErrors")).thenReturn(mockRetryableErrorsCounter); + when(pluginMetrics.counter("nonRetryableErrors")).thenReturn(mockNonRetryableErrorsCounter); + + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList( + Map.of("contentId", "ID1", "contentUri", "uri1"), + Map.of("contentId", "ID2", "contentUri", "uri2"), + Map.of("contentId", "ID3", "contentUri", "uri3") + ), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); + + // Mock different responses for different URIs + when(service.getAuditLog("uri1")) + .thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}"); // Success + when(service.getAuditLog("uri2")) + .thenThrow(new Office365Exception("Non-retryable error", false)); // Non-retryable + when(service.getAuditLog("uri3")) + .thenThrow(new Office365Exception("Retryable error", true)); // Retryable + + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + + // Execute and expect RuntimeException due to retryable error + RuntimeException exception = assertThrows(RuntimeException.class, + () -> client.executePartition(state, buffer, acknowledgementSet)); + + // Verify both error counters were incremented + verify(mockNonRetryableErrorsCounter).increment(); // For ID2 + verify(mockRetryableErrorsCounter).increment(); // For ID3 + assertEquals("Retryable error processing audit log: ID3", exception.getMessage()); + } } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java index b6fd397da0..7fbcb025fc 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java @@ -370,6 +370,7 @@ void testMetricsInitialization() { Counter mockAuditLogRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class); Counter mockSearchRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class); Counter mockSearchRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class); + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); DistributionSummary mockAuditLogRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class); DistributionSummary mockSearchRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class); @@ -380,6 +381,7 @@ void testMetricsInitialization() { when(mockPluginMetrics.counter("auditLogRequestsSuccess")).thenReturn(mockAuditLogRequestsSuccessCounter); when(mockPluginMetrics.counter("searchRequestsFailed")).thenReturn(mockSearchRequestsFailedCounter); when(mockPluginMetrics.counter("searchRequestsSuccess")).thenReturn(mockSearchRequestsSuccessCounter); + when(mockPluginMetrics.counter("apiCalls")).thenReturn(mockApiCallsCounter); when(mockPluginMetrics.summary("auditLogResponseSizeBytes")).thenReturn(mockAuditLogRequestSizeSummary); when(mockPluginMetrics.summary("searchResponseSizeBytes")).thenReturn(mockSearchRequestSizeSummary); @@ -394,6 +396,7 @@ void testMetricsInitialization() { verify(mockPluginMetrics).counter("auditLogRequestsSuccess"); verify(mockPluginMetrics).counter("searchRequestsFailed"); verify(mockPluginMetrics).counter("searchRequestsSuccess"); + verify(mockPluginMetrics).counter("apiCalls"); verify(mockPluginMetrics).summary("auditLogResponseSizeBytes"); verify(mockPluginMetrics).summary("searchResponseSizeBytes"); } @@ -675,4 +678,124 @@ void testPublishErrorTypeMetricCounterForStartSubscriptions(HttpStatus status, b verify(mockCounter, never()).increment(); } } + + @Test + void testApiCallsCounterIncrementForStartSubscriptions() throws NoSuchFieldException, IllegalAccessException { + // Test that apiCallsCounter is incremented for each subscription start call + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "apiCallsCounter", mockApiCallsCounter); + + // Mock auth config + when(authConfig.getTenantId()).thenReturn("test-tenant-id"); + when(authConfig.getAccessToken()).thenReturn("test-access-token"); + + // Mock successful response + ResponseEntity mockResponse = new ResponseEntity<>("{\"status\":\"enabled\"}", HttpStatus.OK); + when(restTemplate.exchange(anyString(), eq(HttpMethod.POST), any(), eq(String.class))) + .thenReturn(mockResponse); + + // Execute + office365RestClient.startSubscriptions(); + + // Verify apiCallsCounter was incremented once for each content type + verify(mockApiCallsCounter, times(CONTENT_TYPES.length)).increment(); + } + + @Test + void testApiCallsCounterIncrementForSearchAuditLogs() throws NoSuchFieldException, IllegalAccessException { + // Test that apiCallsCounter is incremented for search audit logs call + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "apiCallsCounter", mockApiCallsCounter); + + // Mock auth config + when(authConfig.getTenantId()).thenReturn("test-tenant-id"); + when(authConfig.getAccessToken()).thenReturn("test-access-token"); + + // Mock successful response + List> mockResults = Collections.singletonList(new HashMap<>()); + ResponseEntity>> mockResponse = new ResponseEntity<>(mockResults, HttpStatus.OK); + when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class))) + .thenReturn(mockResponse); + + // Execute + office365RestClient.searchAuditLogs( + "Audit.AzureActiveDirectory", + Instant.now().minus(1, ChronoUnit.HOURS), + Instant.now(), + null + ); + + // Verify apiCallsCounter was incremented once + verify(mockApiCallsCounter, times(1)).increment(); + } + + @Test + void testApiCallsCounterIncrementForGetAuditLog() throws NoSuchFieldException, IllegalAccessException { + // Test that apiCallsCounter is incremented for get audit log call + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "apiCallsCounter", mockApiCallsCounter); + + // Mock auth config + when(authConfig.getAccessToken()).thenReturn("test-access-token"); + + // Mock successful response + String contentUri = "https://manage.office.com/api/v1.0/test-tenant/activity/feed/audit/123"; + String mockAuditLog = "{\"id\":\"123\",\"contentType\":\"Audit.AzureActiveDirectory\"}"; + ResponseEntity mockResponse = new ResponseEntity<>(mockAuditLog, HttpStatus.OK); + when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class))) + .thenReturn(mockResponse); + + // Execute + office365RestClient.getAuditLog(contentUri); + + // Verify apiCallsCounter was incremented once + verify(mockApiCallsCounter, times(1)).increment(); + } + + @Test + void testApiCallsCounterIncrementOnRetries() throws NoSuchFieldException, IllegalAccessException { + // Test that apiCallsCounter is incremented for each retry attempt + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "apiCallsCounter", mockApiCallsCounter); + + // Mock auth config + when(authConfig.getAccessToken()).thenReturn("test-access-token"); + + // Mock failure response that will trigger retries + String contentUri = "https://manage.office.com/api/v1.0/test-tenant/activity/feed/audit/123"; + when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class))) + .thenThrow(new HttpClientErrorException(HttpStatus.TOO_MANY_REQUESTS)); + + // Execute and expect exception + assertThrows(RuntimeException.class, () -> office365RestClient.getAuditLog(contentUri)); + + // Verify apiCallsCounter was incremented 6 times (once for each retry attempt) + verify(mockApiCallsCounter, times(6)).increment(); + } + + @Test + void testApiCallsCounterIncrementForSearchAuditLogsWithRetries() throws NoSuchFieldException, IllegalAccessException { + // Test that apiCallsCounter is incremented for each retry attempt in searchAuditLogs + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "apiCallsCounter", mockApiCallsCounter); + + // Mock auth config + when(authConfig.getTenantId()).thenReturn("test-tenant-id"); + when(authConfig.getAccessToken()).thenReturn("test-access-token"); + + // Mock failure response that will trigger retries + when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class))) + .thenThrow(new HttpClientErrorException(HttpStatus.TOO_MANY_REQUESTS)); + + // Execute and expect exception + assertThrows(RuntimeException.class, () -> office365RestClient.searchAuditLogs( + "Audit.AzureActiveDirectory", + Instant.now().minus(1, ChronoUnit.HOURS), + Instant.now(), + null + )); + + // Verify apiCallsCounter was incremented 6 times (once for each retry attempt) + verify(mockApiCallsCounter, times(6)).increment(); + } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java index 9c1570d85f..246a5a399c 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java @@ -27,7 +27,7 @@ public class MetricsHelper { private static final String RESOURCE_NOT_FOUND = "resourceNotFound"; // other errors in crawlerClient - public static final String REQEUEST_ERRORS = "requestErrors"; + public static final String REQUEST_ERRORS = "requestErrors"; /** * Get the metric counter map for specific errorType From 59d446216e2e46f80b8a8885cea9dda990fe5719 Mon Sep 17 00:00:00 2001 From: Brett Zeligson Date: Tue, 4 Nov 2025 12:58:09 -0500 Subject: [PATCH 2/5] Add Retryable/Non-Retryable Exception + API Calls Metrics for O365 Signed-off-by: Brett Zeligson --- .../source/microsoft_office365/Office365CrawlerClient.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java index e4ab8b476a..9d119ba958 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java @@ -134,14 +134,16 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state log.error(NOISY, "{} error processing audit log: {}", e.isRetryable() ? "Retryable" : "Non-retryable", logId, e); if (e.isRetryable()) { + retryableErrorsCounter.increment(); throw new RuntimeException("Retryable error processing audit log: " + logId, e); } else { - handleExceptionMetrics(e); + nonRetryableErrorsCounter.increment(); // TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", logId, e); } } catch (Exception e) { // Unexpected errors are treated as retryable to be safe + retryableErrorsCounter.increment(); log.error(NOISY, "Unexpected error processing audit log: {}", logId, e); throw new RuntimeException("Unexpected error processing audit log: " + logId, e); } From a48f9524a2de123ea38815e36597b025c6cdc0cf Mon Sep 17 00:00:00 2001 From: Brett Zeligson Date: Fri, 7 Nov 2025 12:29:45 -0500 Subject: [PATCH 3/5] Add Retryable/Non-Retryable Exception + API Calls Metrics for O365 Signed-off-by: Brett Zeligson --- .../microsoft_office365/Office365CrawlerClient.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java index 9d119ba958..198584bc26 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java @@ -167,7 +167,6 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state acknowledgementSet.complete(); } } catch (Exception e) { - handleExceptionMetrics(e); log.error(NOISY, "Failed to process partition for log type {} from {} to {}", logType, startTime, endTime, e); requestErrorsCounter.increment(); @@ -175,18 +174,6 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state } } - private void handleExceptionMetrics(Exception e) { - if (e instanceof Office365Exception) { - if (((Office365Exception) e).isRetryable()) { - retryableErrorsCounter.increment(); - } else { - nonRetryableErrorsCounter.increment(); - } - } else { - retryableErrorsCounter.increment(); - } - } - private Record processAuditLog(Map metadata) throws Office365Exception { String contentUri = (String) metadata.get(CONTENT_URI); if (contentUri == null) { From 2c50d6e79d8a18a798cc919e45bafa76cdc3ab40 Mon Sep 17 00:00:00 2001 From: Vecheka Date: Fri, 7 Nov 2025 16:36:33 -0800 Subject: [PATCH 4/5] Address typo and improve publishErrorTypeMetricCounter function (#6253) Signed-off-by: Vecheka --- .../Office365CrawlerClient.java | 4 +-- .../Office365RestClient.java | 32 ++----------------- .../Office365CrawlerClientTest.java | 6 ++-- .../source_crawler/utils/MetricsHelper.java | 28 +++++++++++++--- 4 files changed, 32 insertions(+), 38 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java index 2766bc2f00..6f9e974785 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java @@ -42,7 +42,7 @@ import java.util.concurrent.TimeoutException; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; -import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQEUEST_ERRORS; +import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQUEST_ERRORS; /** * Implementation of CrawlerClient for Office 365 audit logs. @@ -86,7 +86,7 @@ public Office365CrawlerClient(final Office365Service service, this.bufferWriteRetrySuccessCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_SUCCESS); this.bufferWriteRetryAttemptsCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_ATTEMPTS); this.bufferWriteFailuresCounter = pluginMetrics.counter(BUFFER_WRITE_FAILURES); - this.requestErrorsCounter = pluginMetrics.counter(REQEUEST_ERRORS); + this.requestErrorsCounter = pluginMetrics.counter(REQUEST_ERRORS); } @VisibleForTesting diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java index 3e8c317728..434ec43875 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java @@ -26,7 +26,6 @@ import org.springframework.web.client.HttpClientErrorException; import org.springframework.web.client.HttpServerErrorException; import org.springframework.web.client.RestTemplate; -import org.springframework.http.HttpStatus; import javax.inject.Named; import java.nio.charset.StandardCharsets; @@ -142,17 +141,8 @@ public void startSubscriptions() { } }, authConfig::renewCredentials); } - } catch (HttpClientErrorException | HttpServerErrorException e) { - HttpStatus statusCode = e.getStatusCode(); - publishErrorTypeMetricCounter(statusCode.getReasonPhrase(), this.errorTypeMetricCounterMap); - log.error(NOISY, "Failed to initialize subscriptions with status code {}: {}", - statusCode, e.getMessage()); - throw new RuntimeException("Failed to initialize subscriptions: " + e.getMessage(), e); } catch (Exception e) { - // FORBIDDEN throws SecurityException in RetryHandler - if (e instanceof SecurityException) { - publishErrorTypeMetricCounter(HttpStatus.FORBIDDEN.getReasonPhrase(), this.errorTypeMetricCounterMap); - } + publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap); log.error(NOISY, "Failed to initialize subscriptions", e); throw new RuntimeException("Failed to initialize subscriptions: " + e.getMessage(), e); } @@ -214,16 +204,8 @@ public AuditLogsResponse searchAuditLogs(final String contentType, authConfig::renewCredentials, searchRequestsFailedCounter ); - } catch (HttpClientErrorException | HttpServerErrorException e) { - HttpStatus statusCode = e.getStatusCode(); - publishErrorTypeMetricCounter(statusCode.getReasonPhrase(), this.errorTypeMetricCounterMap); - log.error(NOISY, "Error while fetching audit logs for content type {}", contentType, e); - throw new RuntimeException("Failed to fetch audit logs", e); } catch (Exception e) { - // FORBIDDEN throws SecurityException in RetryHandler - if (e instanceof SecurityException) { - publishErrorTypeMetricCounter(HttpStatus.FORBIDDEN.getReasonPhrase(), this.errorTypeMetricCounterMap); - } + publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap); log.error(NOISY, "Error while fetching audit logs for content type {}", contentType, e); throw new RuntimeException("Failed to fetch audit logs", e); } @@ -265,16 +247,8 @@ public String getAuditLog(String contentUri) { }, authConfig::renewCredentials, auditLogRequestsFailedCounter); auditLogRequestsSuccessCounter.increment(); return response; - } catch (HttpClientErrorException | HttpServerErrorException e) { - HttpStatus statusCode = e.getStatusCode(); - publishErrorTypeMetricCounter(statusCode.getReasonPhrase(), this.errorTypeMetricCounterMap); - log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e); - throw new RuntimeException("Failed to fetch audit log", e); } catch (Exception e) { - // FORBIDDEN throws SecurityException in RetryHandler - if (e instanceof SecurityException) { - publishErrorTypeMetricCounter(HttpStatus.FORBIDDEN.getReasonPhrase(), this.errorTypeMetricCounterMap); - } + publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap); log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e); throw new RuntimeException("Failed to fetch audit log", e); } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java index 68f5f12b64..8465daf5ca 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java @@ -57,6 +57,8 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.never; +import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQUEST_ERRORS; + @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) class Office365CrawlerClientTest { @@ -154,7 +156,7 @@ void testExecutePartitionWithJsonProcessingError() throws Exception { // Mock the total failures counter Counter mockRequestErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter("requestErrors")).thenReturn(mockRequestErrorsCounter); + when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter); AuditLogsResponse response = new AuditLogsResponse( Arrays.asList(Map.of( @@ -318,7 +320,7 @@ void testMissingWorkloadField() throws Exception { void testExecutePartitionWithSearchAuditLogsError() throws Exception { // Mock the total failures counter before creating the client Counter mockRequestErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter("requestErrors")).thenReturn(mockRequestErrorsCounter); + when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter); Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java index 9c1570d85f..19a5bb98bf 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java @@ -13,8 +13,11 @@ import io.micrometer.core.instrument.Counter; import org.springframework.http.HttpStatus; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.HttpServerErrorException; import java.util.Map; +import java.util.Optional; import java.util.HashMap; /** * The MetricsHelper class. @@ -27,7 +30,7 @@ public class MetricsHelper { private static final String RESOURCE_NOT_FOUND = "resourceNotFound"; // other errors in crawlerClient - public static final String REQEUEST_ERRORS = "requestErrors"; + public static final String REQUEST_ERRORS = "requestErrors"; /** * Get the metric counter map for specific errorType @@ -54,12 +57,27 @@ public static Map getErrorTypeMetricCounterMap(PluginMetrics pl * TOO_MANY_REQUESTS = requestThrottled * NOT_FOUND = resourceNotFound * - * @param errorType - the httpStatusCode string represenation + * @param ex - exception from RestClient * @param errorTypeMetricCounterMap - the map of errorType to metric counter */ - public static void publishErrorTypeMetricCounter(String errorType, Map errorTypeMetricCounterMap) { - if (errorTypeMetricCounterMap != null && errorTypeMetricCounterMap.containsKey(errorType)) { - errorTypeMetricCounterMap.get(errorType).increment(); + public static void publishErrorTypeMetricCounter(Exception ex, Map errorTypeMetricCounterMap) { + Optional statusCode = Optional.empty(); + if (ex instanceof HttpClientErrorException) { + HttpClientErrorException httpE = (HttpClientErrorException) ex; + statusCode = Optional.ofNullable(httpE.getStatusCode().getReasonPhrase()); + } else if (ex instanceof HttpServerErrorException) { + HttpServerErrorException httpE = (HttpServerErrorException) ex; + statusCode = Optional.ofNullable(httpE.getStatusCode().getReasonPhrase()); + } else if (ex instanceof SecurityException) { // FORBIDDEN throws SecurityException in RetryHandler + statusCode = Optional.ofNullable(HttpStatus.FORBIDDEN.getReasonPhrase()); + } // ignore for others + + if (statusCode.isPresent()) { + String errorType = statusCode.get(); + if (errorTypeMetricCounterMap != null && errorTypeMetricCounterMap.containsKey(errorType)) { + errorTypeMetricCounterMap.get(errorType).increment(); + } } + } } \ No newline at end of file From 2420ad6b68238ac8cddbcf18fc1b209571f92e16 Mon Sep 17 00:00:00 2001 From: Vecheka Date: Fri, 7 Nov 2025 16:36:33 -0800 Subject: [PATCH 5/5] Address typo and improve publishErrorTypeMetricCounter function (#6253) Signed-off-by: Vecheka --- .../Office365RestClient.java | 32 ++----------------- .../Office365CrawlerClientTest.java | 6 ++-- .../source_crawler/utils/MetricsHelper.java | 26 ++++++++++++--- 3 files changed, 29 insertions(+), 35 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java index 53c7536cab..9a1e5e39d8 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java @@ -26,7 +26,6 @@ import org.springframework.web.client.HttpClientErrorException; import org.springframework.web.client.HttpServerErrorException; import org.springframework.web.client.RestTemplate; -import org.springframework.http.HttpStatus; import javax.inject.Named; import java.nio.charset.StandardCharsets; @@ -146,17 +145,8 @@ public void startSubscriptions() { } }, authConfig::renewCredentials); } - } catch (HttpClientErrorException | HttpServerErrorException e) { - HttpStatus statusCode = e.getStatusCode(); - publishErrorTypeMetricCounter(statusCode.getReasonPhrase(), this.errorTypeMetricCounterMap); - log.error(NOISY, "Failed to initialize subscriptions with status code {}: {}", - statusCode, e.getMessage()); - throw new RuntimeException("Failed to initialize subscriptions: " + e.getMessage(), e); } catch (Exception e) { - // FORBIDDEN throws SecurityException in RetryHandler - if (e instanceof SecurityException) { - publishErrorTypeMetricCounter(HttpStatus.FORBIDDEN.getReasonPhrase(), this.errorTypeMetricCounterMap); - } + publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap); log.error(NOISY, "Failed to initialize subscriptions", e); throw new RuntimeException("Failed to initialize subscriptions: " + e.getMessage(), e); } @@ -219,16 +209,8 @@ public AuditLogsResponse searchAuditLogs(final String contentType, authConfig::renewCredentials, searchRequestsFailedCounter ); - } catch (HttpClientErrorException | HttpServerErrorException e) { - HttpStatus statusCode = e.getStatusCode(); - publishErrorTypeMetricCounter(statusCode.getReasonPhrase(), this.errorTypeMetricCounterMap); - log.error(NOISY, "Error while fetching audit logs for content type {}", contentType, e); - throw new RuntimeException("Failed to fetch audit logs", e); } catch (Exception e) { - // FORBIDDEN throws SecurityException in RetryHandler - if (e instanceof SecurityException) { - publishErrorTypeMetricCounter(HttpStatus.FORBIDDEN.getReasonPhrase(), this.errorTypeMetricCounterMap); - } + publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap); log.error(NOISY, "Error while fetching audit logs for content type {}", contentType, e); throw new RuntimeException("Failed to fetch audit logs", e); } @@ -271,16 +253,8 @@ public String getAuditLog(String contentUri) { }, authConfig::renewCredentials, auditLogRequestsFailedCounter); auditLogRequestsSuccessCounter.increment(); return response; - } catch (HttpClientErrorException | HttpServerErrorException e) { - HttpStatus statusCode = e.getStatusCode(); - publishErrorTypeMetricCounter(statusCode.getReasonPhrase(), this.errorTypeMetricCounterMap); - log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e); - throw new RuntimeException("Failed to fetch audit log", e); } catch (Exception e) { - // FORBIDDEN throws SecurityException in RetryHandler - if (e instanceof SecurityException) { - publishErrorTypeMetricCounter(HttpStatus.FORBIDDEN.getReasonPhrase(), this.errorTypeMetricCounterMap); - } + publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap); log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e); throw new RuntimeException("Failed to fetch audit log", e); } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java index 157d091490..3509bcd161 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java @@ -58,6 +58,8 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.never; +import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQUEST_ERRORS; + @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) class Office365CrawlerClientTest { @@ -155,7 +157,7 @@ void testExecutePartitionWithJsonProcessingError() throws Exception { // Mock the total failures counter Counter mockRequestErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter("requestErrors")).thenReturn(mockRequestErrorsCounter); + when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter); AuditLogsResponse response = new AuditLogsResponse( Arrays.asList(Map.of( @@ -319,7 +321,7 @@ void testMissingWorkloadField() throws Exception { void testExecutePartitionWithSearchAuditLogsError() throws Exception { // Mock the total failures counter before creating the client Counter mockRequestErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter("requestErrors")).thenReturn(mockRequestErrorsCounter); + when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter); Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java index 246a5a399c..19a5bb98bf 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelper.java @@ -13,8 +13,11 @@ import io.micrometer.core.instrument.Counter; import org.springframework.http.HttpStatus; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.HttpServerErrorException; import java.util.Map; +import java.util.Optional; import java.util.HashMap; /** * The MetricsHelper class. @@ -54,12 +57,27 @@ public static Map getErrorTypeMetricCounterMap(PluginMetrics pl * TOO_MANY_REQUESTS = requestThrottled * NOT_FOUND = resourceNotFound * - * @param errorType - the httpStatusCode string represenation + * @param ex - exception from RestClient * @param errorTypeMetricCounterMap - the map of errorType to metric counter */ - public static void publishErrorTypeMetricCounter(String errorType, Map errorTypeMetricCounterMap) { - if (errorTypeMetricCounterMap != null && errorTypeMetricCounterMap.containsKey(errorType)) { - errorTypeMetricCounterMap.get(errorType).increment(); + public static void publishErrorTypeMetricCounter(Exception ex, Map errorTypeMetricCounterMap) { + Optional statusCode = Optional.empty(); + if (ex instanceof HttpClientErrorException) { + HttpClientErrorException httpE = (HttpClientErrorException) ex; + statusCode = Optional.ofNullable(httpE.getStatusCode().getReasonPhrase()); + } else if (ex instanceof HttpServerErrorException) { + HttpServerErrorException httpE = (HttpServerErrorException) ex; + statusCode = Optional.ofNullable(httpE.getStatusCode().getReasonPhrase()); + } else if (ex instanceof SecurityException) { // FORBIDDEN throws SecurityException in RetryHandler + statusCode = Optional.ofNullable(HttpStatus.FORBIDDEN.getReasonPhrase()); + } // ignore for others + + if (statusCode.isPresent()) { + String errorType = statusCode.get(); + if (errorTypeMetricCounterMap != null && errorTypeMetricCounterMap.containsKey(errorType)) { + errorTypeMetricCounterMap.get(errorType).increment(); + } } + } } \ No newline at end of file