Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
Expand All @@ -51,14 +51,16 @@
@Slf4j
@Named
public class Office365CrawlerClient implements CrawlerClient<DimensionalTimeSliceWorkerProgressState> {

public static final String NON_RETRYABLE_ERRORS = "nonRetryableErrors";
public static final String RETRYABLE_ERRORS = "retryableErrors";

private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency";
private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts";
private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess";
private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess";
private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts";
private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures";
private static final String NON_RETRYABLE_ERRORS = "nonRetryableErrors";
private static final String RETRYABLE_ERRORS = "retryableErrors";
private static final int BUFFER_TIMEOUT_IN_SECONDS = 10;
private static final String CONTENT_ID = "contentId";
private static final String CONTENT_URI = "contentUri";
Expand Down Expand Up @@ -129,23 +131,11 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
if (record != null) {
records.add(record);
}
} catch (Office365Exception e) {

} catch (SaaSCrawlerException e) {
boolean isRetryable = e.isRetryable();
log.error(NOISY, "{} error processing audit log: {}",
e.isRetryable() ? "Retryable" : "Non-retryable", logId, e);
if (e.isRetryable()) {
retryableErrorsCounter.increment();
throw new RuntimeException("Retryable error processing audit log: " + logId, e);
} else {
nonRetryableErrorsCounter.increment();
// TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record
log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", logId, e);
}
} catch (Exception e) {
// Unexpected errors are treated as retryable to be safe
retryableErrorsCounter.increment();
log.error(NOISY, "Unexpected error processing audit log: {}", logId, e);
throw new RuntimeException("Unexpected error processing audit log: " + logId, e);
isRetryable ? "Retryable" : "Non-retryable", logId, e);
throw new SaaSCrawlerException("Error processing audit log: " + logId, e, isRetryable);
}
}
}
Expand All @@ -170,19 +160,30 @@ public void executePartition(final DimensionalTimeSliceWorkerProgressState state
log.error(NOISY, "Failed to process partition for log type {} from {} to {}",
logType, startTime, endTime, e);
requestErrorsCounter.increment();
throw e;
if (e instanceof SaaSCrawlerException) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rebased from this PR: #6238

SaaSCrawlerException saasException = (SaaSCrawlerException) e;
if (saasException.isRetryable()) {
retryableErrorsCounter.increment();
} else {
nonRetryableErrorsCounter.increment();
}
throw e;
}
// any other exceptions = non-retryable
nonRetryableErrorsCounter.increment();
throw new SaaSCrawlerException("Failed to process partition", e, false);
}
}

private Record<Event> processAuditLog(Map<String, Object> metadata) throws Office365Exception {
private Record<Event> processAuditLog(Map<String, Object> metadata) throws SaaSCrawlerException {
String contentUri = (String) metadata.get(CONTENT_URI);
if (contentUri == null) {
throw new Office365Exception("Missing contentUri in metadata", false);
throw new SaaSCrawlerException("Missing contentUri in metadata", false);
}

String logContent = service.getAuditLog(contentUri);
if (logContent == null) {
throw new Office365Exception("Received null log content for URI: " + contentUri, false);
throw new SaaSCrawlerException("Received null log content for URI: " + contentUri, false);
}
String logId = (String) metadata.get(CONTENT_ID);

Expand All @@ -200,7 +201,7 @@ private Record<Event> processAuditLog(Map<String, Object> metadata) throws Offic

String contentType = (String) data.get("Workload");
if (contentType == null) {
throw new Office365Exception("Missing Workload field in audit log: " + logId, false);
throw new SaaSCrawlerException("Missing Workload field in audit log: " + logId, false);
}

Event event = JacksonEvent.builder()
Expand All @@ -211,7 +212,7 @@ private Record<Event> processAuditLog(Map<String, Object> metadata) throws Offic
return new Record<>(event);
} catch (JsonProcessingException e) {
// JSON parsing errors are non-retryable as they indicate malformed data
throw new Office365Exception("Failed to parse audit log: " + logId, e, false);
throw new SaaSCrawlerException("Failed to parse audit log: " + logId, e, false);
}
}

Expand Down Expand Up @@ -244,7 +245,8 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,
retryCount++;
if (retryCount >= maxRetries) {
bufferWriteFailuresCounter.increment();
throw new RuntimeException("Failed to write to buffer after " + maxRetries + " attempts", e);
// allows all writeToBuffer exceptions to be retryable to keep current behaviour of immediate retry by WorkerScheduler
throw new SaaSCrawlerException("Failed to write to buffer after " + maxRetries + " attempts", e, true);
}

bufferWriteRetryAttemptsCounter.increment();
Expand All @@ -253,16 +255,13 @@ private void writeRecordsWithRetry(final List<Record<Event>> records,

try {
Thread.sleep(currentBackoff);
// TODO: Update worker partition state to prevent timeout
// Ideally, we want to call the saveWorkerPartitionState and extend the lease like so
// coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Buffer write retry interrupted", ie);
throw new SaaSCrawlerException("Buffer write retry interrupted", ie, true);
}
} catch (Exception e) {
bufferWriteFailuresCounter.increment();
throw new RuntimeException("Error writing to buffer", e);
throw new SaaSCrawlerException("Error writing to buffer", e, true);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import lombok.extern.slf4j.Slf4j;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
Expand Down Expand Up @@ -148,7 +148,7 @@ public void startSubscriptions() {
} catch (Exception e) {
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
log.error(NOISY, "Failed to initialize subscriptions", e);
throw new RuntimeException("Failed to initialize subscriptions: " + e.getMessage(), e);
throw new SaaSCrawlerException("Failed to initialize subscriptions: " + e.getMessage(), e, true);
}
}

Expand Down Expand Up @@ -212,7 +212,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
} catch (Exception e) {
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
log.error(NOISY, "Error while fetching audit logs for content type {}", contentType, e);
throw new RuntimeException("Failed to fetch audit logs", e);
throw new SaaSCrawlerException("Failed to fetch audit logs", e, true);
}
});
}
Expand All @@ -226,7 +226,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
*/
public String getAuditLog(String contentUri) {
if (!contentUri.startsWith(MANAGEMENT_API_BASE_URL)) {
throw new Office365Exception("ContentUri must be from Office365 Management API: " + contentUri, false);
throw new SaaSCrawlerException("ContentUri must be from Office365 Management API: " + contentUri, false);
}
auditLogsRequestedCounter.increment();
final HttpHeaders headers = new HttpHeaders();
Expand Down Expand Up @@ -256,7 +256,7 @@ public String getAuditLog(String contentUri) {
} catch (Exception e) {
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e);
throw new RuntimeException("Failed to fetch audit log", e);
throw new SaaSCrawlerException("Failed to fetch audit log", e, true);
}
});
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception;
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;

import javax.inject.Named;
Expand Down Expand Up @@ -52,10 +52,10 @@ public AuditLogsResponse searchAuditLogs(final String logType,
final Instant endTime,
final String nextPageUri) {
if (startTime == null || endTime == null) {
throw new IllegalArgumentException("startTime and endTime must not be null");
throw new SaaSCrawlerException("startTime and endTime must not be null", false);
}
if (logType == null) {
throw new IllegalArgumentException("logType must not be null");
throw new SaaSCrawlerException("logType must not be null", false);
}
try {
// If pagination URI exists, use it directly
Expand All @@ -78,7 +78,7 @@ public AuditLogsResponse searchAuditLogs(final String logType,
return response;
} catch (Exception e) {
windowRetryCounter.increment();
throw new Office365Exception(
throw new SaaSCrawlerException(
String.format("Failed to fetch logs for time window %s to %s for log type %s.",
startTime, endTime, logType), e, true);
}
Expand Down
Loading
Loading