diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java index 34f371f6da..108706a79b 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java @@ -14,20 +14,18 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.Timer; import lombok.extern.slf4j.Slf4j; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException; import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; +import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder; import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; @@ -42,7 +40,6 @@ import java.util.concurrent.TimeoutException; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; -import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQUEST_ERRORS; /** * Implementation of CrawlerClient for Office 365 audit logs. @@ -52,49 +49,22 @@ @Named public class Office365CrawlerClient implements CrawlerClient { - public static final String NON_RETRYABLE_ERRORS = "nonRetryableErrors"; - public static final String RETRYABLE_ERRORS = "retryableErrors"; - - private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency"; - private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts"; - private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess"; - private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess"; - private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts"; - private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures"; private static final int BUFFER_TIMEOUT_IN_SECONDS = 10; private static final String CONTENT_ID = "contentId"; private static final String CONTENT_URI = "contentUri"; private final Office365Service service; private final Office365SourceConfig configuration; - private final Timer bufferWriteLatencyTimer; - private final Counter bufferWriteAttemptsCounter; - private final Counter bufferWriteSuccessCounter; - private final Counter bufferWriteRetrySuccessCounter; - private final Counter bufferWriteRetryAttemptsCounter; - private final Counter bufferWriteFailuresCounter; - private final Counter requestErrorsCounter; - private final Counter nonRetryableErrorsCounter; - private final Counter retryableErrorsCounter; + private final VendorAPIMetricsRecorder metricsRecorder; private ObjectMapper objectMapper; public Office365CrawlerClient(final Office365Service service, final Office365SourceConfig sourceConfig, - final PluginMetrics pluginMetrics) { + final VendorAPIMetricsRecorder metricsRecorder) { this.service = service; this.configuration = sourceConfig; + this.metricsRecorder = metricsRecorder; this.objectMapper = new ObjectMapper(); - - // Initialize metrics - this.bufferWriteLatencyTimer = pluginMetrics.timer(BUFFER_WRITE_LATENCY); - this.bufferWriteAttemptsCounter = pluginMetrics.counter(BUFFER_WRITE_ATTEMPTS); - this.bufferWriteSuccessCounter = pluginMetrics.counter(BUFFER_WRITE_SUCCESS); - this.bufferWriteRetrySuccessCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_SUCCESS); - this.bufferWriteRetryAttemptsCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_ATTEMPTS); - this.bufferWriteFailuresCounter = pluginMetrics.counter(BUFFER_WRITE_FAILURES); - this.requestErrorsCounter = pluginMetrics.counter(REQUEST_ERRORS); - this.nonRetryableErrorsCounter = pluginMetrics.counter(NON_RETRYABLE_ERRORS); - this.retryableErrorsCounter = pluginMetrics.counter(RETRYABLE_ERRORS); } @VisibleForTesting @@ -141,13 +111,8 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state } // Write Records to the buffer after processing a page of data - bufferWriteLatencyTimer.record(() -> { - try { - writeRecordsWithRetry(records, buffer, acknowledgementSet); - } catch (Exception e) { - bufferWriteFailuresCounter.increment(); - throw e; - } + metricsRecorder.recordBufferWriteLatency(() -> { + writeRecordsWithRetry(records, buffer, acknowledgementSet); }); nextPageUri = response.getNextPageUri(); @@ -159,18 +124,18 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state } catch (Exception e) { log.error(NOISY, "Failed to process partition for log type {} from {} to {}", logType, startTime, endTime, e); - requestErrorsCounter.increment(); + metricsRecorder.recordError(e); if (e instanceof SaaSCrawlerException) { SaaSCrawlerException saasException = (SaaSCrawlerException) e; if (saasException.isRetryable()) { - retryableErrorsCounter.increment(); + metricsRecorder.recordRetryableError(); } else { - nonRetryableErrorsCounter.increment(); + metricsRecorder.recordNonRetryableError(); } throw e; } // any other exceptions = non-retryable - nonRetryableErrorsCounter.increment(); + metricsRecorder.recordNonRetryableError(); throw new SaaSCrawlerException("Failed to process partition", e, false); } } @@ -219,7 +184,7 @@ private Record processAuditLog(Map metadata) throws SaaSC private void writeRecordsWithRetry(final List> records, final Buffer> buffer, final AcknowledgementSet acknowledgementSet) { - bufferWriteAttemptsCounter.increment(); + metricsRecorder.recordBufferWriteAttempt(); int retryCount = 0; int currentBackoff = 1000; // Start with 1 second final int maxBackoff = 30000; // Max 30 seconds @@ -235,21 +200,21 @@ private void writeRecordsWithRetry(final List> records, } if (retryCount > 0) { - bufferWriteRetrySuccessCounter.increment(); + metricsRecorder.recordBufferWriteRetrySuccess(); } else { - bufferWriteSuccessCounter.increment(); + metricsRecorder.recordBufferWriteSuccess(); } return; } catch (TimeoutException e) { retryCount++; if (retryCount >= maxRetries) { - bufferWriteFailuresCounter.increment(); + metricsRecorder.recordBufferWriteFailure(); // allows all writeToBuffer exceptions to be retryable to keep current behaviour of immediate retry by WorkerScheduler throw new SaaSCrawlerException("Failed to write to buffer after " + maxRetries + " attempts", e, true); } - bufferWriteRetryAttemptsCounter.increment(); + metricsRecorder.recordBufferWriteRetryAttempt(); currentBackoff = Math.min((int)(currentBackoff * 2.0), maxBackoff); log.info("Buffer full, backing off for {} ms before retry", currentBackoff); @@ -260,7 +225,7 @@ private void writeRecordsWithRetry(final List> records, throw new SaaSCrawlerException("Buffer write retry interrupted", ie, true); } } catch (Exception e) { - bufferWriteFailuresCounter.increment(); + metricsRecorder.recordBufferWriteFailure(); throw new SaaSCrawlerException("Error writing to buffer", e, true); } } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/configuration/Office365Configuration.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/configuration/Office365Configuration.java index 81566f04d7..0505fd62ec 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/configuration/Office365Configuration.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/configuration/Office365Configuration.java @@ -10,10 +10,12 @@ package org.opensearch.dataprepper.plugins.source.microsoft_office365.configuration; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365CrawlerClient; import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient; import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig; import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface; import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationProvider; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -72,4 +74,20 @@ public Office365RestClient office365RestClient( VendorAPIMetricsRecorder vendorAPIMetricsRecorder) { return new Office365RestClient(authConfig, vendorAPIMetricsRecorder); } + + /** + * Creates Office365CrawlerClient with unified metrics recorder. + * + * @param service The Office 365 service + * @param sourceConfig The Office 365 source configuration + * @param metricsRecorder The unified metrics recorder + * @return Configured Office365CrawlerClient + */ + @Bean + public Office365CrawlerClient office365CrawlerClient( + Office365Service service, + Office365SourceConfig sourceConfig, + VendorAPIMetricsRecorder metricsRecorder) { + return new Office365CrawlerClient(service, sourceConfig, metricsRecorder); + } } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java index 737b42e9f1..365f982e82 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java @@ -11,8 +11,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -29,7 +27,7 @@ import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException; -import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder; import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +57,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.never; -import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQUEST_ERRORS; -import static org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365CrawlerClient.NON_RETRYABLE_ERRORS; -import static org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365CrawlerClient.RETRYABLE_ERRORS; - @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) class Office365CrawlerClientTest { @@ -82,10 +76,7 @@ class Office365CrawlerClientTest { private Office365Service service; @Mock - private PluginMetrics pluginMetrics; - - @Mock - private Timer bufferWriteLatencyTimer; + private VendorAPIMetricsRecorder metricsRecorder; @Mock private static Logger log; @@ -98,8 +89,6 @@ static void setupLogger() { @BeforeEach void setUp() { - when(pluginMetrics.timer(anyString())).thenReturn(bufferWriteLatencyTimer); - when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class)); when(state.getStartTime()).thenReturn(Instant.now().minus(Duration.ofHours(1))); when(state.getEndTime()).thenReturn(Instant.now()); when(state.getDimensionType()).thenReturn("Exchange"); @@ -107,13 +96,13 @@ void setUp() { @Test void testConstructor() { - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, metricsRecorder); assertNotNull(client); } @Test void testExecutePartition() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, metricsRecorder); AuditLogsResponse response = new AuditLogsResponse( Arrays.asList(Map.of( @@ -131,11 +120,12 @@ void testExecutePartition() throws Exception { when(service.getAuditLog(anyString())) .thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}"); + // Mock the metrics recorder methods doAnswer(invocation -> { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; - }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + }).when(metricsRecorder).recordBufferWriteLatency(any(Runnable.class)); ArgumentCaptor>> recordsCaptor = ArgumentCaptor.forClass((Class) Collection.class); @@ -150,18 +140,18 @@ void testExecutePartition() throws Exception { assertNotNull(record.getData()); assertEquals("Exchange", record.getData().getMetadata().getAttribute("contentType")); } + + verify(metricsRecorder).recordBufferWriteLatency(any(Runnable.class)); + verify(metricsRecorder).recordBufferWriteAttempt(); + verify(metricsRecorder).recordBufferWriteSuccess(); } @Test void testExecutePartitionWithJsonProcessingError() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, metricsRecorder); ObjectMapper mockObjectMapper = mock(ObjectMapper.class); client.injectObjectMapper(mockObjectMapper); - // Mock the total failures counter - Counter mockRequestErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter); - AuditLogsResponse response = new AuditLogsResponse( Arrays.asList(Map.of( "contentId", "ID1", @@ -182,9 +172,9 @@ void testExecutePartitionWithJsonProcessingError() throws Exception { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; - }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + }).when(metricsRecorder).recordBufferWriteLatency(any(Runnable.class)); - SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, + SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> client.executePartition(state, buffer, acknowledgementSet)); assertEquals("Error processing audit log: ID1", exception.getMessage()); @@ -192,12 +182,13 @@ void testExecutePartitionWithJsonProcessingError() throws Exception { assertTrue(exception.getCause() instanceof SaaSCrawlerException); assertEquals("Failed to parse audit log: ID1", exception.getCause().getMessage()); - verify(mockRequestErrorsCounter, never()).increment(); + verify(metricsRecorder).recordError(any(Exception.class)); + verify(metricsRecorder).recordNonRetryableError(); } @Test void testBufferWriteWithAcknowledgements() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, metricsRecorder); AuditLogsResponse response = new AuditLogsResponse( Arrays.asList(Map.of( @@ -220,18 +211,19 @@ void testBufferWriteWithAcknowledgements() throws Exception { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; - }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + }).when(metricsRecorder).recordBufferWriteLatency(any(Runnable.class)); client.executePartition(state, buffer, acknowledgementSet); verify(acknowledgementSet).add(any(Event.class)); verify(acknowledgementSet).complete(); verify(buffer).writeAll(any(), anyInt()); + verify(metricsRecorder).recordBufferWriteLatency(any(Runnable.class)); } @Test void testBufferWriteTimeout() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, metricsRecorder); AuditLogsResponse response = new AuditLogsResponse( Arrays.asList(Map.of( @@ -253,7 +245,7 @@ void testBufferWriteTimeout() throws Exception { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; - }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + }).when(metricsRecorder).recordBufferWriteLatency(any(Runnable.class)); doThrow(new RuntimeException("Error writing to buffer")) .when(buffer) @@ -265,15 +257,12 @@ void testBufferWriteTimeout() throws Exception { assertEquals("Error writing to buffer", exception.getMessage()); assertTrue(exception.isRetryable()); verify(buffer).writeAll(any(), anyInt()); + verify(metricsRecorder).recordBufferWriteFailure(); } @Test void testNonRetryableError() throws Exception { - // Mock the non-retryable errors counter - Counter mockNonRetryableErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter(NON_RETRYABLE_ERRORS)).thenReturn(mockNonRetryableErrorsCounter); - - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, metricsRecorder); AuditLogsResponse response = new AuditLogsResponse( Arrays.asList(Map.of( @@ -293,7 +282,7 @@ void testNonRetryableError() throws Exception { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; - }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + }).when(metricsRecorder).recordBufferWriteLatency(any(Runnable.class)); SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> client.executePartition(state, buffer, acknowledgementSet)); @@ -304,16 +293,13 @@ void testNonRetryableError() throws Exception { assertEquals("Received null log content for URI: uri1", exception.getCause().getMessage()); verify(buffer, never()).writeAll(argThat(list -> list.isEmpty()), anyInt()); - verify(mockNonRetryableErrorsCounter).increment(); + verify(metricsRecorder).recordError(any(Exception.class)); + verify(metricsRecorder).recordNonRetryableError(); } @Test void testRetryableErrorCounterIncrement() throws Exception { - // Mock the retryable errors counter - Counter mockRetryableErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter(RETRYABLE_ERRORS)).thenReturn(mockRetryableErrorsCounter); - - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, metricsRecorder); AuditLogsResponse response = new AuditLogsResponse( Arrays.asList(Map.of( @@ -336,20 +322,21 @@ void testRetryableErrorCounterIncrement() throws Exception { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; - }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + }).when(metricsRecorder).recordBufferWriteLatency(any(Runnable.class)); // Execute and expect RuntimeException to be thrown due to retryable error RuntimeException exception = assertThrows(RuntimeException.class, () -> client.executePartition(state, buffer, acknowledgementSet)); // Verify retryable error counter was incremented - verify(mockRetryableErrorsCounter).increment(); + verify(metricsRecorder).recordError(any(Exception.class)); + verify(metricsRecorder).recordRetryableError(); assertEquals("Error processing audit log: ID1", exception.getMessage()); } @Test void testMissingWorkloadField() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, metricsRecorder); AuditLogsResponse response = new AuditLogsResponse( Arrays.asList(Map.of( @@ -370,7 +357,7 @@ void testMissingWorkloadField() throws Exception { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; - }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); + }).when(metricsRecorder).recordBufferWriteLatency(any(Runnable.class)); SaaSCrawlerException exception = assertThrows(SaaSCrawlerException.class, () -> client.executePartition(state, buffer, acknowledgementSet)); @@ -381,16 +368,13 @@ void testMissingWorkloadField() throws Exception { assertEquals("Missing Workload field in audit log: ID1", exception.getCause().getMessage()); verify(buffer, never()).writeAll(argThat(list -> list.isEmpty()), anyInt()); + verify(metricsRecorder).recordError(any(Exception.class)); + verify(metricsRecorder).recordNonRetryableError(); } @Test void testExecutePartitionWithSearchAuditLogsError() throws Exception { - Counter mockRequestErrorsCounter = mock(Counter.class); - Counter mockRetryableErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter); - when(pluginMetrics.counter(RETRYABLE_ERRORS)).thenReturn(mockRetryableErrorsCounter); - - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, metricsRecorder); // Mock searchAuditLogs to throw exception when(service.searchAuditLogs( @@ -407,20 +391,13 @@ void testExecutePartitionWithSearchAuditLogsError() throws Exception { // Verify exception message and counter increment assertEquals("Search audit logs failed", exception.getMessage()); assertTrue(exception.isRetryable()); - verify(mockRequestErrorsCounter).increment(); - verify(mockRetryableErrorsCounter).increment(); + verify(metricsRecorder).recordError(any(Exception.class)); + verify(metricsRecorder).recordRetryableError(); } @Test void testExecutePartitionWithNonSaaSCrawlerException() throws Exception { - // Create the counter mock before creating the client - Counter mockRequestErrorsCounter = mock(Counter.class); - Counter mockNonRetryableErrorsCounter = mock(Counter.class); - when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter); - when(pluginMetrics.counter(NON_RETRYABLE_ERRORS)).thenReturn(mockNonRetryableErrorsCounter); - - // Create client after counter is mocked - Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, metricsRecorder); // Simulate a non-SaaSCrawlerException (like RuntimeException) when(service.searchAuditLogs( @@ -438,7 +415,41 @@ void testExecutePartitionWithNonSaaSCrawlerException() throws Exception { assertEquals("Failed to process partition", exception.getMessage()); assertFalse(exception.isRetryable()); assertTrue(exception.getCause() instanceof RuntimeException); - verify(mockRequestErrorsCounter).increment(); - verify(mockNonRetryableErrorsCounter).increment(); + verify(metricsRecorder).recordError(any(Exception.class)); + verify(metricsRecorder).recordNonRetryableError(); + } + + @Test + void testBufferWriteRetrySuccess() throws Exception { + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, metricsRecorder); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); + + when(service.getAuditLog(anyString())) + .thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}"); + + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(metricsRecorder).recordBufferWriteLatency(any(Runnable.class)); + + client.executePartition(state, buffer, acknowledgementSet); + + verify(metricsRecorder).recordBufferWriteAttempt(); + verify(metricsRecorder).recordBufferWriteSuccess(); + verify(metricsRecorder, never()).recordBufferWriteRetrySuccess(); + verify(metricsRecorder, never()).recordBufferWriteFailure(); } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/metrics/VendorAPIMetricsRecorder.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/metrics/VendorAPIMetricsRecorder.java index 105a70157e..5e6d76a08c 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/metrics/VendorAPIMetricsRecorder.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/metrics/VendorAPIMetricsRecorder.java @@ -21,7 +21,7 @@ * * This class provides a unified interface for recording metrics across different types of vendor API operations: * - Search operations: latency, success/failure rates, and response sizes - * - Get/retrieval operations: latency, success/failure rates, and response sizes + * - Get/retrieval operations: latency, success/failure rates, and response sizes * - Authentication operations: latency, success/failure rates * - Subscription operations: latency, success/failure rates, and call counts * - General API operations: request counts, logs requested, error categorization @@ -43,7 +43,7 @@ public class VendorAPIMetricsRecorder { private final Timer searchLatencyTimer; private final DistributionSummary searchResponseSizeSummary; - // Get operation metrics + // Get operation metrics private final Counter getSuccessCounter; private final Counter getFailureCounter; private final Timer getLatencyTimer; @@ -66,14 +66,23 @@ public class VendorAPIMetricsRecorder { private final Timer listSubscriptionLatencyTimer; private final Counter listSubscriptionCallsCounter; + private final Timer bufferWriteLatencyTimer; + private final Counter bufferWriteAttemptsCounter; + private final Counter bufferWriteSuccessCounter; + private final Counter bufferWriteRetrySuccessCounter; + private final Counter bufferWriteRetryAttemptsCounter; + private final Counter bufferWriteFailuresCounter; + // Shared metrics private final Counter totalDataApiRequestsCounter; private final Counter logsRequestedCounter; - + // Error metrics private final Counter requestAccessDeniedCounter; private final Counter requestThrottledCounter; private final Counter resourceNotFoundCounter; + private final Counter nonRetryableErrorsCounter; + private final Counter retryableErrorsCounter; private final PluginMetrics pluginMetrics; private final boolean enableSubscriptionMetrics; @@ -97,7 +106,7 @@ public VendorAPIMetricsRecorder(PluginMetrics pluginMetrics) { public VendorAPIMetricsRecorder(PluginMetrics pluginMetrics, boolean enableSubscriptionMetrics) { this.pluginMetrics = pluginMetrics; this.enableSubscriptionMetrics = enableSubscriptionMetrics; - + // Search metrics this.searchSuccessCounter = pluginMetrics.counter("searchRequestsSuccess"); this.searchFailureCounter = pluginMetrics.counter("searchRequestsFailed"); @@ -110,7 +119,7 @@ public VendorAPIMetricsRecorder(PluginMetrics pluginMetrics, boolean enableSubsc this.getLatencyTimer = pluginMetrics.timer("getRequestLatency"); this.getResponseSizeSummary = pluginMetrics.summary("getResponseSizeBytes"); - // Auth metrics + // Auth metrics this.authSuccessCounter = pluginMetrics.counter("authenticationRequestsSuccess"); this.authFailureCounter = pluginMetrics.counter("authenticationRequestsFailed"); this.authLatencyTimer = pluginMetrics.timer("authenticationRequestLatency"); @@ -141,14 +150,23 @@ public VendorAPIMetricsRecorder(PluginMetrics pluginMetrics, boolean enableSubsc this.listSubscriptionCallsCounter = null; } + this.bufferWriteLatencyTimer = pluginMetrics.timer("bufferWriteLatency"); + this.bufferWriteAttemptsCounter = pluginMetrics.counter("bufferWriteAttempts"); + this.bufferWriteSuccessCounter = pluginMetrics.counter("bufferWriteSuccess"); + this.bufferWriteRetrySuccessCounter = pluginMetrics.counter("bufferWriteRetrySuccess"); + this.bufferWriteRetryAttemptsCounter = pluginMetrics.counter("bufferWriteRetryAttempts"); + this.bufferWriteFailuresCounter = pluginMetrics.counter("bufferWriteFailures"); + // Shared metrics this.totalDataApiRequestsCounter = pluginMetrics.counter("totalDataApiRequests"); this.logsRequestedCounter = pluginMetrics.counter("logsRequested"); - + // Error metrics this.requestAccessDeniedCounter = pluginMetrics.counter("requestAccessDenied"); this.requestThrottledCounter = pluginMetrics.counter("requestThrottled"); this.resourceNotFoundCounter = pluginMetrics.counter("resourceNotFound"); + this.nonRetryableErrorsCounter = pluginMetrics.counter("nonRetryableErrors"); + this.retryableErrorsCounter = pluginMetrics.counter("retryableErrors"); } // Search operation methods @@ -358,6 +376,50 @@ public void recordLogsRequested() { logsRequestedCounter.increment(); } + public void recordLogsRequested(int count) { + logsRequestedCounter.increment(count); + } + + public void recordBufferWriteAttempt() { + bufferWriteAttemptsCounter.increment(); + } + + public void recordBufferWriteSuccess() { + bufferWriteSuccessCounter.increment(); + } + + public void recordBufferWriteRetrySuccess() { + bufferWriteRetrySuccessCounter.increment(); + } + + public void recordBufferWriteRetryAttempt() { + bufferWriteRetryAttemptsCounter.increment(); + } + + public void recordBufferWriteFailure() { + bufferWriteFailuresCounter.increment(); + } + + public T recordBufferWriteLatency(Supplier operation) { + return bufferWriteLatencyTimer.record(operation); + } + + public void recordBufferWriteLatency(Runnable operation) { + bufferWriteLatencyTimer.record(operation); + } + + public void recordBufferWriteLatency(Duration duration) { + bufferWriteLatencyTimer.record(duration); + } + + public void recordNonRetryableError() { + nonRetryableErrorsCounter.increment(); + } + + public void recordRetryableError() { + retryableErrorsCounter.increment(); + } + /** * Records error metrics based on exception type and HTTP status code. * Maps specific HTTP errors to business-meaningful metrics: diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/metrics/VendorAPIMetricsRecorderTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/metrics/VendorAPIMetricsRecorderTest.java index 3984599058..83edad7fec 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/metrics/VendorAPIMetricsRecorderTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/metrics/VendorAPIMetricsRecorderTest.java @@ -88,6 +88,20 @@ class VendorAPIMetricsRecorderTest { @Mock private Counter listSubscriptionCallsCounter; + // Buffer metrics + @Mock + private Timer bufferWriteLatencyTimer; + @Mock + private Counter bufferWriteAttemptsCounter; + @Mock + private Counter bufferWriteSuccessCounter; + @Mock + private Counter bufferWriteRetrySuccessCounter; + @Mock + private Counter bufferWriteRetryAttemptsCounter; + @Mock + private Counter bufferWriteFailuresCounter; + // Shared metrics @Mock private Counter totalDataApiRequestsCounter; @@ -101,6 +115,10 @@ class VendorAPIMetricsRecorderTest { private Counter requestThrottledCounter; @Mock private Counter resourceNotFoundCounter; + @Mock + private Counter nonRetryableErrorsCounter; + @Mock + private Counter retryableErrorsCounter; private VendorAPIMetricsRecorder recorder; @@ -135,6 +153,14 @@ void setUp() { when(pluginMetrics.timer("listSubscriptionRequestLatency")).thenReturn(listSubscriptionLatencyTimer); when(pluginMetrics.counter("listSubscriptionApiCalls")).thenReturn(listSubscriptionCallsCounter); + // Setup buffer metrics mocks + when(pluginMetrics.timer("bufferWriteLatency")).thenReturn(bufferWriteLatencyTimer); + when(pluginMetrics.counter("bufferWriteAttempts")).thenReturn(bufferWriteAttemptsCounter); + when(pluginMetrics.counter("bufferWriteSuccess")).thenReturn(bufferWriteSuccessCounter); + when(pluginMetrics.counter("bufferWriteRetrySuccess")).thenReturn(bufferWriteRetrySuccessCounter); + when(pluginMetrics.counter("bufferWriteRetryAttempts")).thenReturn(bufferWriteRetryAttemptsCounter); + when(pluginMetrics.counter("bufferWriteFailures")).thenReturn(bufferWriteFailuresCounter); + // Setup shared metrics mocks when(pluginMetrics.counter("totalDataApiRequests")).thenReturn(totalDataApiRequestsCounter); when(pluginMetrics.counter("logsRequested")).thenReturn(logsRequestedCounter); @@ -143,6 +169,8 @@ void setUp() { when(pluginMetrics.counter("requestAccessDenied")).thenReturn(requestAccessDeniedCounter); when(pluginMetrics.counter("requestThrottled")).thenReturn(requestThrottledCounter); when(pluginMetrics.counter("resourceNotFound")).thenReturn(resourceNotFoundCounter); + when(pluginMetrics.counter("nonRetryableErrors")).thenReturn(nonRetryableErrorsCounter); + when(pluginMetrics.counter("retryableErrors")).thenReturn(retryableErrorsCounter); // Use explicit constructor with enabled=true to match existing test expectations recorder = new VendorAPIMetricsRecorder(pluginMetrics, true); @@ -181,6 +209,14 @@ void constructor_CreatesAllMetricsCorrectly() { verify(pluginMetrics).timer("listSubscriptionRequestLatency"); verify(pluginMetrics).counter("listSubscriptionApiCalls"); + // Verify buffer metrics creation + verify(pluginMetrics).timer("bufferWriteLatency"); + verify(pluginMetrics).counter("bufferWriteAttempts"); + verify(pluginMetrics).counter("bufferWriteSuccess"); + verify(pluginMetrics).counter("bufferWriteRetrySuccess"); + verify(pluginMetrics).counter("bufferWriteRetryAttempts"); + verify(pluginMetrics).counter("bufferWriteFailures"); + // Verify shared metrics creation verify(pluginMetrics).counter("totalDataApiRequests"); verify(pluginMetrics).counter("logsRequested"); @@ -189,6 +225,8 @@ void constructor_CreatesAllMetricsCorrectly() { verify(pluginMetrics).counter("requestAccessDenied"); verify(pluginMetrics).counter("requestThrottled"); verify(pluginMetrics).counter("resourceNotFound"); + verify(pluginMetrics).counter("nonRetryableErrors"); + verify(pluginMetrics).counter("retryableErrors"); } @Test @@ -449,6 +487,54 @@ void recordLogsRequested_IncrementsLogsRequestedCounter() { verify(logsRequestedCounter).increment(); } + @Test + void recordLogsRequested_WithCount_IncrementsLogsRequestedCounterByCount() { + int logCount = 5; + + recorder.recordLogsRequested(logCount); + + verify(logsRequestedCounter).increment(logCount); + } + + @Test + void recordLogsRequested_WithZeroCount_IncrementsLogsRequestedCounterByZero() { + int logCount = 0; + + recorder.recordLogsRequested(logCount); + + verify(logsRequestedCounter).increment(logCount); + } + + @Test + void recordLogsRequested_WithLargeCount_IncrementsLogsRequestedCounterByLargeCount() { + int logCount = 1000; + + recorder.recordLogsRequested(logCount); + + verify(logsRequestedCounter).increment(logCount); + } + + @Test + void recordLogsRequested_WithMultipleCounts_IncrementsLogsRequestedCounterCorrectly() { + recorder.recordLogsRequested(3); + recorder.recordLogsRequested(7); + recorder.recordLogsRequested(2); + + verify(logsRequestedCounter).increment(3); + verify(logsRequestedCounter).increment(7); + verify(logsRequestedCounter).increment(2); + } + + @Test + void recordLogsRequested_MixedWithParameterlessMethod_BothIncrementsWork() { + recorder.recordLogsRequested(); // No parameter - increments by 1 + recorder.recordLogsRequested(10); // With parameter - increments by 10 + recorder.recordLogsRequested(); // No parameter - increments by 1 again + + verify(logsRequestedCounter, times(2)).increment(); // Called twice without parameters + verify(logsRequestedCounter, times(1)).increment(10); // Called once with parameter + } + @Test void standaloneOperations_WorkCorrectly() { @@ -767,6 +853,54 @@ void recordListSubscriptionLatency_WithSupplier_RecordsLatencyAndReturnsResult() String result = recorder.recordListSubscriptionLatency(operation); verify(listSubscriptionLatencyTimer).record(eq(operation)); + } + + // Buffer metrics tests + + @Test + void recordBufferWriteAttempt_IncrementsBufferWriteAttemptsCounter() { + recorder.recordBufferWriteAttempt(); + + verify(bufferWriteAttemptsCounter).increment(); + } + + @Test + void recordBufferWriteSuccess_IncrementsBufferWriteSuccessCounter() { + recorder.recordBufferWriteSuccess(); + + verify(bufferWriteSuccessCounter).increment(); + } + + @Test + void recordBufferWriteRetrySuccess_IncrementsBufferWriteRetrySuccessCounter() { + recorder.recordBufferWriteRetrySuccess(); + + verify(bufferWriteRetrySuccessCounter).increment(); + } + + @Test + void recordBufferWriteRetryAttempt_IncrementsBufferWriteRetryAttemptsCounter() { + recorder.recordBufferWriteRetryAttempt(); + + verify(bufferWriteRetryAttemptsCounter).increment(); + } + + @Test + void recordBufferWriteFailure_IncrementsBufferWriteFailuresCounter() { + recorder.recordBufferWriteFailure(); + + verify(bufferWriteFailuresCounter).increment(); + } + + @Test + void recordBufferWriteLatency_WithSupplier_RecordsLatencyAndReturnsResult() { + String expectedResult = "buffer write result"; + Supplier operation = () -> expectedResult; + when(bufferWriteLatencyTimer.record(any(Supplier.class))).thenReturn(expectedResult); + + String result = recorder.recordBufferWriteLatency(operation); + + verify(bufferWriteLatencyTimer).record(eq(operation)); assertThat(result, equalTo(expectedResult)); } @@ -898,8 +1032,167 @@ void recordListSubscriptionLatencyPropagatesExceptions() { verify(listSubscriptionLatencyTimer, times(1)).record(failingOperation); } + + @Test + void recordBufferWriteLatency_WithRunnable_RecordsLatency() { + Runnable operation = () -> { /* void buffer write operation */ }; + + recorder.recordBufferWriteLatency(operation); + + verify(bufferWriteLatencyTimer).record(eq(operation)); + } + @Test - void recordListSubscriptionLatencyWithNullDuration() { + void recordBufferWriteLatency_WithDuration_RecordsLatency() { + Duration duration = Duration.ofMillis(150); + + recorder.recordBufferWriteLatency(duration); + + verify(bufferWriteLatencyTimer).record(duration); + } + + @Test + void recordNonRetryableError_IncrementsNonRetryableErrorsCounter() { + recorder.recordNonRetryableError(); + + verify(nonRetryableErrorsCounter).increment(); + } + + @Test + void recordRetryableError_IncrementsRetryableErrorsCounter() { + recorder.recordRetryableError(); + + verify(retryableErrorsCounter).increment(); + } + + @Test + void recordBufferWriteLatencyWithIntegerSupplier() { + Supplier operation = () -> 100; + when(bufferWriteLatencyTimer.record(operation)).thenReturn(100); + + Integer result = recorder.recordBufferWriteLatency(operation); + + assertEquals(100, result); + verify(bufferWriteLatencyTimer, times(1)).record(operation); + } + + @Test + void recordBufferWriteLatencyWithMultipleDurations() { + Duration duration1 = Duration.ofMillis(100); + Duration duration2 = Duration.ofMillis(250); + Duration duration3 = Duration.ofMillis(400); + + recorder.recordBufferWriteLatency(duration1); + recorder.recordBufferWriteLatency(duration2); + recorder.recordBufferWriteLatency(duration3); + + verify(bufferWriteLatencyTimer, times(1)).record(duration1); + verify(bufferWriteLatencyTimer, times(1)).record(duration2); + verify(bufferWriteLatencyTimer, times(1)).record(duration3); + } + + @Test + void recordMultipleBufferWriteAttempts() { + recorder.recordBufferWriteAttempt(); + recorder.recordBufferWriteAttempt(); + recorder.recordBufferWriteAttempt(); + + verify(bufferWriteAttemptsCounter, times(3)).increment(); + } + + @Test + void recordMultipleBufferWriteSuccesses() { + recorder.recordBufferWriteSuccess(); + recorder.recordBufferWriteSuccess(); + + verify(bufferWriteSuccessCounter, times(2)).increment(); + } + + @Test + void recordMultipleBufferWriteFailures() { + recorder.recordBufferWriteFailure(); + recorder.recordBufferWriteFailure(); + recorder.recordBufferWriteFailure(); + recorder.recordBufferWriteFailure(); + + verify(bufferWriteFailuresCounter, times(4)).increment(); + } + + @Test + void recordMultipleErrorCategorizations() { + recorder.recordRetryableError(); + recorder.recordRetryableError(); + recorder.recordNonRetryableError(); + + verify(retryableErrorsCounter, times(2)).increment(); + verify(nonRetryableErrorsCounter, times(1)).increment(); + } + + @Test + void realisticBufferWriteScenario() { + // Simulate buffer write operations with retries + recorder.recordBufferWriteAttempt(); // Initial attempt + recorder.recordBufferWriteRetryAttempt(); // First retry + recorder.recordBufferWriteRetryAttempt(); // Second retry + recorder.recordBufferWriteRetrySuccess(); // Success on retry + + verify(bufferWriteAttemptsCounter, times(1)).increment(); + verify(bufferWriteRetryAttemptsCounter, times(2)).increment(); + verify(bufferWriteRetrySuccessCounter, times(1)).increment(); + verify(bufferWriteSuccessCounter, times(0)).increment(); // No direct success + verify(bufferWriteFailuresCounter, times(0)).increment(); // No final failure + } + + @Test + void realisticBufferWriteFailureScenario() { + // Simulate buffer write operations that ultimately fail + recorder.recordBufferWriteAttempt(); // Initial attempt + recorder.recordBufferWriteRetryAttempt(); // First retry + recorder.recordBufferWriteRetryAttempt(); // Second retry + recorder.recordBufferWriteRetryAttempt(); // Third retry + recorder.recordBufferWriteFailure(); // Ultimate failure + + verify(bufferWriteAttemptsCounter, times(1)).increment(); + verify(bufferWriteRetryAttemptsCounter, times(3)).increment(); + verify(bufferWriteFailuresCounter, times(1)).increment(); + verify(bufferWriteSuccessCounter, times(0)).increment(); + verify(bufferWriteRetrySuccessCounter, times(0)).increment(); + } + + @Test + void mixedBufferAndErrorMetricsScenario() { + // Record various buffer and error metrics + recorder.recordBufferWriteAttempt(); + recorder.recordBufferWriteSuccess(); + recorder.recordRetryableError(); + recorder.recordNonRetryableError(); + + // Verify all metrics were recorded correctly + verify(bufferWriteAttemptsCounter, times(1)).increment(); + verify(bufferWriteSuccessCounter, times(1)).increment(); + verify(retryableErrorsCounter, times(1)).increment(); + verify(nonRetryableErrorsCounter, times(1)).increment(); + } + + @Test + void recordBufferWriteLatencyPropagatesExceptions() { + Supplier failingOperation = () -> { + throw new RuntimeException("Buffer write exception"); + }; + + // Configure the mock timer to propagate the exception + when(bufferWriteLatencyTimer.record(failingOperation)).thenThrow(new RuntimeException("Buffer write exception")); + + assertThrows(RuntimeException.class, () -> { + recorder.recordBufferWriteLatency(failingOperation); + }); + + // Timer should still be called even if the operation fails + verify(bufferWriteLatencyTimer, times(1)).record(failingOperation); + } + + @Test + void recordBufferWriteLatencyWithNullDuration() { // Duration should not be null in normal usage, but testing robustness Duration nullDuration = null; @@ -956,11 +1249,19 @@ void constructor_WithDisabledSubscriptionMetrics_DoesNotCreateSubscriptionMetric when(testPluginMetrics.counter("authenticationRequestsSuccess")).thenReturn(authSuccessCounter); when(testPluginMetrics.counter("authenticationRequestsFailed")).thenReturn(authFailureCounter); when(testPluginMetrics.timer("authenticationRequestLatency")).thenReturn(authLatencyTimer); + when(testPluginMetrics.timer("bufferWriteLatency")).thenReturn(bufferWriteLatencyTimer); + when(testPluginMetrics.counter("bufferWriteAttempts")).thenReturn(bufferWriteAttemptsCounter); + when(testPluginMetrics.counter("bufferWriteSuccess")).thenReturn(bufferWriteSuccessCounter); + when(testPluginMetrics.counter("bufferWriteRetrySuccess")).thenReturn(bufferWriteRetrySuccessCounter); + when(testPluginMetrics.counter("bufferWriteRetryAttempts")).thenReturn(bufferWriteRetryAttemptsCounter); + when(testPluginMetrics.counter("bufferWriteFailures")).thenReturn(bufferWriteFailuresCounter); when(testPluginMetrics.counter("totalDataApiRequests")).thenReturn(totalDataApiRequestsCounter); when(testPluginMetrics.counter("logsRequested")).thenReturn(logsRequestedCounter); when(testPluginMetrics.counter("requestAccessDenied")).thenReturn(requestAccessDeniedCounter); when(testPluginMetrics.counter("requestThrottled")).thenReturn(requestThrottledCounter); when(testPluginMetrics.counter("resourceNotFound")).thenReturn(resourceNotFoundCounter); + when(testPluginMetrics.counter("nonRetryableErrors")).thenReturn(nonRetryableErrorsCounter); + when(testPluginMetrics.counter("retryableErrors")).thenReturn(retryableErrorsCounter); VendorAPIMetricsRecorder recorderDisabled = new VendorAPIMetricsRecorder(testPluginMetrics, false); @@ -1191,11 +1492,19 @@ void subscriptionMetricsEnabled_WorkNormallyAfterConstruction() { when(separatePluginMetrics.counter("listSubscriptionRequestsFailed")).thenReturn(defaultCounter); when(separatePluginMetrics.timer("listSubscriptionRequestLatency")).thenReturn(defaultTimer); when(separatePluginMetrics.counter("listSubscriptionApiCalls")).thenReturn(defaultCounter); + when(separatePluginMetrics.timer("bufferWriteLatency")).thenReturn(defaultTimer); + when(separatePluginMetrics.counter("bufferWriteAttempts")).thenReturn(defaultCounter); + when(separatePluginMetrics.counter("bufferWriteSuccess")).thenReturn(defaultCounter); + when(separatePluginMetrics.counter("bufferWriteRetrySuccess")).thenReturn(defaultCounter); + when(separatePluginMetrics.counter("bufferWriteRetryAttempts")).thenReturn(defaultCounter); + when(separatePluginMetrics.counter("bufferWriteFailures")).thenReturn(defaultCounter); when(separatePluginMetrics.counter("totalDataApiRequests")).thenReturn(defaultCounter); when(separatePluginMetrics.counter("logsRequested")).thenReturn(defaultCounter); when(separatePluginMetrics.counter("requestAccessDenied")).thenReturn(defaultCounter); when(separatePluginMetrics.counter("requestThrottled")).thenReturn(defaultCounter); when(separatePluginMetrics.counter("resourceNotFound")).thenReturn(defaultCounter); + when(separatePluginMetrics.counter("nonRetryableErrors")).thenReturn(defaultCounter); + when(separatePluginMetrics.counter("retryableErrors")).thenReturn(defaultCounter); VendorAPIMetricsRecorder recorderEnabled = new VendorAPIMetricsRecorder(separatePluginMetrics, true); @@ -1302,11 +1611,19 @@ void multipleInstancesWithDifferentSettings_WorkIndependently() { when(separatePluginMetrics1.counter("listSubscriptionRequestsFailed")).thenReturn(defaultCounter); when(separatePluginMetrics1.timer("listSubscriptionRequestLatency")).thenReturn(defaultTimer); when(separatePluginMetrics1.counter("listSubscriptionApiCalls")).thenReturn(defaultCounter); + when(separatePluginMetrics1.timer("bufferWriteLatency")).thenReturn(defaultTimer); + when(separatePluginMetrics1.counter("bufferWriteAttempts")).thenReturn(defaultCounter); + when(separatePluginMetrics1.counter("bufferWriteSuccess")).thenReturn(defaultCounter); + when(separatePluginMetrics1.counter("bufferWriteRetrySuccess")).thenReturn(defaultCounter); + when(separatePluginMetrics1.counter("bufferWriteRetryAttempts")).thenReturn(defaultCounter); + when(separatePluginMetrics1.counter("bufferWriteFailures")).thenReturn(defaultCounter); when(separatePluginMetrics1.counter("totalDataApiRequests")).thenReturn(defaultCounter); when(separatePluginMetrics1.counter("logsRequested")).thenReturn(defaultCounter); when(separatePluginMetrics1.counter("requestAccessDenied")).thenReturn(defaultCounter); when(separatePluginMetrics1.counter("requestThrottled")).thenReturn(defaultCounter); when(separatePluginMetrics1.counter("resourceNotFound")).thenReturn(defaultCounter); + when(separatePluginMetrics1.counter("nonRetryableErrors")).thenReturn(defaultCounter); + when(separatePluginMetrics1.counter("retryableErrors")).thenReturn(defaultCounter); // Setup mocks for disabled recorder (without subscription metrics) when(separatePluginMetrics2.counter("searchRequestsSuccess")).thenReturn(separateSearchCounter2); @@ -1320,11 +1637,19 @@ void multipleInstancesWithDifferentSettings_WorkIndependently() { when(separatePluginMetrics2.counter("authenticationRequestsSuccess")).thenReturn(defaultCounter); when(separatePluginMetrics2.counter("authenticationRequestsFailed")).thenReturn(defaultCounter); when(separatePluginMetrics2.timer("authenticationRequestLatency")).thenReturn(defaultTimer); + when(separatePluginMetrics2.timer("bufferWriteLatency")).thenReturn(defaultTimer); + when(separatePluginMetrics2.counter("bufferWriteAttempts")).thenReturn(defaultCounter); + when(separatePluginMetrics2.counter("bufferWriteSuccess")).thenReturn(defaultCounter); + when(separatePluginMetrics2.counter("bufferWriteRetrySuccess")).thenReturn(defaultCounter); + when(separatePluginMetrics2.counter("bufferWriteRetryAttempts")).thenReturn(defaultCounter); + when(separatePluginMetrics2.counter("bufferWriteFailures")).thenReturn(defaultCounter); when(separatePluginMetrics2.counter("totalDataApiRequests")).thenReturn(defaultCounter); when(separatePluginMetrics2.counter("logsRequested")).thenReturn(defaultCounter); when(separatePluginMetrics2.counter("requestAccessDenied")).thenReturn(defaultCounter); when(separatePluginMetrics2.counter("requestThrottled")).thenReturn(defaultCounter); when(separatePluginMetrics2.counter("resourceNotFound")).thenReturn(defaultCounter); + when(separatePluginMetrics2.counter("nonRetryableErrors")).thenReturn(defaultCounter); + when(separatePluginMetrics2.counter("retryableErrors")).thenReturn(defaultCounter); VendorAPIMetricsRecorder recorderEnabled = new VendorAPIMetricsRecorder(separatePluginMetrics1, true); VendorAPIMetricsRecorder recorderDisabled = new VendorAPIMetricsRecorder(separatePluginMetrics2, false); @@ -1348,6 +1673,7 @@ void gatingLogic_VerifyNullChecks_DoNotCauseNullPointerExceptions() { // This test ensures that when subscription metrics are disabled, // the internal counters/timers are null but methods handle this gracefully VendorAPIMetricsRecorder recorderDisabled = new VendorAPIMetricsRecorder(pluginMetrics, false); + Duration nullDuration = null; // All these should complete without NPE recorderDisabled.recordSubscriptionSuccess(); @@ -1362,5 +1688,29 @@ void gatingLogic_VerifyNullChecks_DoNotCauseNullPointerExceptions() { // If we reach here, no NPEs were thrown assertThat(recorderDisabled, notNullValue()); + + doThrow(new NullPointerException()).when(bufferWriteLatencyTimer).record(nullDuration); + + assertThrows(NullPointerException.class, () -> { + recorder.recordBufferWriteLatency(nullDuration); + }); + } + + @Test + void mixedOperationsIncludingBuffer_WorkCorrectly() { + // Test that we can use different operation types including buffer metrics + recorder.recordSearchSuccess(); + recorder.recordGetSuccess(); + recorder.recordAuthSuccess(); + recorder.recordSubscriptionSuccess(); + recorder.recordBufferWriteSuccess(); + recorder.recordRetryableError(); + + verify(searchSuccessCounter).increment(); + verify(getSuccessCounter).increment(); + verify(authSuccessCounter).increment(); + verify(subscriptionSuccessCounter).increment(); + verify(bufferWriteSuccessCounter).increment(); + verify(retryableErrorsCounter).increment(); } }