Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import java.util.concurrent.TimeoutException;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQEUEST_ERRORS;
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQUEST_ERRORS;

/**
* Implementation of CrawlerClient for Office 365 audit logs.
Expand Down Expand Up @@ -86,7 +86,7 @@ public Office365CrawlerClient(final Office365Service service,
this.bufferWriteRetrySuccessCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_SUCCESS);
this.bufferWriteRetryAttemptsCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_ATTEMPTS);
this.bufferWriteFailuresCounter = pluginMetrics.counter(BUFFER_WRITE_FAILURES);
this.requestErrorsCounter = pluginMetrics.counter(REQEUEST_ERRORS);
this.requestErrorsCounter = pluginMetrics.counter(REQUEST_ERRORS);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RestTemplate;
import org.springframework.http.HttpStatus;

import javax.inject.Named;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -142,17 +141,8 @@ public void startSubscriptions() {
}
}, authConfig::renewCredentials);
}
} catch (HttpClientErrorException | HttpServerErrorException e) {
HttpStatus statusCode = e.getStatusCode();
publishErrorTypeMetricCounter(statusCode.getReasonPhrase(), this.errorTypeMetricCounterMap);
log.error(NOISY, "Failed to initialize subscriptions with status code {}: {}",
statusCode, e.getMessage());
throw new RuntimeException("Failed to initialize subscriptions: " + e.getMessage(), e);
} catch (Exception e) {
// FORBIDDEN throws SecurityException in RetryHandler
if (e instanceof SecurityException) {
publishErrorTypeMetricCounter(HttpStatus.FORBIDDEN.getReasonPhrase(), this.errorTypeMetricCounterMap);
}
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
log.error(NOISY, "Failed to initialize subscriptions", e);
throw new RuntimeException("Failed to initialize subscriptions: " + e.getMessage(), e);
}
Expand Down Expand Up @@ -214,16 +204,8 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
authConfig::renewCredentials,
searchRequestsFailedCounter
);
} catch (HttpClientErrorException | HttpServerErrorException e) {
HttpStatus statusCode = e.getStatusCode();
publishErrorTypeMetricCounter(statusCode.getReasonPhrase(), this.errorTypeMetricCounterMap);
log.error(NOISY, "Error while fetching audit logs for content type {}", contentType, e);
throw new RuntimeException("Failed to fetch audit logs", e);
} catch (Exception e) {
// FORBIDDEN throws SecurityException in RetryHandler
if (e instanceof SecurityException) {
publishErrorTypeMetricCounter(HttpStatus.FORBIDDEN.getReasonPhrase(), this.errorTypeMetricCounterMap);
}
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
log.error(NOISY, "Error while fetching audit logs for content type {}", contentType, e);
throw new RuntimeException("Failed to fetch audit logs", e);
}
Expand Down Expand Up @@ -265,16 +247,8 @@ public String getAuditLog(String contentUri) {
}, authConfig::renewCredentials, auditLogRequestsFailedCounter);
auditLogRequestsSuccessCounter.increment();
return response;
} catch (HttpClientErrorException | HttpServerErrorException e) {
HttpStatus statusCode = e.getStatusCode();
publishErrorTypeMetricCounter(statusCode.getReasonPhrase(), this.errorTypeMetricCounterMap);
log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e);
throw new RuntimeException("Failed to fetch audit log", e);
} catch (Exception e) {
// FORBIDDEN throws SecurityException in RetryHandler
if (e instanceof SecurityException) {
publishErrorTypeMetricCounter(HttpStatus.FORBIDDEN.getReasonPhrase(), this.errorTypeMetricCounterMap);
}
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e);
throw new RuntimeException("Failed to fetch audit log", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.never;

import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQUEST_ERRORS;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class Office365CrawlerClientTest {
Expand Down Expand Up @@ -154,7 +156,7 @@ void testExecutePartitionWithJsonProcessingError() throws Exception {

// Mock the total failures counter
Counter mockRequestErrorsCounter = mock(Counter.class);
when(pluginMetrics.counter("requestErrors")).thenReturn(mockRequestErrorsCounter);
when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter);

AuditLogsResponse response = new AuditLogsResponse(
Arrays.asList(Map.of(
Expand Down Expand Up @@ -318,7 +320,7 @@ void testMissingWorkloadField() throws Exception {
void testExecutePartitionWithSearchAuditLogsError() throws Exception {
// Mock the total failures counter before creating the client
Counter mockRequestErrorsCounter = mock(Counter.class);
when(pluginMetrics.counter("requestErrors")).thenReturn(mockRequestErrorsCounter);
when(pluginMetrics.counter(REQUEST_ERRORS)).thenReturn(mockRequestErrorsCounter);

Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
import io.micrometer.core.instrument.Counter;
import org.springframework.http.HttpStatus;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;

import java.util.Map;
import java.util.Optional;
import java.util.HashMap;
/**
* The MetricsHelper class.
Expand All @@ -27,7 +30,7 @@ public class MetricsHelper {
private static final String RESOURCE_NOT_FOUND = "resourceNotFound";

// other errors in crawlerClient
public static final String REQEUEST_ERRORS = "requestErrors";
public static final String REQUEST_ERRORS = "requestErrors";

/**
* Get the metric counter map for specific errorType
Expand All @@ -54,12 +57,27 @@ public static Map<String, Counter> getErrorTypeMetricCounterMap(PluginMetrics pl
* TOO_MANY_REQUESTS = requestThrottled
* NOT_FOUND = resourceNotFound
*
* @param errorType - the httpStatusCode string represenation
* @param ex - exception from RestClient
* @param errorTypeMetricCounterMap - the map of errorType to metric counter
*/
public static void publishErrorTypeMetricCounter(String errorType, Map<String, Counter> errorTypeMetricCounterMap) {
if (errorTypeMetricCounterMap != null && errorTypeMetricCounterMap.containsKey(errorType)) {
errorTypeMetricCounterMap.get(errorType).increment();
public static void publishErrorTypeMetricCounter(Exception ex, Map<String, Counter> errorTypeMetricCounterMap) {
Optional<String> statusCode = Optional.empty();
if (ex instanceof HttpClientErrorException) {
HttpClientErrorException httpE = (HttpClientErrorException) ex;
statusCode = Optional.ofNullable(httpE.getStatusCode().getReasonPhrase());
} else if (ex instanceof HttpServerErrorException) {
HttpServerErrorException httpE = (HttpServerErrorException) ex;
statusCode = Optional.ofNullable(httpE.getStatusCode().getReasonPhrase());
} else if (ex instanceof SecurityException) { // FORBIDDEN throws SecurityException in RetryHandler
statusCode = Optional.ofNullable(HttpStatus.FORBIDDEN.getReasonPhrase());
} // ignore for others

if (statusCode.isPresent()) {
String errorType = statusCode.get();
if (errorTypeMetricCounterMap != null && errorTypeMetricCounterMap.containsKey(errorType)) {
errorTypeMetricCounterMap.get(errorType).increment();
}
}

}
}
Loading