diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java index 6f9e974785..198584bc26 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java @@ -57,6 +57,8 @@ public class Office365CrawlerClient implements CrawlerClient { try { headers.setBearerAuth(authConfig.getAccessToken()); + apiCallsCounter.increment(); ResponseEntity response = restTemplate.exchange( url, HttpMethod.POST, @@ -179,6 +183,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType, return RetryHandler.executeWithRetry( () -> { headers.setBearerAuth(authConfig.getAccessToken()); + apiCallsCounter.increment(); ResponseEntity>> response = restTemplate.exchange( url, @@ -230,6 +235,7 @@ public String getAuditLog(String contentUri) { try { String response = RetryHandler.executeWithRetry(() -> { headers.setBearerAuth(authConfig.getAccessToken()); + apiCallsCounter.increment(); ResponseEntity responseEntity = restTemplate.exchange( contentUri, HttpMethod.GET, diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java index 8465daf5ca..3509bcd161 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java @@ -30,6 +30,7 @@ import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -340,4 +341,172 @@ void testExecutePartitionWithSearchAuditLogsError() throws Exception { assertEquals("Search audit logs failed", exception.getMessage()); verify(mockRequestErrorsCounter).increment(); } + + @Test + void testRetryableErrorCounterIncrement() throws Exception { + // Mock the retryable errors counter + Counter mockRetryableErrorsCounter = mock(Counter.class); + when(pluginMetrics.counter("retryableErrors")).thenReturn(mockRetryableErrorsCounter); + + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); + + // Mock service.getAuditLog to throw a retryable Office365Exception + when(service.getAuditLog(anyString())) + .thenThrow(new Office365Exception("Retryable error", true)); + + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + + // Execute and expect RuntimeException to be thrown due to retryable error + RuntimeException exception = assertThrows(RuntimeException.class, + () -> client.executePartition(state, buffer, acknowledgementSet)); + + // Verify retryable error counter was incremented + verify(mockRetryableErrorsCounter).increment(); + assertEquals("Retryable error processing audit log: ID1", exception.getMessage()); + } + + @Test + void testNonRetryableErrorCounterIncrement() throws Exception { + // Mock the non-retryable errors counter + Counter mockNonRetryableErrorsCounter = mock(Counter.class); + when(pluginMetrics.counter("nonRetryableErrors")).thenReturn(mockNonRetryableErrorsCounter); + + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); + + // Mock service.getAuditLog to throw a non-retryable Office365Exception + when(service.getAuditLog(anyString())) + .thenThrow(new Office365Exception("Non-retryable error", false)); + + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + + // Execute - non-retryable errors should not throw exception, just drop the record + client.executePartition(state, buffer, acknowledgementSet); + + // Verify non-retryable error counter was incremented + verify(mockNonRetryableErrorsCounter).increment(); + // Verify buffer was called with empty list (record was dropped) + verify(buffer).writeAll(argThat(list -> list.isEmpty()), anyInt()); + } + + @Test + void testUnexpectedErrorTreatedAsRetryable() throws Exception { + // Mock the retryable errors counter + Counter mockRetryableErrorsCounter = mock(Counter.class); + when(pluginMetrics.counter("retryableErrors")).thenReturn(mockRetryableErrorsCounter); + + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); + + // Mock service.getAuditLog to throw an unexpected RuntimeException (not Office365Exception) + when(service.getAuditLog(anyString())) + .thenThrow(new RuntimeException("Unexpected error")); + + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + + // Execute and expect RuntimeException to be thrown + RuntimeException exception = assertThrows(RuntimeException.class, + () -> client.executePartition(state, buffer, acknowledgementSet)); + + // Verify retryable error counter was incremented (unexpected errors are treated as retryable) + verify(mockRetryableErrorsCounter).increment(); + assertEquals("Unexpected error processing audit log: ID1", exception.getMessage()); + } + + @Test + void testMultipleRecordsWithMixedErrors() throws Exception { + // Mock both error counters + Counter mockRetryableErrorsCounter = mock(Counter.class); + Counter mockNonRetryableErrorsCounter = mock(Counter.class); + when(pluginMetrics.counter("retryableErrors")).thenReturn(mockRetryableErrorsCounter); + when(pluginMetrics.counter("nonRetryableErrors")).thenReturn(mockNonRetryableErrorsCounter); + + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList( + Map.of("contentId", "ID1", "contentUri", "uri1"), + Map.of("contentId", "ID2", "contentUri", "uri2"), + Map.of("contentId", "ID3", "contentUri", "uri3") + ), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); + + // Mock different responses for different URIs + when(service.getAuditLog("uri1")) + .thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}"); // Success + when(service.getAuditLog("uri2")) + .thenThrow(new Office365Exception("Non-retryable error", false)); // Non-retryable + when(service.getAuditLog("uri3")) + .thenThrow(new Office365Exception("Retryable error", true)); // Retryable + + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + + // Execute and expect RuntimeException due to retryable error + RuntimeException exception = assertThrows(RuntimeException.class, + () -> client.executePartition(state, buffer, acknowledgementSet)); + + // Verify both error counters were incremented + verify(mockNonRetryableErrorsCounter).increment(); // For ID2 + verify(mockRetryableErrorsCounter).increment(); // For ID3 + assertEquals("Retryable error processing audit log: ID3", exception.getMessage()); + } } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java index b6fd397da0..7fbcb025fc 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java @@ -370,6 +370,7 @@ void testMetricsInitialization() { Counter mockAuditLogRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class); Counter mockSearchRequestsFailedCounter = org.mockito.Mockito.mock(Counter.class); Counter mockSearchRequestsSuccessCounter = org.mockito.Mockito.mock(Counter.class); + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); DistributionSummary mockAuditLogRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class); DistributionSummary mockSearchRequestSizeSummary = org.mockito.Mockito.mock(DistributionSummary.class); @@ -380,6 +381,7 @@ void testMetricsInitialization() { when(mockPluginMetrics.counter("auditLogRequestsSuccess")).thenReturn(mockAuditLogRequestsSuccessCounter); when(mockPluginMetrics.counter("searchRequestsFailed")).thenReturn(mockSearchRequestsFailedCounter); when(mockPluginMetrics.counter("searchRequestsSuccess")).thenReturn(mockSearchRequestsSuccessCounter); + when(mockPluginMetrics.counter("apiCalls")).thenReturn(mockApiCallsCounter); when(mockPluginMetrics.summary("auditLogResponseSizeBytes")).thenReturn(mockAuditLogRequestSizeSummary); when(mockPluginMetrics.summary("searchResponseSizeBytes")).thenReturn(mockSearchRequestSizeSummary); @@ -394,6 +396,7 @@ void testMetricsInitialization() { verify(mockPluginMetrics).counter("auditLogRequestsSuccess"); verify(mockPluginMetrics).counter("searchRequestsFailed"); verify(mockPluginMetrics).counter("searchRequestsSuccess"); + verify(mockPluginMetrics).counter("apiCalls"); verify(mockPluginMetrics).summary("auditLogResponseSizeBytes"); verify(mockPluginMetrics).summary("searchResponseSizeBytes"); } @@ -675,4 +678,124 @@ void testPublishErrorTypeMetricCounterForStartSubscriptions(HttpStatus status, b verify(mockCounter, never()).increment(); } } + + @Test + void testApiCallsCounterIncrementForStartSubscriptions() throws NoSuchFieldException, IllegalAccessException { + // Test that apiCallsCounter is incremented for each subscription start call + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "apiCallsCounter", mockApiCallsCounter); + + // Mock auth config + when(authConfig.getTenantId()).thenReturn("test-tenant-id"); + when(authConfig.getAccessToken()).thenReturn("test-access-token"); + + // Mock successful response + ResponseEntity mockResponse = new ResponseEntity<>("{\"status\":\"enabled\"}", HttpStatus.OK); + when(restTemplate.exchange(anyString(), eq(HttpMethod.POST), any(), eq(String.class))) + .thenReturn(mockResponse); + + // Execute + office365RestClient.startSubscriptions(); + + // Verify apiCallsCounter was incremented once for each content type + verify(mockApiCallsCounter, times(CONTENT_TYPES.length)).increment(); + } + + @Test + void testApiCallsCounterIncrementForSearchAuditLogs() throws NoSuchFieldException, IllegalAccessException { + // Test that apiCallsCounter is incremented for search audit logs call + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "apiCallsCounter", mockApiCallsCounter); + + // Mock auth config + when(authConfig.getTenantId()).thenReturn("test-tenant-id"); + when(authConfig.getAccessToken()).thenReturn("test-access-token"); + + // Mock successful response + List> mockResults = Collections.singletonList(new HashMap<>()); + ResponseEntity>> mockResponse = new ResponseEntity<>(mockResults, HttpStatus.OK); + when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class))) + .thenReturn(mockResponse); + + // Execute + office365RestClient.searchAuditLogs( + "Audit.AzureActiveDirectory", + Instant.now().minus(1, ChronoUnit.HOURS), + Instant.now(), + null + ); + + // Verify apiCallsCounter was incremented once + verify(mockApiCallsCounter, times(1)).increment(); + } + + @Test + void testApiCallsCounterIncrementForGetAuditLog() throws NoSuchFieldException, IllegalAccessException { + // Test that apiCallsCounter is incremented for get audit log call + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "apiCallsCounter", mockApiCallsCounter); + + // Mock auth config + when(authConfig.getAccessToken()).thenReturn("test-access-token"); + + // Mock successful response + String contentUri = "https://manage.office.com/api/v1.0/test-tenant/activity/feed/audit/123"; + String mockAuditLog = "{\"id\":\"123\",\"contentType\":\"Audit.AzureActiveDirectory\"}"; + ResponseEntity mockResponse = new ResponseEntity<>(mockAuditLog, HttpStatus.OK); + when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class))) + .thenReturn(mockResponse); + + // Execute + office365RestClient.getAuditLog(contentUri); + + // Verify apiCallsCounter was incremented once + verify(mockApiCallsCounter, times(1)).increment(); + } + + @Test + void testApiCallsCounterIncrementOnRetries() throws NoSuchFieldException, IllegalAccessException { + // Test that apiCallsCounter is incremented for each retry attempt + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "apiCallsCounter", mockApiCallsCounter); + + // Mock auth config + when(authConfig.getAccessToken()).thenReturn("test-access-token"); + + // Mock failure response that will trigger retries + String contentUri = "https://manage.office.com/api/v1.0/test-tenant/activity/feed/audit/123"; + when(restTemplate.exchange(eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class))) + .thenThrow(new HttpClientErrorException(HttpStatus.TOO_MANY_REQUESTS)); + + // Execute and expect exception + assertThrows(RuntimeException.class, () -> office365RestClient.getAuditLog(contentUri)); + + // Verify apiCallsCounter was incremented 6 times (once for each retry attempt) + verify(mockApiCallsCounter, times(6)).increment(); + } + + @Test + void testApiCallsCounterIncrementForSearchAuditLogsWithRetries() throws NoSuchFieldException, IllegalAccessException { + // Test that apiCallsCounter is incremented for each retry attempt in searchAuditLogs + Counter mockApiCallsCounter = org.mockito.Mockito.mock(Counter.class); + ReflectivelySetField.setField(Office365RestClient.class, office365RestClient, "apiCallsCounter", mockApiCallsCounter); + + // Mock auth config + when(authConfig.getTenantId()).thenReturn("test-tenant-id"); + when(authConfig.getAccessToken()).thenReturn("test-access-token"); + + // Mock failure response that will trigger retries + when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(), any(ParameterizedTypeReference.class))) + .thenThrow(new HttpClientErrorException(HttpStatus.TOO_MANY_REQUESTS)); + + // Execute and expect exception + assertThrows(RuntimeException.class, () -> office365RestClient.searchAuditLogs( + "Audit.AzureActiveDirectory", + Instant.now().minus(1, ChronoUnit.HOURS), + Instant.now(), + null + )); + + // Verify apiCallsCounter was incremented 6 times (once for each retry attempt) + verify(mockApiCallsCounter, times(6)).increment(); + } }