Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,18 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;

Expand All @@ -42,7 +40,6 @@
import java.util.concurrent.TimeoutException;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import static org.opensearch.dataprepper.plugins.source.source_crawler.utils.MetricsHelper.REQUEST_ERRORS;

/**
* Implementation of CrawlerClient for Office 365 audit logs.
Expand All @@ -52,49 +49,22 @@
@Named
public class Office365CrawlerClient implements CrawlerClient<DimensionalTimeSliceWorkerProgressState> {

public static final String NON_RETRYABLE_ERRORS = "nonRetryableErrors";
public static final String RETRYABLE_ERRORS = "retryableErrors";

private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency";
private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts";
private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess";
private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess";
private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts";
private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures";
private static final int BUFFER_TIMEOUT_IN_SECONDS = 10;
private static final String CONTENT_ID = "contentId";
private static final String CONTENT_URI = "contentUri";

private final Office365Service service;
private final Office365SourceConfig configuration;
private final Timer bufferWriteLatencyTimer;
private final Counter bufferWriteAttemptsCounter;
private final Counter bufferWriteSuccessCounter;
private final Counter bufferWriteRetrySuccessCounter;
private final Counter bufferWriteRetryAttemptsCounter;
private final Counter bufferWriteFailuresCounter;
private final Counter requestErrorsCounter;
private final Counter nonRetryableErrorsCounter;
private final Counter retryableErrorsCounter;
private final VendorAPIMetricsRecorder metricsRecorder;
private ObjectMapper objectMapper;

public Office365CrawlerClient(final Office365Service service,
final Office365SourceConfig sourceConfig,
final PluginMetrics pluginMetrics) {
final VendorAPIMetricsRecorder metricsRecorder) {
this.service = service;
this.configuration = sourceConfig;
this.metricsRecorder = metricsRecorder;
this.objectMapper = new ObjectMapper();

// Initialize metrics
this.bufferWriteLatencyTimer = pluginMetrics.timer(BUFFER_WRITE_LATENCY);
this.bufferWriteAttemptsCounter = pluginMetrics.counter(BUFFER_WRITE_ATTEMPTS);
this.bufferWriteSuccessCounter = pluginMetrics.counter(BUFFER_WRITE_SUCCESS);
this.bufferWriteRetrySuccessCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_SUCCESS);
this.bufferWriteRetryAttemptsCounter = pluginMetrics.counter(BUFFER_WRITE_RETRY_ATTEMPTS);
this.bufferWriteFailuresCounter = pluginMetrics.counter(BUFFER_WRITE_FAILURES);
this.requestErrorsCounter = pluginMetrics.counter(REQUEST_ERRORS);
this.nonRetryableErrorsCounter = pluginMetrics.counter(NON_RETRYABLE_ERRORS);
this.retryableErrorsCounter = pluginMetrics.counter(RETRYABLE_ERRORS);
}

@VisibleForTesting
Expand Down Expand Up @@ -141,13 +111,8 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
}

// Write Records to the buffer after processing a page of data
bufferWriteLatencyTimer.record(() -> {
try {
writeRecordsWithRetry(records, buffer, acknowledgementSet);
} catch (Exception e) {
bufferWriteFailuresCounter.increment();
throw e;
}
metricsRecorder.recordBufferWriteLatency(() -> {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we don't need to wrap this in a try-catch block to handle errors, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct! We don't need the try-catch wrapper, for two reasons: 1) writeRecordsWithRetry() already calls metricsRecorder.recordBufferWriteFailure() internally when exceptions occur, so wrapping it would double-count buffer write failures; and 2) the call is already inside the outer try-catch block in executePartition(), which handles all exceptions with metrics recording and retryable-error categorization.

writeRecordsWithRetry(records, buffer, acknowledgementSet);
});

nextPageUri = response.getNextPageUri();
Expand All @@ -159,18 +124,18 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
} catch (Exception e) {
log.error(NOISY, "Failed to process partition for log type {} from {} to {}",
logType, startTime, endTime, e);
requestErrorsCounter.increment();
metricsRecorder.recordError(e);
if (e instanceof SaaSCrawlerException) {
SaaSCrawlerException saasException = (SaaSCrawlerException) e;
if (saasException.isRetryable()) {
retryableErrorsCounter.increment();
metricsRecorder.recordRetryableError();
} else {
nonRetryableErrorsCounter.increment();
metricsRecorder.recordNonRetryableError();
}
throw e;
}
// any other exceptions = non-retryable
nonRetryableErrorsCounter.increment();
metricsRecorder.recordNonRetryableError();
throw new SaaSCrawlerException("Failed to process partition", e, false);
}
}
Expand Down Expand Up @@ -219,7 +184,7 @@ private Record<Event> processAuditLog(Map<String, Object> metadata) throws SaaSC
private void writeRecordsWithRetry(final List<Record<Event>> records,
final Buffer<Record<Event>> buffer,
final AcknowledgementSet acknowledgementSet) {
bufferWriteAttemptsCounter.increment();
metricsRecorder.recordBufferWriteAttempt();
int retryCount = 0;
int currentBackoff = 1000; // Start with 1 second
final int maxBackoff = 30000; // Max 30 seconds
Expand All @@ -235,21 +200,21 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
}

if (retryCount > 0) {
bufferWriteRetrySuccessCounter.increment();
metricsRecorder.recordBufferWriteRetrySuccess();
} else {
bufferWriteSuccessCounter.increment();
metricsRecorder.recordBufferWriteSuccess();
}
return;

} catch (TimeoutException e) {
retryCount++;
if (retryCount >= maxRetries) {
bufferWriteFailuresCounter.increment();
metricsRecorder.recordBufferWriteFailure();
// allows all writeToBuffer exceptions to be retryable to keep current behaviour of immediate retry by WorkerScheduler
throw new SaaSCrawlerException("Failed to write to buffer after " + maxRetries + " attempts", e, true);
}

bufferWriteRetryAttemptsCounter.increment();
metricsRecorder.recordBufferWriteRetryAttempt();
currentBackoff = Math.min((int)(currentBackoff * 2.0), maxBackoff);
log.info("Buffer full, backing off for {} ms before retry", currentBackoff);

Expand All @@ -260,7 +225,7 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
throw new SaaSCrawlerException("Buffer write retry interrupted", ie, true);
}
} catch (Exception e) {
bufferWriteFailuresCounter.increment();
metricsRecorder.recordBufferWriteFailure();
throw new SaaSCrawlerException("Error writing to buffer", e, true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
package org.opensearch.dataprepper.plugins.source.microsoft_office365.configuration;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365CrawlerClient;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationProvider;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service;
import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -72,4 +74,20 @@ public Office365RestClient office365RestClient(
VendorAPIMetricsRecorder vendorAPIMetricsRecorder) {
return new Office365RestClient(authConfig, vendorAPIMetricsRecorder);
}

/**
 * Bean factory for {@link Office365CrawlerClient}, wired with the unified
 * metrics recorder.
 *
 * @param service the Office 365 service handed to the client
 * @param sourceConfig the Office 365 source configuration handed to the client
 * @param metricsRecorder the unified metrics recorder handed to the client
 * @return a new Office365CrawlerClient constructed from the three arguments
 */
@Bean
public Office365CrawlerClient office365CrawlerClient(
        Office365Service service,
        Office365SourceConfig sourceConfig,
        VendorAPIMetricsRecorder metricsRecorder) {
    // Plain constructor delegation; no additional setup is performed here.
    final Office365CrawlerClient crawlerClient =
            new Office365CrawlerClient(service, sourceConfig, metricsRecorder);
    return crawlerClient;
}
}
Loading
Loading