-
Notifications
You must be signed in to change notification settings - Fork 325
Implement dimensional time slice crawler #6011
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,11 +9,11 @@ | |
|
|
||
| package org.opensearch.dataprepper.plugins.source.microsoft_office365; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.fasterxml.jackson.core.JsonProcessingException; | ||
| import com.fasterxml.jackson.core.type.TypeReference; | ||
| import com.fasterxml.jackson.databind.JsonNode; | ||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import com.fasterxml.jackson.core.JsonProcessingException; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import io.micrometer.core.instrument.Counter; | ||
| import io.micrometer.core.instrument.Timer; | ||
| import lombok.extern.slf4j.Slf4j; | ||
|
|
@@ -24,25 +24,22 @@ | |
| import org.opensearch.dataprepper.model.event.JacksonEvent; | ||
| import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
| import org.opensearch.dataprepper.model.record.Record; | ||
| import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; | ||
| import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient; | ||
| import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider; | ||
| import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState; | ||
| import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; | ||
| import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; | ||
| import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; | ||
| import org.springframework.web.client.ResourceAccessException; | ||
| import org.springframework.web.client.HttpClientErrorException; | ||
| import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; | ||
| import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; | ||
|
|
||
| import javax.inject.Named; | ||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.util.Iterator; | ||
|
|
||
| import java.util.List; | ||
| import java.util.ArrayList; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Iterator; | ||
| import java.util.concurrent.TimeoutException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; | ||
|
|
||
|
|
@@ -52,21 +49,18 @@ | |
| */ | ||
| @Slf4j | ||
| @Named | ||
| public class Office365CrawlerClient implements CrawlerClient<PaginationCrawlerWorkerProgressState> { | ||
| public class Office365CrawlerClient implements CrawlerClient<DimensionalTimeSliceWorkerProgressState> { | ||
| private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency"; | ||
| private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts"; | ||
| private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess"; | ||
| private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess"; | ||
| private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts"; | ||
| private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures"; | ||
| private static final String WORKER_STATE_UPDATES = "workerStateUpdates"; | ||
|
|
||
| private static final String CONTENT_TYPE = "contentType"; | ||
| private static final int BUFFER_TIMEOUT_IN_SECONDS = 10; | ||
| private static final String CONTENT_ID = "contentId"; | ||
| private static final String CONTENT_URI = "contentUri"; | ||
|
|
||
| private final Office365Service service; | ||
| private final Office365Iterator office365Iterator; | ||
| private final ExecutorService executorService; | ||
| private final Office365SourceConfig configuration; | ||
| private final Timer bufferWriteLatencyTimer; | ||
| private final Counter bufferWriteAttemptsCounter; | ||
|
|
@@ -77,13 +71,9 @@ public class Office365CrawlerClient implements CrawlerClient<PaginationCrawlerWo | |
| private ObjectMapper objectMapper; | ||
|
|
||
| public Office365CrawlerClient(final Office365Service service, | ||
| final Office365Iterator office365Iterator, | ||
| final PluginExecutorServiceProvider executorServiceProvider, | ||
| final Office365SourceConfig sourceConfig, | ||
| final PluginMetrics pluginMetrics) { | ||
| this.service = service; | ||
| this.office365Iterator = office365Iterator; | ||
| this.executorService = executorServiceProvider.get(); | ||
| this.configuration = sourceConfig; | ||
| this.objectMapper = new ObjectMapper(); | ||
|
|
||
|
|
@@ -101,125 +91,116 @@ void injectObjectMapper(final ObjectMapper objectMapper) { | |
| this.objectMapper = objectMapper; | ||
| } | ||
|
|
||
|
|
||
| @Override | ||
| public Iterator<ItemInfo> listItems(final Instant lastPollTime) { | ||
| log.info("Starting to list Office 365 audit logs from {}", lastPollTime); | ||
|
|
||
| // TODO: Subscription management should be moved to a dedicated class in the future | ||
| // Currently, we initialize subscriptions in the leader partition to ensure that we're always subscribed | ||
| // to the required content type to ensure there hasn't been a subscription change | ||
| service.initializeSubscriptions(); | ||
| office365Iterator.initialize(lastPollTime); | ||
| return office365Iterator; | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public void executePartition(final PaginationCrawlerWorkerProgressState state, | ||
| public void executePartition(final DimensionalTimeSliceWorkerProgressState state, | ||
| final Buffer<Record<Event>> buffer, | ||
| final AcknowledgementSet acknowledgementSet) { | ||
| // Process a batch of audit log IDs and convert them to records | ||
| // If any record fails to process, the entire batch will be retried | ||
| log.info("Starting to execute partition with {} log(s)", state.getItemIds().size()); | ||
| List<String> itemIds = state.getItemIds(); | ||
| final Instant startTime = state.getStartTime(); | ||
| final Instant endTime = state.getEndTime(); | ||
| final String logType = state.getDimensionType(); | ||
|
|
||
| // Process each audit log ID in the batch | ||
| List<Record<Event>> records = itemIds.stream() | ||
| .map(id -> { | ||
| try { | ||
| return processAuditLog(id); | ||
| } catch (Office365Exception e) { | ||
| log.error(NOISY, "{} error processing audit log: {}", | ||
| e.isRetryable() ? "Retryable" : "Non-retryable", id, e); | ||
| if (e.isRetryable()) { | ||
| throw new RuntimeException("Retryable error processing audit log: " + id, e); | ||
| } else { | ||
| // TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record | ||
| log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", id, e); | ||
| return null; | ||
| try { | ||
| String nextPageUri = null; | ||
| List<Record<Event>> records = new ArrayList<>(); | ||
|
|
||
| do { | ||
| AuditLogsResponse response = | ||
| service.searchAuditLogs(logType, startTime, endTime, nextPageUri); | ||
|
|
||
| if (response.getItems() != null && !response.getItems().isEmpty()) { | ||
| for (Map<String, Object> metadata : response.getItems()) { | ||
| String logId = (String) metadata.get(CONTENT_ID); | ||
| try { | ||
| Record<Event> record = processAuditLog(metadata); | ||
| if (record != null) { | ||
| records.add(record); | ||
| } | ||
| } catch (Office365Exception e) { | ||
|
|
||
| log.error(NOISY, "{} error processing audit log: {}", | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. logType is already included in the outer catch block's error message, which covers the entire partition, so adding it to individual error logs would be redundant. |
||
| e.isRetryable() ? "Retryable" : "Non-retryable", logId, e); | ||
| if (e.isRetryable()) { | ||
| throw new RuntimeException("Retryable error processing audit log: " + logId, e); | ||
| } else { | ||
| // TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record | ||
| log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", logId, e); | ||
| } | ||
| } catch (Exception e) { | ||
| // Unexpected errors are treated as retryable to be safe | ||
| log.error(NOISY, "Unexpected error processing audit log: {}", logId, e); | ||
| throw new RuntimeException("Unexpected error processing audit log: " + logId, e); | ||
| } | ||
| } catch (Exception e) { | ||
| // Unexpected errors are treated as retryable to be safe | ||
| log.error(NOISY, "Unexpected error processing audit log: {}", id, e); | ||
| throw new RuntimeException("Unexpected error processing audit log: " + id, e); | ||
| } | ||
| }) | ||
| .filter(Objects::nonNull) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| bufferWriteLatencyTimer.record(() -> { | ||
| try { | ||
| writeRecordsWithRetry(records, buffer, acknowledgementSet, state); | ||
| } catch (Exception e) { | ||
| bufferWriteFailuresCounter.increment(); | ||
| throw e; | ||
| } | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| private Record<Event> processAuditLog(String id) { | ||
| try { | ||
| String auditLog = service.getAuditLog(id); | ||
| nextPageUri = response.getNextPageUri(); | ||
| } while (nextPageUri != null); | ||
|
|
||
| // Handle HTTP errors in service layer | ||
| if (auditLog == null) { | ||
| throw new Office365Exception("Received null audit log for ID: " + id, false); | ||
| } | ||
| bufferWriteLatencyTimer.record(() -> { | ||
| try { | ||
| writeRecordsWithRetry(records, buffer, acknowledgementSet); | ||
| } catch (Exception e) { | ||
| bufferWriteFailuresCounter.increment(); | ||
| throw e; | ||
| } | ||
| }); | ||
|
|
||
| try { | ||
| JsonNode jsonNode = objectMapper.readTree(auditLog); | ||
| Map<String, Object> data; | ||
| } catch (Exception e) { | ||
| log.error(NOISY, "Failed to process partition for log type {} from {} to {}", | ||
| logType, startTime, endTime, e); | ||
| throw e; | ||
| } | ||
| } | ||
|
|
||
| // Office 365 API sometimes returns an array with a single item | ||
| // and sometimes returns a single object directly | ||
| if (jsonNode.isArray() && !jsonNode.isEmpty()) { | ||
| data = objectMapper.convertValue(jsonNode.get(0), new TypeReference<Map<String, Object>>() {}); | ||
| } else { | ||
| data = objectMapper.readValue(auditLog, new TypeReference<Map<String, Object>>() {}); | ||
| } | ||
| private Record<Event> processAuditLog(Map<String, Object> metadata) throws Office365Exception { | ||
| String contentUri = (String) metadata.get(CONTENT_URI); | ||
| if (contentUri == null) { | ||
| throw new Office365Exception("Missing contentUri in metadata", false); | ||
| } | ||
|
|
||
| // "Workload" is an Office 365 specific field that indicates the source of the audit log | ||
| String contentType = (String) data.get("Workload"); | ||
| if (contentType == null) { | ||
| throw new Office365Exception("Missing Workload field in audit log: " + id, false); | ||
| } | ||
| String logContent = service.getAuditLog(contentUri); | ||
| if (logContent == null) { | ||
| throw new Office365Exception("Received null log content for URI: " + contentUri, false); | ||
| } | ||
| String logId = (String) metadata.get(CONTENT_ID); | ||
|
|
||
| Event event = JacksonEvent.builder() | ||
| .withEventType(EventType.LOG.toString()) | ||
| .withData(data) | ||
| .build(); | ||
| event.getMetadata().setAttribute(CONTENT_TYPE, contentType); | ||
| return new Record<>(event); | ||
| } catch (JsonProcessingException e) { | ||
| // JSON parsing errors are non-retryable as they indicate malformed data | ||
| throw new Office365Exception("Failed to parse audit log: " + id, e, false); | ||
| try { | ||
| JsonNode jsonNode = objectMapper.readTree(logContent); | ||
| Map<String, Object> data; | ||
|
|
||
| // Office 365 API sometimes returns an array with a single item | ||
| // and sometimes returns a single object directly | ||
| if (jsonNode.isArray() && !jsonNode.isEmpty()) { | ||
| data = objectMapper.convertValue(jsonNode.get(0), new TypeReference<Map<String, Object>>() {}); | ||
| } else { | ||
| data = objectMapper.readValue(logContent, new TypeReference<Map<String, Object>>() {}); | ||
| } | ||
| } catch (HttpClientErrorException e) { | ||
| switch (e.getStatusCode()) { | ||
| case UNAUTHORIZED: | ||
| case FORBIDDEN: | ||
| // Auth errors might be temporary due to token expiration | ||
| throw new Office365Exception("Authentication failed while fetching audit log: " + id, e, true); | ||
| case NOT_FOUND: | ||
| // Log doesn't exist - non-retryable | ||
| throw new Office365Exception("Audit log not found: " + id, e, false); | ||
| case TOO_MANY_REQUESTS: | ||
| // Rate limiting - retryable | ||
| throw new Office365Exception("Rate limited while fetching audit log: " + id, e, true); | ||
| default: | ||
| // Other client errors are non-retryable | ||
| throw new Office365Exception("Client error while fetching audit log: " + id, e, false); | ||
|
|
||
| String contentType = (String) data.get("Workload"); | ||
| if (contentType == null) { | ||
| throw new Office365Exception("Missing Workload field in audit log: " + logId, false); | ||
| } | ||
| } catch (ResourceAccessException e) { | ||
| // Network/connection issues are retryable | ||
| throw new Office365Exception("Network error while fetching audit log: " + id, e, true); | ||
|
|
||
| Event event = JacksonEvent.builder() | ||
| .withEventType(EventType.LOG.toString()) | ||
| .withData(data) | ||
| .build(); | ||
| event.getMetadata().setAttribute("contentType", contentType); | ||
| return new Record<>(event); | ||
| } catch (JsonProcessingException e) { | ||
| // JSON parsing errors are non-retryable as they indicate malformed data | ||
| throw new Office365Exception("Failed to parse audit log: " + logId, e, false); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this cause infinite loop of processing?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is marked as non-retryable and swallowed / sent to pipeline DLQ (when available). |
||
| } | ||
| } | ||
|
|
||
| private void writeRecordsWithRetry(final List<Record<Event>> records, | ||
| final Buffer<Record<Event>> buffer, | ||
| final AcknowledgementSet acknowledgementSet, | ||
| final PaginationCrawlerWorkerProgressState state) { | ||
| final AcknowledgementSet acknowledgementSet) { | ||
| bufferWriteAttemptsCounter.increment(); | ||
| int retryCount = 0; | ||
| int currentBackoff = 1000; // Start with 1 second | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where are these steps taking place now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't add this initialization before, as subscriptions were likely already set up for this tenant, allowing log fetching without explicit initialization. I'll now add it to Office365Source's start() method, which is a more appropriate place since it's a one-time setup that should happen when the plugin starts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, we no longer need the iterator pattern since we're now processing logs in time-based chunks rather than page by page.