From 2f95fba24adbb089eae793363d9b5bd78fb047fa Mon Sep 17 00:00:00 2001 From: Alekhya Parisha Date: Mon, 25 Aug 2025 13:21:58 -0700 Subject: [PATCH] Implement dimensional time slice crawler Signed-off-by: Alekhya Parisha --- .../Office365CrawlerClient.java | 215 ++++++------ .../Office365Iterator.java | 121 ------- .../Office365RestClient.java | 34 +- .../microsoft_office365/Office365Source.java | 23 +- .../Office365SourceConfig.java | 6 +- .../configuration/Office365ItemInfo.java | 61 ---- .../service/Office365Service.java | 141 ++------ .../Office365CrawlerClientTest.java | 158 ++++++--- .../Office365ItemInfoTest.java | 98 ------ .../Office365IteratorTest.java | 116 ------- .../Office365RestClientTest.java | 10 +- .../Office365SourceTest.java | 19 +- .../service/Office365ServiceTest.java | 308 +++++++----------- .../base/DimensionalTimeSliceCrawler.java | 176 ++++++++++ ...mensionalTimeSliceLeaderProgressState.java | 23 ++ ...mensionalTimeSliceWorkerProgressState.java | 20 ++ .../base/DimensionalTimeSliceCrawlerTest.java | 169 ++++++++++ .../scheduler/WorkerSchedulerTest.java | 26 ++ ...ionalTimeSliceLeaderProgressStateTest.java | 48 +++ ...ionalTimeSliceWorkerProgressStateTest.java | 51 +++ 20 files changed, 913 insertions(+), 910 deletions(-) delete mode 100644 data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Iterator.java delete mode 100644 data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/configuration/Office365ItemInfo.java delete mode 100644 data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365ItemInfoTest.java delete mode 100644 data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365IteratorTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceWorkerProgressState.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceWorkerProgressStateTest.java diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java index 786f88ade4..3d928cda69 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClient.java @@ -9,11 +9,11 @@ package org.opensearch.dataprepper.plugins.source.microsoft_office365; -import com.google.common.annotations.VisibleForTesting; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.annotations.VisibleForTesting; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import lombok.extern.slf4j.Slf4j; @@ -24,25 +24,22 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient; -import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider; -import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; -import org.springframework.web.client.ResourceAccessException; -import org.springframework.web.client.HttpClientErrorException; import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; import javax.inject.Named; import java.time.Duration; import java.time.Instant; -import java.util.Iterator; + import java.util.List; +import java.util.ArrayList; import java.util.Map; -import java.util.Objects; +import java.util.Iterator; import java.util.concurrent.TimeoutException; -import java.util.concurrent.ExecutorService; -import java.util.stream.Collectors; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; @@ -52,21 +49,18 @@ */ @Slf4j @Named -public class Office365CrawlerClient implements CrawlerClient { +public class Office365CrawlerClient implements CrawlerClient { private static final String BUFFER_WRITE_LATENCY = "bufferWriteLatency"; private static final String BUFFER_WRITE_ATTEMPTS = "bufferWriteAttempts"; private static final String BUFFER_WRITE_SUCCESS = "bufferWriteSuccess"; private static final String BUFFER_WRITE_RETRY_SUCCESS = "bufferWriteRetrySuccess"; private static final String BUFFER_WRITE_RETRY_ATTEMPTS = "bufferWriteRetryAttempts"; private static final String BUFFER_WRITE_FAILURES = "bufferWriteFailures"; - private static final String WORKER_STATE_UPDATES = "workerStateUpdates"; - - private static final String CONTENT_TYPE = "contentType"; private static final int BUFFER_TIMEOUT_IN_SECONDS = 10; + private static final String CONTENT_ID = "contentId"; + private static final String CONTENT_URI = "contentUri"; private final Office365Service service; - private final Office365Iterator office365Iterator; - private final ExecutorService executorService; private final Office365SourceConfig configuration; private final Timer bufferWriteLatencyTimer; private final Counter bufferWriteAttemptsCounter; @@ -77,13 +71,9 @@ public class Office365CrawlerClient implements CrawlerClient listItems(final Instant lastPollTime) { - log.info("Starting to list Office 365 audit logs from {}", lastPollTime); - - // TODO: Subscription management should be moved to a dedicated class in the future - // Currently, we initialize subscriptions in the leader partition to ensure that we're always subscribed - // to the required content type to ensure there hasn't been a subscription change - service.initializeSubscriptions(); - office365Iterator.initialize(lastPollTime); - return office365Iterator; + return null; } @Override - public void executePartition(final PaginationCrawlerWorkerProgressState state, + public void executePartition(final DimensionalTimeSliceWorkerProgressState state, final Buffer> buffer, final AcknowledgementSet acknowledgementSet) { - // Process a batch of audit log IDs and convert them to records - // If any record fails to process, the entire batch will be retried - log.info("Starting to execute partition with {} log(s)", state.getItemIds().size()); - List itemIds = state.getItemIds(); + final Instant startTime = state.getStartTime(); + final Instant endTime = state.getEndTime(); + final String logType = state.getDimensionType(); - // Process each audit log ID in the batch - List> records = itemIds.stream() - .map(id -> { - try { - return processAuditLog(id); - } catch (Office365Exception e) { - log.error(NOISY, "{} error processing audit log: {}", - e.isRetryable() ? "Retryable" : "Non-retryable", id, e); - if (e.isRetryable()) { - throw new RuntimeException("Retryable error processing audit log: " + id, e); - } else { - // TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record - log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", id, e); - return null; + try { + String nextPageUri = null; + List> records = new ArrayList<>(); + + do { + AuditLogsResponse response = + service.searchAuditLogs(logType, startTime, endTime, nextPageUri); + + if (response.getItems() != null && !response.getItems().isEmpty()) { + for (Map metadata : response.getItems()) { + String logId = (String) metadata.get(CONTENT_ID); + try { + Record record = processAuditLog(metadata); + if (record != null) { + records.add(record); + } + } catch (Office365Exception e) { + + log.error(NOISY, "{} error processing audit log: {}", + e.isRetryable() ? "Retryable" : "Non-retryable", logId, e); + if (e.isRetryable()) { + throw new RuntimeException("Retryable error processing audit log: " + logId, e); + } else { + // TODO: When pipeline DLQ is ready, add this record to DLQ instead of dropping the record + log.error(NOISY, "Non-retryable error - record will be dropped. Error processing audit log: {}", logId, e); + } + } catch (Exception e) { + // Unexpected errors are treated as retryable to be safe + log.error(NOISY, "Unexpected error processing audit log: {}", logId, e); + throw new RuntimeException("Unexpected error processing audit log: " + logId, e); } - } catch (Exception e) { - // Unexpected errors are treated as retryable to be safe - log.error(NOISY, "Unexpected error processing audit log: {}", id, e); - throw new RuntimeException("Unexpected error processing audit log: " + id, e); } - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - bufferWriteLatencyTimer.record(() -> { - try { - writeRecordsWithRetry(records, buffer, acknowledgementSet, state); - } catch (Exception e) { - bufferWriteFailuresCounter.increment(); - throw e; - } - }); - } + } - private Record processAuditLog(String id) { - try { - String auditLog = service.getAuditLog(id); + nextPageUri = response.getNextPageUri(); + } while (nextPageUri != null); - // Handle HTTP errors in service layer - if (auditLog == null) { - throw new Office365Exception("Received null audit log for ID: " + id, false); - } + bufferWriteLatencyTimer.record(() -> { + try { + writeRecordsWithRetry(records, buffer, acknowledgementSet); + } catch (Exception e) { + bufferWriteFailuresCounter.increment(); + throw e; + } + }); - try { - JsonNode jsonNode = objectMapper.readTree(auditLog); - Map data; + } catch (Exception e) { + log.error(NOISY, "Failed to process partition for log type {} from {} to {}", + logType, startTime, endTime, e); + throw e; + } + } - // Office 365 API sometimes returns an array with a single item - // and sometimes returns a single object directly - if (jsonNode.isArray() && !jsonNode.isEmpty()) { - data = objectMapper.convertValue(jsonNode.get(0), new TypeReference>() {}); - } else { - data = objectMapper.readValue(auditLog, new TypeReference>() {}); - } + private Record processAuditLog(Map metadata) throws Office365Exception { + String contentUri = (String) metadata.get(CONTENT_URI); + if (contentUri == null) { + throw new Office365Exception("Missing contentUri in metadata", false); + } - // "Workload" is an Office 365 specific field that indicates the source of the audit log - String contentType = (String) data.get("Workload"); - if (contentType == null) { - throw new Office365Exception("Missing Workload field in audit log: " + id, false); - } + String logContent = service.getAuditLog(contentUri); + if (logContent == null) { + throw new Office365Exception("Received null log content for URI: " + contentUri, false); + } + String logId = (String) metadata.get(CONTENT_ID); - Event event = JacksonEvent.builder() - .withEventType(EventType.LOG.toString()) - .withData(data) - .build(); - event.getMetadata().setAttribute(CONTENT_TYPE, contentType); - return new Record<>(event); - } catch (JsonProcessingException e) { - // JSON parsing errors are non-retryable as they indicate malformed data - throw new Office365Exception("Failed to parse audit log: " + id, e, false); + try { + JsonNode jsonNode = objectMapper.readTree(logContent); + Map data; + + // Office 365 API sometimes returns an array with a single item + // and sometimes returns a single object directly + if (jsonNode.isArray() && !jsonNode.isEmpty()) { + data = objectMapper.convertValue(jsonNode.get(0), new TypeReference>() {}); + } else { + data = objectMapper.readValue(logContent, new TypeReference>() {}); } - } catch (HttpClientErrorException e) { - switch (e.getStatusCode()) { - case UNAUTHORIZED: - case FORBIDDEN: - // Auth errors might be temporary due to token expiration - throw new Office365Exception("Authentication failed while fetching audit log: " + id, e, true); - case NOT_FOUND: - // Log doesn't exist - non-retryable - throw new Office365Exception("Audit log not found: " + id, e, false); - case TOO_MANY_REQUESTS: - // Rate limiting - retryable - throw new Office365Exception("Rate limited while fetching audit log: " + id, e, true); - default: - // Other client errors are non-retryable - throw new Office365Exception("Client error while fetching audit log: " + id, e, false); + + String contentType = (String) data.get("Workload"); + if (contentType == null) { + throw new Office365Exception("Missing Workload field in audit log: " + logId, false); } - } catch (ResourceAccessException e) { - // Network/connection issues are retryable - throw new Office365Exception("Network error while fetching audit log: " + id, e, true); + + Event event = JacksonEvent.builder() + .withEventType(EventType.LOG.toString()) + .withData(data) + .build(); + event.getMetadata().setAttribute("contentType", contentType); + return new Record<>(event); + } catch (JsonProcessingException e) { + // JSON parsing errors are non-retryable as they indicate malformed data + throw new Office365Exception("Failed to parse audit log: " + logId, e, false); } } private void writeRecordsWithRetry(final List> records, final Buffer> buffer, - final AcknowledgementSet acknowledgementSet, - final PaginationCrawlerWorkerProgressState state) { + final AcknowledgementSet acknowledgementSet) { bufferWriteAttemptsCounter.increment(); int retryCount = 0; int currentBackoff = 1000; // Start with 1 second diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Iterator.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Iterator.java deleted file mode 100644 index 7133f14bdf..0000000000 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Iterator.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.dataprepper.plugins.source.microsoft_office365; - -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; -import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; -import org.springframework.util.CollectionUtils; - -import javax.inject.Named; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -/** - * Iterator implementation for Office 365 audit logs. - * Manages the asynchronous fetching and iteration of audit log entries. - */ -@Slf4j -@Named -public class Office365Iterator implements Iterator { - private static final int HAS_NEXT_TIMEOUT = 60; - - private final Office365Service service; - private final ExecutorService crawlerTaskExecutor; - - @Setter - private long crawlerQWaitTimeMillis = 2000; - private Queue itemInfoQueue; - private Instant lastPollTime; - private boolean firstTime = true; - private final List> futureList; - - public Office365Iterator(final Office365Service service, - final PluginExecutorServiceProvider executorServiceProvider) { - this.service = service; - this.crawlerTaskExecutor = executorServiceProvider.get(); - this.futureList = new ArrayList<>(); - } - - @Override - public boolean hasNext() { - if (firstTime) { - log.debug("Starting initial crawl for Office 365 audit logs"); - startCrawlerThreads(); - firstTime = false; - } - - int timeout = HAS_NEXT_TIMEOUT; - while (isCrawlerRunning() && isQueueEmpty() && timeout > 0) { - try { - log.trace("Waiting for crawler queue to be filled, timeout in {} seconds", timeout); - Thread.sleep(crawlerQWaitTimeMillis); - timeout--; - } catch (InterruptedException e) { - log.error("Thread interrupted while waiting for crawler queue", e); - Thread.currentThread().interrupt(); - return false; - } - } - - return !isQueueEmpty(); - } - - @Override - public ItemInfo next() { - if (hasNext()) { - return itemInfoQueue.remove(); - } - throw new NoSuchElementException("No more items available in the Office 365 audit log queue"); - } - - public void initialize(final Instant startTime) { - log.info("Initializing Office 365 iterator from timestamp: {}", startTime); - this.itemInfoQueue = new ConcurrentLinkedQueue<>(); - this.lastPollTime = startTime; - this.firstTime = true; - this.futureList.clear(); - } - - private boolean isCrawlerRunning() { - if (CollectionUtils.isEmpty(futureList)) { - return false; - } - return futureList.stream().anyMatch(future -> !future.isDone()); - } - - private boolean isQueueEmpty() { - return itemInfoQueue == null || itemInfoQueue.isEmpty(); - } - - void startCrawlerThreads() { - log.debug("Starting crawler thread for Office 365 audit logs with lastPollTime: {}", lastPollTime); - Future future = crawlerTaskExecutor.submit(() -> { - try { - service.getOffice365Entities(lastPollTime, itemInfoQueue); - log.debug("Completed crawler thread for Office 365 audit logs with lastPollTime: {}", lastPollTime); - return true; - } catch (Exception e) { - log.error("Error in crawler thread while fetching Office 365 audit logs", e); - return false; - } - }); - futureList.add(future); - } -} diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java index 0446ec555b..bfcdcb0ac2 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java @@ -14,6 +14,7 @@ import lombok.extern.slf4j.Slf4j; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpEntity; @@ -142,9 +143,9 @@ public void startSubscriptions() { * @return AuditLogsResponse containing the list of audit log entries and the next page URI */ public AuditLogsResponse searchAuditLogs(final String contentType, - final Instant startTime, - final Instant endTime, - String pageUri) { + final Instant startTime, + final Instant endTime, + String pageUri) { final String GET_AUDIT_LOGS_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/content?contentType=%s&startTime=%s&endTime=%s"; @@ -178,6 +179,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType, if (nextPageUri != null) { log.debug("Next page URI found: {}", nextPageUri); } + return new AuditLogsResponse(response.getBody(), nextPageUri); }, authConfig::renewCredentials @@ -190,31 +192,26 @@ public AuditLogsResponse searchAuditLogs(final String contentType, }); } + /** - * Retrieves a specific audit log entry by its content ID. - * Implements retry with exponential backoff for recoverable errors. + * Retrieves the audit log content from a specific content URI. * - * @param contentId the ID of the audit log entry to retrieve - * @return the audit log entry as a string + * @param contentUri the URI of the audit log content + * @return the audit log content as a string */ - public String getAuditLog(final String contentId) { + public String getAuditLog(String contentUri) { + if (!contentUri.startsWith(MANAGEMENT_API_BASE_URL)) { + throw new Office365Exception("ContentUri must be from Office365 Management API: " + contentUri, false); + } auditLogsRequestedCounter.increment(); - - final String FETCH_AUDIT_LOG_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/audit/%s"; - - final String url = String.format(FETCH_AUDIT_LOG_URL, - authConfig.getTenantId(), - contentId); - final HttpHeaders headers = new HttpHeaders(); return auditLogFetchLatencyTimer.record(() -> { try { String response = RetryHandler.executeWithRetry(() -> { headers.setBearerAuth(authConfig.getAccessToken()); - return restTemplate.exchange( - url, + contentUri, HttpMethod.GET, new HttpEntity<>(headers), String.class @@ -223,10 +220,11 @@ public String getAuditLog(final String contentId) { auditLogRequestsSuccessCounter.increment(); return response; } catch (Exception e) { - log.error(NOISY, "Error while fetching audit log with ID {}", contentId, e); + log.error(NOISY, "Error while fetching audit log content from URI: {}", contentUri, e); auditLogRequestsFailedCounter.increment(); throw new RuntimeException("Failed to fetch audit log", e); } }); } + } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java index c9358af81e..0d117c9b8f 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java @@ -20,24 +20,23 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.utils.Constants; import org.opensearch.dataprepper.plugins.source.source_crawler.CrawlerApplicationContextMarker; -import org.opensearch.dataprepper.plugins.source.source_crawler.base.PaginationCrawler; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler; import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourcePlugin; import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider; import org.opensearch.dataprepper.plugins.source.source_crawler.base.LeaderProgressState; -import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerLeaderProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceLeaderProgressState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import java.time.Instant; import static org.opensearch.dataprepper.plugins.source.microsoft_office365.utils.Constants.PLUGIN_NAME; -/** - * Office 365 Connector main entry point. - * This class extends CrawlerSourcePlugin to provide Office 365 specific functionality. - */ @Experimental @DataPrepperPlugin(name = PLUGIN_NAME, pluginType = Source.class, @@ -48,7 +47,9 @@ public class Office365Source extends CrawlerSourcePlugin { private static final Logger LOG = LoggerFactory.getLogger(Office365Source.class); private final Office365SourceConfig office365SourceConfig; private final Office365AuthenticationInterface office365AuthProvider; + private final Office365Service office365Service; private final AtomicBoolean isRunning = new AtomicBoolean(false); + private static final int OFFICE365_LOOKBACK_HOURS = 7 * 24; @DataPrepperPluginConstructor public Office365Source(final PluginMetrics pluginMetrics, @@ -56,13 +57,16 @@ public Office365Source(final PluginMetrics pluginMetrics, final Office365AuthenticationInterface office365AuthProvider, final PluginFactory pluginFactory, final AcknowledgementSetManager acknowledgementSetManager, - final PaginationCrawler crawler, - final PluginExecutorServiceProvider executorServiceProvider) { + final DimensionalTimeSliceCrawler crawler, + final PluginExecutorServiceProvider executorServiceProvider, + final Office365Service office365Service) { super(PLUGIN_NAME, pluginMetrics, office365SourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); + crawler.initialize(Arrays.asList(Constants.CONTENT_TYPES)); LOG.info("Creating Office365 Source Plugin"); this.office365SourceConfig = office365SourceConfig; this.office365AuthProvider = office365AuthProvider; + this.office365Service = office365Service; } @Override @@ -70,6 +74,7 @@ public void start(Buffer> buffer) { LOG.info("Starting Office365 Source Plugin..."); try { office365AuthProvider.initCredentials(); + office365Service.initializeSubscriptions(); super.start(buffer); } catch (Exception e) { LOG.error("Error starting Office365 Source Plugin", e); @@ -80,7 +85,7 @@ public void start(Buffer> buffer) { @Override protected LeaderProgressState createLeaderProgressState() { - return new PaginationCrawlerLeaderProgressState(Instant.EPOCH); + return new DimensionalTimeSliceLeaderProgressState(Instant.now(), OFFICE365_LOOKBACK_HOURS); } @Override diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java index 9ff7d0925d..e435c51390 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java @@ -21,11 +21,6 @@ */ @Getter public class Office365SourceConfig implements CrawlerSourceConfig { - /** - * Default number of records to retrieve in a single batch. - */ - private static final int DEFAULT_BATCH_SIZE = 50; - /** * The Office 365 tenant ID that uniquely identifies the Microsoft Entra organization. */ @@ -41,6 +36,7 @@ public class Office365SourceConfig implements CrawlerSourceConfig { @JsonProperty("authentication") @Valid private AuthenticationConfiguration authenticationConfiguration; + /** * Flag to enable/disable acknowledgments for processed records. * When enabled, ensures records are processed at least once. diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/configuration/Office365ItemInfo.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/configuration/Office365ItemInfo.java deleted file mode 100644 index 905dddb78c..0000000000 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/configuration/Office365ItemInfo.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.dataprepper.plugins.source.microsoft_office365.configuration; - -import lombok.Builder; -import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; - -import java.time.Instant; -import java.util.Map; - -@Builder -public class Office365ItemInfo implements ItemInfo { - private final String itemId; - private final Instant eventTime; - private final Map metadata; - private final String partitionKey; - private final Map keyAttributes; - private final Instant lastModifiedAt; - - @Override - public String getItemId() { - return itemId; - } - - @Override - public Map getMetadata() { - return metadata; - } - - @Override - public Instant getEventTime() { - return eventTime; - } - - @Override - public String getPartitionKey() { - return partitionKey; - } - - @Override - public String getId() { - return itemId; - } - - @Override - public Map getKeyAttributes() { - return keyAttributes; - } - - @Override - public Instant getLastModifiedAt() { - return lastModifiedAt; - } -} diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java index 39bb4a290f..d4e3ce5168 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java @@ -14,39 +14,21 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient; import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.configuration.Office365ItemInfo; -import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; import javax.inject.Named; import java.time.Duration; import java.time.Instant; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.UUID; -import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; -import static org.opensearch.dataprepper.plugins.source.microsoft_office365.utils.Constants.CONTENT_TYPES; - -/** - * Service class for managing Office 365 audit log retrieval. - */ @Slf4j @Named public class Office365Service { private final Office365SourceConfig office365SourceConfig; private final Office365RestClient office365RestClient; private final Counter searchResultsFoundCounter; - private final Counter sevenDayLimitHitCounter; - private final Counter daysAdjustedCounter; private final Counter windowRetryCounter; - private static final String CONTENT_ID_KEY = "contentId"; - private static final String CONTENT_CREATED_KEY = "contentCreated"; - private static final String CONTENT_URI_KEY = "contentUri"; - private static final String TYPE_KEY = "type"; - private static final Duration TIME_WINDOW = Duration.ofHours(1); - private static final Duration RETRY_DELAY = Duration.ofSeconds(1); + private static final Duration SEVEN_DAYS = Duration.ofDays(7); public Office365Service(final Office365SourceConfig office365SourceConfig, final Office365RestClient office365RestClient, @@ -54,8 +36,6 @@ public Office365Service(final Office365SourceConfig office365SourceConfig, this.office365SourceConfig = office365SourceConfig; this.office365RestClient = office365RestClient; this.searchResultsFoundCounter = pluginMetrics.counter("searchResultsFound"); - this.sevenDayLimitHitCounter = pluginMetrics.counter("sevenDayLimitHit"); - this.daysAdjustedCounter = pluginMetrics.counter("daysAdjusted"); this.windowRetryCounter = pluginMetrics.counter("windowRetry"); } @@ -63,101 +43,44 @@ public void initializeSubscriptions() { office365RestClient.startSubscriptions(); } - public void getOffice365Entities(final Instant timestamp, - final Queue itemInfoQueue) { - log.trace("Started to fetch entities"); - searchForNewLogs(timestamp, itemInfoQueue); - log.trace("Creating item information and adding in queue"); + public String getAuditLog(String contentUri) { + return office365RestClient.getAuditLog(contentUri); } - public String getAuditLog(final String contentId) { - return office365RestClient.getAuditLog(contentId); - } - - private void searchForNewLogs(final Instant timestamp, - final Queue itemInfoQueue) { - Instant endTime = Instant.now(); - log.info("Searching for logs between {} and {}", timestamp, endTime); - Instant startTime = timestamp; - - Instant sevenDaysAgo = endTime.minus(Duration.ofDays(7)); - if (startTime.isBefore(sevenDaysAgo)) { - long daysAdjusted = Duration.between(startTime, sevenDaysAgo).toDays(); - sevenDayLimitHitCounter.increment(); // Track that we hit the limit - daysAdjustedCounter.increment(daysAdjusted); // Track by how many days - log.warn("Adjusting start time from {} to {} ({} days beyond 7-day limit)", - startTime, sevenDaysAgo, daysAdjusted); - startTime = sevenDaysAgo; + public AuditLogsResponse searchAuditLogs(final String logType, + Instant startTime, + final Instant endTime, + final String nextPageUri) { + if (startTime == null || endTime == null) { + throw new IllegalArgumentException("startTime and endTime must not be null"); } - - while (startTime.isBefore(endTime)) { - Instant windowEnd = startTime.plus(TIME_WINDOW); - if (windowEnd.isAfter(endTime)) { - windowEnd = endTime; + if (logType == null) { + throw new IllegalArgumentException("logType must not be null"); + } + try { + // If pagination URI exists, use it directly + if (nextPageUri != null) { + return office365RestClient.searchAuditLogs(logType, startTime, endTime, nextPageUri); } - log.debug("Processing time window: {} to {}", startTime, windowEnd); - boolean windowSuccessful = true; - for (String contentType : CONTENT_TYPES) { - String nextPageUri = null; - try { - do { - AuditLogsResponse response = - office365RestClient.searchAuditLogs(contentType, startTime, windowEnd, nextPageUri); - - if (response.getItems() == null || response.getItems().isEmpty()) { - break; - } - nextPageUri = response.getNextPageUri(); - addItemsToQueue(response.getItems(), contentType, itemInfoQueue, nextPageUri == null); - } while (nextPageUri != null); - } catch (Exception e) { - log.error(NOISY, "Failed to fetch logs for time window {} to {} for content type {}. Will retry this window.", - startTime, windowEnd, contentType, e); - windowSuccessful = false; - windowRetryCounter.increment(); - break; // Exit the content type loop on failure - } + // Check and adjust start time for 7-day limit + Instant adjustedStartTime = startTime; + Instant sevenDaysAgo = Instant.now().minus(SEVEN_DAYS); + if (startTime.isBefore(sevenDaysAgo)) { + adjustedStartTime = sevenDaysAgo; } - // Only move the pointer if all content types were processed successfully - if (windowSuccessful) { - log.trace("Successfully completed time window: {} to {}, moving to next window", startTime, windowEnd); - startTime = windowEnd; - } else { - log.error("Failed to complete time window: {} to {}, retrying after delay", startTime, windowEnd); - // Add a small delay before retrying the same window - try { - Thread.sleep(RETRY_DELAY.toMillis()); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while waiting to retry time window", ie); - } + AuditLogsResponse response = + office365RestClient.searchAuditLogs(logType, adjustedStartTime, endTime, null); + if (response.getItems() != null) { + searchResultsFoundCounter.increment(response.getItems().size()); } + return response; + } catch (Exception e) { + windowRetryCounter.increment(); + throw new Office365Exception( + String.format("Failed to fetch logs for time window %s to %s for log type %s.", + startTime, endTime, logType), e, true); } } - - private void addItemsToQueue(final List> items, - final String contentType, - final Queue itemInfoQueue, - final boolean lastPage) { - items.forEach(item -> { - Instant contentCreated = Instant.parse((String) item.get(CONTENT_CREATED_KEY)); - // If last page, add 1ms offset for next poll start time to avoid duplication when all events are processed. - // 1ms is m365's smallest time unit so polling data starting from next 1ms would not skip any event. - // If not last page, keep nextPollAttemptStartTime to be contentCreated time so to avoid data loss in a rare scenario - // where 1ms have multiple events and split by nextPageUri - Instant nextPollAttemptStartTime = lastPage ? contentCreated.plusMillis(1) : contentCreated; - ItemInfo itemInfo = Office365ItemInfo.builder() - .itemId((String) item.get(CONTENT_ID_KEY)) - .eventTime(contentCreated) - .partitionKey(contentType + UUID.randomUUID()) - .metadata(item) - .keyAttributes(Map.of(TYPE_KEY, contentType, CONTENT_URI_KEY, item.get(CONTENT_URI_KEY))) - .lastModifiedAt(nextPollAttemptStartTime) - .build(); - itemInfoQueue.add(itemInfo); - }); - searchResultsFoundCounter.increment(items.size()); - } } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java index 2d9eacd821..4bebeccb29 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365CrawlerClientTest.java @@ -20,29 +20,35 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider; -import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.time.Instant; import java.util.Arrays; import java.util.Collection; -import java.util.List; +import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -51,12 +57,13 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) class Office365CrawlerClientTest { @Mock private Buffer> buffer; @Mock - private PaginationCrawlerWorkerProgressState state; + private DimensionalTimeSliceWorkerProgressState state; @Mock private AcknowledgementSet acknowledgementSet; @@ -67,9 +74,6 @@ class Office365CrawlerClientTest { @Mock private Office365Service service; - @Mock - private Office365Iterator office365Iterator; - @Mock private PluginMetrics pluginMetrics; @@ -79,10 +83,6 @@ class Office365CrawlerClientTest { @Mock private static Logger log; - private final PluginExecutorServiceProvider executorServiceProvider = new PluginExecutorServiceProvider(); - - private static final String BUFFER_WRITE_FAILURES_TIMEOUT = "bufferWriteFailuresTimeout"; - @BeforeAll static void setupLogger() { log = mock(Logger.class); @@ -93,29 +93,36 @@ static void setupLogger() { void setUp() { when(pluginMetrics.timer(anyString())).thenReturn(bufferWriteLatencyTimer); when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class)); + when(state.getStartTime()).thenReturn(Instant.now().minus(Duration.ofHours(1))); + when(state.getEndTime()).thenReturn(Instant.now()); + when(state.getDimensionType()).thenReturn("Exchange"); } @Test void testConstructor() { - Office365CrawlerClient client = new Office365CrawlerClient(service, office365Iterator, executorServiceProvider, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); assertNotNull(client); } - @Test - void testListItems() { - Office365CrawlerClient client = new Office365CrawlerClient(service, office365Iterator, executorServiceProvider, sourceConfig, pluginMetrics); - assertNotNull(client.listItems(Instant.ofEpochSecond(1234L))); - verify(service).initializeSubscriptions(); - verify(office365Iterator).initialize(any(Instant.class)); - } - @Test void testExecutePartition() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, office365Iterator, executorServiceProvider, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + eq("Exchange"), + any(Instant.class), + any(Instant.class), + isNull() + )).thenReturn(response); - List itemIds = Arrays.asList("ID1", "ID2"); - when(state.getItemIds()).thenReturn(itemIds); - when(service.getAuditLog(anyString())).thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}"); + when(service.getAuditLog(anyString())) + .thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}"); doAnswer(invocation -> { Runnable runnable = invocation.getArgument(0); @@ -131,7 +138,7 @@ void testExecutePartition() throws Exception { Collection> capturedRecords = recordsCaptor.getValue(); assertFalse(capturedRecords.isEmpty()); - assertEquals(itemIds.size(), capturedRecords.size()); + assertEquals(1, capturedRecords.size()); for (Record record : capturedRecords) { assertNotNull(record.getData()); assertEquals("Exchange", record.getData().getMetadata().getAttribute("contentType")); @@ -140,14 +147,24 @@ void testExecutePartition() throws Exception { @Test void testExecutePartitionWithJsonProcessingError() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, office365Iterator, executorServiceProvider, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); ObjectMapper mockObjectMapper = mock(ObjectMapper.class); client.injectObjectMapper(mockObjectMapper); - List itemIds = List.of("ID1"); - when(state.getItemIds()).thenReturn(itemIds); - when(service.getAuditLog(anyString())).thenReturn("{\"invalid\":json}"); + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); + when(service.getAuditLog(anyString())).thenReturn("{\"invalid\":json}"); when(mockObjectMapper.readTree(anyString())).thenThrow(new JsonProcessingException("Test error") {}); doAnswer(invocation -> { @@ -163,11 +180,23 @@ void testExecutePartitionWithJsonProcessingError() throws Exception { @Test void testBufferWriteWithAcknowledgements() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, office365Iterator, executorServiceProvider, sourceConfig, pluginMetrics); - - List itemIds = List.of("ID1"); - when(state.getItemIds()).thenReturn(itemIds); - when(service.getAuditLog(anyString())).thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}"); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); + + when(service.getAuditLog(anyString())) + .thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}"); when(sourceConfig.isAcknowledgments()).thenReturn(true); doAnswer(invocation -> { @@ -185,41 +214,57 @@ void testBufferWriteWithAcknowledgements() throws Exception { @Test void testBufferWriteTimeout() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, office365Iterator, executorServiceProvider, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); - List itemIds = List.of("ID1"); - when(state.getItemIds()).thenReturn(itemIds); - when(service.getAuditLog(anyString())).thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}"); + when(service.getAuditLog(anyString())) + .thenReturn("{\"Workload\":\"Exchange\",\"Operation\":\"Test\"}"); - // Mock the timer to execute the runnable doAnswer(invocation -> { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; }).when(bufferWriteLatencyTimer).record(any(Runnable.class)); - // Configure buffer to throw RuntimeException directly doThrow(new RuntimeException("Error writing to buffer")) .when(buffer) .writeAll(any(), anyInt()); - // Execute and verify exception RuntimeException exception = assertThrows(RuntimeException.class, () -> client.executePartition(state, buffer, acknowledgementSet)); - // Verify the exact error message assertEquals("Error writing to buffer", exception.getMessage()); - - // Verify that buffer.writeAll was called exactly once verify(buffer).writeAll(any(), anyInt()); } @Test void testNonRetryableError() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, office365Iterator, executorServiceProvider, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1")), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); - List itemIds = List.of("ID1"); - when(state.getItemIds()).thenReturn(itemIds); when(service.getAuditLog(anyString())).thenReturn(null); doAnswer(invocation -> { @@ -235,10 +280,21 @@ void testNonRetryableError() throws Exception { @Test void testMissingWorkloadField() throws Exception { - Office365CrawlerClient client = new Office365CrawlerClient(service, office365Iterator, executorServiceProvider, sourceConfig, pluginMetrics); + Office365CrawlerClient client = new Office365CrawlerClient(service, sourceConfig, pluginMetrics); + + AuditLogsResponse response = new AuditLogsResponse( + Arrays.asList(Map.of( + "contentId", "ID1", + "contentUri", "uri1" + )), null); + + when(service.searchAuditLogs( + anyString(), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(response); - List itemIds = List.of("ID1"); - when(state.getItemIds()).thenReturn(itemIds); when(service.getAuditLog(anyString())).thenReturn("{\"Operation\":\"Test\"}"); doAnswer(invocation -> { diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365ItemInfoTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365ItemInfoTest.java deleted file mode 100644 index 5e9ba1f2d6..0000000000 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365ItemInfoTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.dataprepper.plugins.source.microsoft_office365; - -import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.configuration.Office365ItemInfo; - -import java.time.Instant; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -class Office365ItemInfoTest { - - @Test - void testBuilderAndGetters() { - // Setup test data - String itemId = "test-id"; - Instant eventTime = Instant.now(); - Map metadata = new HashMap<>(); - metadata.put("key1", "value1"); - String partitionKey = "partition1"; - Map keyAttributes = new HashMap<>(); - keyAttributes.put("key2", "value2"); - Instant lastModifiedAt = Instant.now(); - - // Create Office365ItemInfo using builder - Office365ItemInfo itemInfo = Office365ItemInfo.builder() - .itemId(itemId) - .eventTime(eventTime) - .metadata(metadata) - .partitionKey(partitionKey) - .keyAttributes(keyAttributes) - .lastModifiedAt(lastModifiedAt) - .build(); - - // Verify all getters - assertNotNull(itemInfo, "ItemInfo should not be null"); - assertEquals(itemId, itemInfo.getItemId(), "ItemId should match"); - assertEquals(itemId, itemInfo.getId(), "Id should match itemId"); - assertEquals(eventTime, itemInfo.getEventTime(), "EventTime should match"); - assertEquals(metadata, itemInfo.getMetadata(), "Metadata should match"); - assertEquals(partitionKey, itemInfo.getPartitionKey(), "PartitionKey should match"); - assertEquals(keyAttributes, itemInfo.getKeyAttributes(), "KeyAttributes should match"); - assertEquals(lastModifiedAt, itemInfo.getLastModifiedAt(), "LastModifiedAt should match"); - } - - @Test - void testWithNullValues() { - // Create Office365ItemInfo with null values - Office365ItemInfo itemInfo = Office365ItemInfo.builder() - .itemId(null) - .eventTime(null) - .metadata(null) - .partitionKey(null) - .keyAttributes(null) - .lastModifiedAt(null) - .build(); - - // Verify null values are handled - assertNotNull(itemInfo, "ItemInfo should not be null even with null values"); - assertEquals(null, itemInfo.getItemId()); - assertEquals(null, itemInfo.getId()); - assertEquals(null, itemInfo.getEventTime()); - assertEquals(null, itemInfo.getMetadata()); - assertEquals(null, itemInfo.getPartitionKey()); - assertEquals(null, itemInfo.getKeyAttributes()); - assertEquals(null, itemInfo.getLastModifiedAt()); - } - - @Test - void testWithEmptyMaps() { - // Create Office365ItemInfo with empty maps - Office365ItemInfo itemInfo = Office365ItemInfo.builder() - .itemId("test-id") - .eventTime(Instant.now()) - .metadata(new HashMap<>()) - .partitionKey("partition1") - .keyAttributes(new HashMap<>()) - .lastModifiedAt(Instant.now()) - .build(); - - // Verify empty maps - assertNotNull(itemInfo.getMetadata(), "Metadata should not be null"); - assertEquals(0, itemInfo.getMetadata().size(), "Metadata should be empty"); - assertNotNull(itemInfo.getKeyAttributes(), "KeyAttributes should not be null"); - assertEquals(0, itemInfo.getKeyAttributes().size(), "KeyAttributes should be empty"); - } -} diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365IteratorTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365IteratorTest.java deleted file mode 100644 index c29a853eb1..0000000000 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365IteratorTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.dataprepper.plugins.source.microsoft_office365; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.configuration.Office365ItemInfo; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; -import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider; -import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; - -import java.time.Instant; -import java.util.NoSuchElementException; -import java.util.Queue; -import java.util.UUID; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; - -@ExtendWith(MockitoExtension.class) -class Office365IteratorTest { - - @Mock - private Office365Service office365Service; - - private Office365Iterator office365Iterator; - - private final PluginExecutorServiceProvider executorServiceProvider = new PluginExecutorServiceProvider(); - - @BeforeEach - void setUp() { - office365Iterator = new Office365Iterator(office365Service, executorServiceProvider); - } - - @Test - void testInitialization() { - assertNotNull(office365Iterator); - office365Iterator.initialize(Instant.EPOCH); - assertFalse(office365Iterator.hasNext()); - } - - @Test - void testEmptyQueueBehavior() { - office365Iterator.initialize(Instant.EPOCH); - office365Iterator.setCrawlerQWaitTimeMillis(1); - - assertFalse(office365Iterator.hasNext()); - assertThrows(NoSuchElementException.class, () -> office365Iterator.next()); - } - - @Test - void testQueueOperations(){ - // Initialize iterator - office365Iterator.initialize(Instant.EPOCH); - office365Iterator.setCrawlerQWaitTimeMillis(1); - - // Initially queue should be empty - assertFalse(office365Iterator.hasNext()); - - // Create specific mock item to track - Office365ItemInfo mockItem = createMockItemInfo(); - - // Mock service to add our specific mock item - doAnswer(invocation -> { - Queue queue = invocation.getArgument(1); - queue.add(mockItem); - return null; - }).when(office365Service).getOffice365Entities(any(Instant.class), any()); - - // Manually trigger the crawler thread - office365Iterator.startCrawlerThreads(); - - // Now verify item is available - assertTrue(office365Iterator.hasNext()); - - // Verify we get the correct item - assertEquals(mockItem, office365Iterator.next()); - - // Verify queue is empty after retrieval - assertFalse(office365Iterator.hasNext()); - } - - @Test - void testInterruptedException() { - office365Iterator.initialize(Instant.EPOCH); - office365Iterator.setCrawlerQWaitTimeMillis(1); - - Thread.currentThread().interrupt(); - assertFalse(office365Iterator.hasNext()); - assertTrue(Thread.interrupted()); - } - - private Office365ItemInfo createMockItemInfo() { - return Office365ItemInfo.builder() - .itemId(UUID.randomUUID().toString()) - .eventTime(Instant.now()) - .partitionKey("test-partition") - .lastModifiedAt(Instant.now()) - .build(); - } -} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java index b8660a9820..718edf0f15 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java @@ -214,26 +214,28 @@ void testSearchAuditLogsFailure() { @Test void testGetAuditLog() { + String contentUri = "https://manage.office.com/api/v1.0/test-tenant/activity/feed/audit/123"; String mockAuditLog = "{\"id\":\"123\",\"contentType\":\"Audit.AzureActiveDirectory\"}"; ResponseEntity mockResponse = new ResponseEntity<>(mockAuditLog, HttpStatus.OK); when(restTemplate.exchange( - anyString(), + eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class) )).thenReturn(mockResponse); - String result = office365RestClient.getAuditLog("test-content-id"); + String result = office365RestClient.getAuditLog(contentUri); assertEquals(mockAuditLog, result); } @Test void testGetAuditLogFailure() { + String contentUri = "https://manage.office.com/api/v1.0/test-tenant/activity/feed/audit/123"; // Mock REST template to throw an exception when(restTemplate.exchange( - anyString(), + eq(contentUri), eq(HttpMethod.GET), any(), eq(String.class) @@ -241,7 +243,7 @@ void testGetAuditLogFailure() { // Verify that the exception is propagated RuntimeException exception = assertThrows(RuntimeException.class, - () -> office365RestClient.getAuditLog("test-content-id")); + () -> office365RestClient.getAuditLog(contentUri)); assertEquals("Failed to fetch audit log", exception.getMessage()); } diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceTest.java index 084bf02333..cf5ee0f752 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceTest.java @@ -21,7 +21,8 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.AuthenticationConfiguration; import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface; -import org.opensearch.dataprepper.plugins.source.source_crawler.base.PaginationCrawler; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler; import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; @@ -58,7 +59,7 @@ class Office365SourceTest { private AcknowledgementSetManager acknowledgementSetManager; @Mock - private PaginationCrawler crawler; + private DimensionalTimeSliceCrawler crawler; @Mock private PluginExecutorServiceProvider executorServiceProvider; @@ -75,13 +76,16 @@ class Office365SourceTest { @Mock private EnhancedSourceCoordinator sourceCoordinator; + @Mock + private Office365Service office365Service; + @Test void initialization() { when(executorServiceProvider.get()).thenReturn(executorService); Office365Source source = new Office365Source(pluginMetrics, office365SourceConfig, office365AuthProvider, pluginFactory, acknowledgementSetManager, crawler, - executorServiceProvider); + executorServiceProvider, office365Service); assertNotNull(source); } @@ -91,12 +95,13 @@ void testStart() { Office365Source source = new Office365Source(pluginMetrics, office365SourceConfig, office365AuthProvider, pluginFactory, acknowledgementSetManager, crawler, - executorServiceProvider); + executorServiceProvider, office365Service); source.setEnhancedSourceCoordinator(sourceCoordinator); source.start(buffer); verify(office365AuthProvider).initCredentials(); + verify(office365Service).initializeSubscriptions(); verify(executorService, atLeast(1)).submit(any(Runnable.class)); } @@ -106,7 +111,7 @@ void testStartWithAuthenticationFailure() { Office365Source source = new Office365Source(pluginMetrics, office365SourceConfig, office365AuthProvider, pluginFactory, acknowledgementSetManager, crawler, - executorServiceProvider); + executorServiceProvider, office365Service); doThrow(new RuntimeException("Authentication failed")) .when(office365AuthProvider).initCredentials(); @@ -125,7 +130,7 @@ void testStop() { Office365Source source = new Office365Source(pluginMetrics, office365SourceConfig, office365AuthProvider, pluginFactory, acknowledgementSetManager, crawler, - executorServiceProvider); + executorServiceProvider, office365Service); source.setEnhancedSourceCoordinator(sourceCoordinator); source.start(buffer); @@ -140,7 +145,7 @@ void testStop_WhenNotStarted() { Office365Source source = new Office365Source(pluginMetrics, office365SourceConfig, office365AuthProvider, pluginFactory, acknowledgementSetManager, crawler, - executorServiceProvider); + executorServiceProvider, office365Service); source.stop(); diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java index cb8c6b8320..d9c6dbf86a 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java @@ -12,37 +12,29 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient; import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig; +import org.opensearch.dataprepper.plugins.source.microsoft_office365.exception.Office365Exception; import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse; -import org.opensearch.dataprepper.plugins.source.microsoft_office365.utils.Constants; -import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -54,7 +46,8 @@ class Office365ServiceTest { @Mock private Office365SourceConfig sourceConfig; - private org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service office365Service; + private Office365Service office365Service; + private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("Office365ServiceTest", "office365"); @BeforeEach @@ -70,206 +63,133 @@ void testServiceInitialization() { } @Test - void testGetOffice365EntitiesWithMultipleTimeWindows() { - // Create test data - Instant contentCreated = Instant.now().minusSeconds(100); - List> items1 = new ArrayList<>(); - List> items2 = new ArrayList<>(); - items1.add(createTestItem("id1", contentCreated)); - items2.add(createTestItem("id2", contentCreated)); - - AuditLogsResponse response1 = new AuditLogsResponse(items1, "nextPageUri1"); - AuditLogsResponse response2 = new AuditLogsResponse(items2, null); + void testSearchAuditLogs() { + Instant startTime = Instant.now().minus(Duration.ofHours(1)); + Instant endTime = Instant.now(); + String logType = "Exchange"; - // Fix the time windows - use current time as reference - Instant now = Instant.now(); - Instant startTime = now.minus(Duration.ofHours(3)); // Start 3 hours ago + List> items = new ArrayList<>(); + items.add(createTestItem("id1", Instant.now())); + AuditLogsResponse expectedResponse = new AuditLogsResponse(items, null); - // Setup mock behavior - return response for any time range - when(office365RestClient.searchAuditLogs( - anyString(), - any(Instant.class), - any(Instant.class), - isNull())) - .thenReturn(response1); when(office365RestClient.searchAuditLogs( - anyString(), - any(Instant.class), - any(Instant.class), - eq("nextPageUri1"))) - .thenReturn(response2); - - Queue itemInfoQueue = new ConcurrentLinkedQueue<>(); - - office365Service.getOffice365Entities(startTime, itemInfoQueue); - - // Verify the API calls - ArgumentCaptor contentTypeCaptor = ArgumentCaptor.forClass(String.class); - ArgumentCaptor startTimeCaptor = ArgumentCaptor.forClass(Instant.class); - ArgumentCaptor endTimeCaptor = ArgumentCaptor.forClass(Instant.class); - - verify(office365RestClient, atLeast(Constants.CONTENT_TYPES.length * 3)) - .searchAuditLogs( - contentTypeCaptor.capture(), - startTimeCaptor.capture(), - endTimeCaptor.capture(), - isNull()); - - // Calculate the actual number of time windows - long distinctTimeWindows = startTimeCaptor.getAllValues().stream() - .distinct() - .count(); - - // Verify the number of items - int expectedItems = Constants.CONTENT_TYPES.length * (int)distinctTimeWindows * 2; - assertEquals(expectedItems, itemInfoQueue.size(), - String.format("Expected %d items (%d content types * %d time windows * 1 item), but found %d", - expectedItems, Constants.CONTENT_TYPES.length, distinctTimeWindows, itemInfoQueue.size())); - - - Instant expectedLastModifiedAtForSecondPage = contentCreated.plusMillis(1); - for (int i = 0; i < Constants.CONTENT_TYPES.length * (int) distinctTimeWindows * 2; i++) { - ItemInfo itemInfo = itemInfoQueue.poll(); - assertEquals(contentCreated, itemInfo.getEventTime()); - if (i % 2 == 0) { - assertEquals("id1", itemInfo.getItemId()); - assertEquals(contentCreated, itemInfo.getLastModifiedAt(), - "Expect first page's lastModifiedAt timestamp to be same as contentCreated timestamp"); - } else { - assertEquals("id2", itemInfo.getItemId()); - assertEquals(expectedLastModifiedAtForSecondPage, itemInfo.getLastModifiedAt(), - "Expect second page's lastModifiedAt timestamp to be 1ms after contentCreated timestamp"); - } - } - - // Verify we have at least 3 distinct time windows - assertTrue(distinctTimeWindows >= 3, - "Expected at least 3 distinct time windows, but found " + distinctTimeWindows); - - // Verify all content types were used - Set usedContentTypes = new HashSet<>(contentTypeCaptor.getAllValues()); - assertEquals(Constants.CONTENT_TYPES.length, usedContentTypes.size(), - "Not all content types were processed"); - - // Verify that each time window is one hour long (or less for the last one) - List startTimes = startTimeCaptor.getAllValues(); - List endTimes = endTimeCaptor.getAllValues(); - for (int i = 0; i < startTimes.size(); i++) { - Duration windowDuration = Duration.between(startTimes.get(i), endTimes.get(i)); - assertTrue(windowDuration.compareTo(Duration.ofHours(1)) <= 0, - "Time window " + i + " is longer than one hour: " + windowDuration); - } + eq(logType), + eq(startTime), + eq(endTime), + isNull() + )).thenReturn(expectedResponse); + + AuditLogsResponse response = office365Service.searchAuditLogs( + logType, + startTime, + endTime, + null + ); + + assertNotNull(response); + assertEquals(1, response.getItems().size()); + verify(office365RestClient).searchAuditLogs(logType, startTime, endTime, null); } @Test - void testEmptyResponse() { - AuditLogsResponse emptyResponse = new AuditLogsResponse(new ArrayList<>(), null); - when(office365RestClient.searchAuditLogs(anyString(), any(Instant.class), any(Instant.class), any())) - .thenReturn(emptyResponse); + void testSearchAuditLogsWithPagination() { + Instant startTime = Instant.now().minus(Duration.ofHours(1)); + Instant endTime = Instant.now(); + String logType = "Exchange"; - Queue itemInfoQueue = new ConcurrentLinkedQueue<>(); - Instant timestamp = Instant.now().minus(Duration.ofHours(1)); + // First page response + List> items1 = new ArrayList<>(); + items1.add(createTestItem("id1", Instant.now())); + AuditLogsResponse response1 = new AuditLogsResponse(items1, "nextPage"); - office365Service.getOffice365Entities(timestamp, itemInfoQueue); + // Second page response + List> items2 = new ArrayList<>(); + items2.add(createTestItem("id2", Instant.now())); + AuditLogsResponse response2 = new AuditLogsResponse(items2, null); + + when(office365RestClient.searchAuditLogs( + eq(logType), + eq(startTime), + eq(endTime), + isNull() + )).thenReturn(response1); - assertTrue(itemInfoQueue.isEmpty(), "Queue should be empty when receiving empty response"); + when(office365RestClient.searchAuditLogs( + eq(logType), + eq(startTime), + eq(endTime), + eq("nextPage") + )).thenReturn(response2); + + // Execute first page + AuditLogsResponse firstResponse = office365Service.searchAuditLogs( + logType, + startTime, + endTime, + null + ); + + // Execute second page + AuditLogsResponse secondResponse = office365Service.searchAuditLogs( + logType, + startTime, + endTime, + firstResponse.getNextPageUri() + ); + + assertNotNull(firstResponse); + assertEquals("nextPage", firstResponse.getNextPageUri()); + assertEquals(1, firstResponse.getItems().size()); + + assertNotNull(secondResponse); + assertNull(secondResponse.getNextPageUri()); + assertEquals(1, secondResponse.getItems().size()); } @Test - void testGetOffice365EntitiesWithSevenDayLimit() { - List> items = new ArrayList<>(); - Map item = createTestItem("id", Instant.now()); - items.add(item); + void testSearchAuditLogsWithEmptyResponse() { + Instant startTime = Instant.now().minus(Duration.ofHours(1)); + Instant endTime = Instant.now(); + String logType = "Exchange"; + + AuditLogsResponse emptyResponse = new AuditLogsResponse(new ArrayList<>(), null); - AuditLogsResponse mockResponse = new AuditLogsResponse(items, null); when(office365RestClient.searchAuditLogs( - anyString(), - any(Instant.class), - any(Instant.class), - any())) - .thenReturn(mockResponse); - - Queue itemInfoQueue = new ConcurrentLinkedQueue<>(); - Instant now = Instant.now(); - Instant timestamp = now.minus(Duration.ofDays(10)); // Start 10 days ago - - office365Service.getOffice365Entities(timestamp, itemInfoQueue); - - // Verify that items were added to the queue - assertFalse(itemInfoQueue.isEmpty(), "Queue should not be empty"); - - // Verify that the REST client was called with adjusted time (7 days ago) - ArgumentCaptor startTimeCaptor = ArgumentCaptor.forClass(Instant.class); - verify(office365RestClient, atLeast(1)) - .searchAuditLogs(anyString(), startTimeCaptor.capture(), any(Instant.class), any()); - - Instant firstStartTime = startTimeCaptor.getAllValues().get(0); - Duration adjustedDuration = Duration.between(firstStartTime, now); - assertTrue(adjustedDuration.toDays() <= 7, - "Start time should be adjusted to 7 days ago, but was " + adjustedDuration.toDays() + " days"); + eq(logType), + eq(startTime), + eq(endTime), + isNull() + )).thenReturn(emptyResponse); + + AuditLogsResponse response = office365Service.searchAuditLogs( + logType, + startTime, + endTime, + null + ); + + assertNotNull(response); + assertEquals(0, response.getItems().size()); + assertNull(response.getNextPageUri()); } @Test - void testRetryBehaviorOnFailure() { - Instant now = Instant.now(); - Instant startTime = now.minus(Duration.ofMinutes(10)); // Small window to ensure single time slice - Queue itemInfoQueue = new ConcurrentLinkedQueue<>(); - - // Create successful response - List> items = new ArrayList<>(); - items.add(createTestItem("id", Instant.now())); - AuditLogsResponse successResponse = new AuditLogsResponse(items, null); + void testSearchAuditLogsError() { + Instant startTime = Instant.now().minus(Duration.ofHours(1)); + Instant endTime = Instant.now(); + String logType = "Exchange"; - // Set up mock to fail first then succeed for first content type - AtomicInteger firstTypeCallCount = new AtomicInteger(0); when(office365RestClient.searchAuditLogs( - eq(Constants.CONTENT_TYPES[0]), - any(Instant.class), - any(Instant.class), - isNull())) - .thenAnswer(invocation -> { - if (firstTypeCallCount.getAndIncrement() == 0) { - throw new RuntimeException("API Error"); - } - return successResponse; - }); - - // Other content types succeed immediately - for (int i = 1; i < Constants.CONTENT_TYPES.length; i++) { - when(office365RestClient.searchAuditLogs( - eq(Constants.CONTENT_TYPES[i]), - any(Instant.class), - any(Instant.class), - isNull())) - .thenReturn(successResponse); - } - - office365Service.getOffice365Entities(startTime, itemInfoQueue); - - // Verify that the first content type was called at least twice (one failure, one success) - verify(office365RestClient, atLeast(2)) - .searchAuditLogs( - eq(Constants.CONTENT_TYPES[0]), - any(Instant.class), - any(Instant.class), - isNull()); - - // Verify that other content types were called at least once - for (int i = 1; i < Constants.CONTENT_TYPES.length; i++) { - verify(office365RestClient, atLeast(1)) - .searchAuditLogs( - eq(Constants.CONTENT_TYPES[i]), - any(Instant.class), - any(Instant.class), - isNull()); - } - - // Verify that items were eventually added to the queue - assertFalse(itemInfoQueue.isEmpty(), "Queue should not be empty after successful retry"); - - // Verify total number of items in queue - assertEquals(Constants.CONTENT_TYPES.length, itemInfoQueue.size(), - "Should have one item per content type"); + any(), any(), any(), any() + )).thenThrow(new RuntimeException("API Error")); + + Office365Exception exception = assertThrows( + Office365Exception.class, + () -> office365Service.searchAuditLogs(logType, startTime, endTime, null) + ); + + assertEquals("Failed to fetch logs for time window " + startTime + " to " + endTime + + " for log type " + logType + ".", exception.getMessage()); } @Test diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java new file mode 100644 index 0000000000..69b3313d60 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java @@ -0,0 +1,176 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceLeaderProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Named; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Objects; +import java.util.UUID; + +import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler.DEFAULT_EXTEND_LEASE_MINUTES; + +/** + * A crawler implementation that partitions work along two dimensions: + * 1. Dimension type (e.g., log types) + * 2. Time slices (hourly windows) + * + * This crawler supports both historical data ingestion and incremental updates, + * creating separate partitions for each combination of dimension type and time window. + */ +@Named +public class DimensionalTimeSliceCrawler implements Crawler { + private static final Logger log = LoggerFactory.getLogger(DimensionalTimeSliceCrawler.class); + private static final String DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED = "DimensionalTimeSliceWorkerPartitionsCreated"; + private static final Duration HOUR_DURATION = Duration.ofHours(1); + + private final CrawlerClient client; + private final Counter partitionsCreatedCounter; + private List dimensionTypes; + private static final String LAST_UPDATED_KEY = "last_updated|"; + + public DimensionalTimeSliceCrawler(CrawlerClient client, + PluginMetrics pluginMetrics) { + this.client = client; + this.partitionsCreatedCounter = pluginMetrics.counter(DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED); + } + + /** + * Initializes the crawler with a list of dimension types. + * Must be called before the crawler can be used. + */ + public void initialize(List dimensionTypes) { + if (this.dimensionTypes != null) { + throw new IllegalStateException("Crawler already initialized"); + } + this.dimensionTypes = Objects.requireNonNull(dimensionTypes, "dimensionTypes must not be null"); + } + + /** + * Creates partitions for the current crawl cycle. For historical pulls, creates hourly partitions + * for each dimension type. For incremental sync, creates one partition per dimension type. + */ + @Override + public Instant crawl(LeaderPartition leaderPartition, EnhancedSourceCoordinator coordinator) { + Instant latestModifiedTime = Instant.now(); + double startCount = partitionsCreatedCounter.count(); + + createPartitionsForDimensionTypes(leaderPartition, coordinator, latestModifiedTime, dimensionTypes); + + double partitionsInThisCrawl = partitionsCreatedCounter.count() - startCount; + log.info("Total partitions created in this crawl: {}", partitionsInThisCrawl); + return latestModifiedTime; + } + + @Override + public void executePartition(DimensionalTimeSliceWorkerProgressState state, Buffer> buffer, AcknowledgementSet acknowledgementSet) { + client.executePartition(state, buffer, acknowledgementSet); + } + + private void createPartitionsForDimensionTypes(LeaderPartition leaderPartition, + EnhancedSourceCoordinator coordinator, + Instant latestModifiedTime, + List dimensionTypes) { + DimensionalTimeSliceLeaderProgressState leaderProgressState = + (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); + + if (leaderProgressState.getRemainingHours() == 0) { + createPartitionForIncrementalSync(leaderPartition, coordinator, + latestModifiedTime, dimensionTypes); + } else { + createPartitionForHistoricalPull(leaderPartition, coordinator, + latestModifiedTime, dimensionTypes); + } + } + + /** + * Creates partitions for historical data pull. Creates hourly partitions + * for each dimension type, working backwards from the current time. + */ + private void createPartitionForHistoricalPull(LeaderPartition leaderPartition, + EnhancedSourceCoordinator coordinator, + Instant latestModifiedTime, + List dimensionTypes) { + DimensionalTimeSliceLeaderProgressState leaderProgressState = + (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); + int remainingHours = leaderProgressState.getRemainingHours(); + Instant initialTime = leaderProgressState.getLastPollTime(); + Instant nowUtc = initialTime.truncatedTo(ChronoUnit.HOURS); + for (int i = remainingHours; i > 0; i-- ) { + Instant startTime = nowUtc.minus(Duration.ofHours(i));; + Instant endTime = startTime.plus(HOUR_DURATION); + + for (String dimensionType : dimensionTypes) { + createWorkerPartition(startTime, endTime, dimensionType, coordinator); + } + } + + // Create final partitions from last hour to now + for (String dimensionType : dimensionTypes) { + createWorkerPartition(nowUtc, latestModifiedTime, dimensionType, coordinator); + } + + updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator); + } + + /** + * Creates partitions for incremental sync. Creates one partition per dimension type + * from the last poll time to current time. + */ + private void createPartitionForIncrementalSync(LeaderPartition leaderPartition, + EnhancedSourceCoordinator coordinator, + Instant latestModifiedTime, + List dimensionTypes) { + LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); + Instant lastPollTime = leaderProgressState.getLastPollTime(); + + // Create one partition from lastPollTime to latestModifiedTime for each type + for (String dimensionType : dimensionTypes) { + createWorkerPartition(lastPollTime, latestModifiedTime, dimensionType, coordinator); + } + + updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator); + } + + void createWorkerPartition(Instant startTime, Instant endTime, + String dimensionType, EnhancedSourceCoordinator coordinator) { + DimensionalTimeSliceWorkerProgressState workerState = new DimensionalTimeSliceWorkerProgressState(); + workerState.setStartTime(startTime); + workerState.setEndTime(endTime); + workerState.setDimensionType(dimensionType); + + SaasSourcePartition partition = new SaasSourcePartition(workerState, LAST_UPDATED_KEY + UUID.randomUUID()); + coordinator.createPartition(partition); + partitionsCreatedCounter.increment(); + } + + /** + * Updates the leader progress state with the latest poll timestamp and remaining hours. + * This method also persists the updated state in the source coordinator. + */ + private void updateLeaderProgressState(LeaderPartition leaderPartition, + int remainingHours, + Instant updatedPollTime, + EnhancedSourceCoordinator coordinator) { + DimensionalTimeSliceLeaderProgressState state = + (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); + state.setRemainingHours(remainingHours); + state.setLastPollTime(updatedPollTime); + leaderPartition.setLeaderProgressState(state); + coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java new file mode 100644 index 0000000000..fb8ecb3bb9 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java @@ -0,0 +1,23 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.LeaderProgressState; + +import java.time.Instant; + +@Data +public class DimensionalTimeSliceLeaderProgressState implements LeaderProgressState { + + @JsonProperty("last_poll_time") + private Instant lastPollTime; + + @JsonProperty("remaining_hours") + private int remainingHours; + + public DimensionalTimeSliceLeaderProgressState(@JsonProperty("last_poll_time") final Instant lastPollTime, + @JsonProperty("remaining_hours") int remainingHours) { + this.lastPollTime = lastPollTime; + this.remainingHours = remainingHours; + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceWorkerProgressState.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceWorkerProgressState.java new file mode 100644 index 0000000000..312eeabf3a --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceWorkerProgressState.java @@ -0,0 +1,20 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasWorkerProgressState; + +import java.time.Instant; + +@Data +public class DimensionalTimeSliceWorkerProgressState implements SaasWorkerProgressState { + + @JsonProperty("startTime") + private Instant startTime; + + @JsonProperty("endTime") + private Instant endTime; + + @JsonProperty("dimensionType") + private String dimensionType; +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java new file mode 100644 index 0000000000..c9c8e95800 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java @@ -0,0 +1,169 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceLeaderProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class DimensionalTimeSliceCrawlerTest { + + @Mock + private CrawlerClient client; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter partitionsCreatedCounter; + + @Mock + private EnhancedSourceCoordinator coordinator; + + @InjectMocks + private DimensionalTimeSliceCrawler crawler; + + @Captor + private ArgumentCaptor partitionCaptor; + + private static final List LOG_TYPES = Arrays.asList("Exchange", "SharePoint", "Teams"); + + @BeforeEach + void setUp() { + when(pluginMetrics.counter(anyString())).thenReturn(partitionsCreatedCounter); + crawler = new DimensionalTimeSliceCrawler(client, pluginMetrics); + crawler.initialize(LOG_TYPES); + } + + @Test + void testCrawl_withIncrementalSync() { + Instant lastPollTime = Instant.now().minus(Duration.ofHours(1)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, 0); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + verify(coordinator, times(LOG_TYPES.size())).createPartition(partitionCaptor.capture()); + verify(coordinator).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(LOG_TYPES.size())).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(LOG_TYPES.size(), createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(lastPollTime, workerState.getStartTime()); + assertEquals(latest, workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void testCrawl_withHistoricalSync() { + Instant now = Instant.now().truncatedTo(ChronoUnit.HOURS); + int lookbackHours = 3; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, lookbackHours); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + // Expecting (lookbackHours + 1) * LOG_TYPES.size() partitions + int expectedPartitions = (lookbackHours + 1) * LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + // Verify first hour's partitions + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(now.minus(Duration.ofHours(lookbackHours)), workerState.getStartTime()); + assertEquals(now.minus(Duration.ofHours(lookbackHours-1)), workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void testCreateWorkerPartition() { + Instant start = Instant.parse("2024-10-30T00:00:00Z"); + Instant end = start.plus(Duration.ofHours(1)); + String logType = "Exchange"; + + crawler.createWorkerPartition(start, end, logType, coordinator); + + verify(coordinator).createPartition(partitionCaptor.capture()); + verify(partitionsCreatedCounter).increment(); + + SaasSourcePartition partition = partitionCaptor.getValue(); + DimensionalTimeSliceWorkerProgressState state = + (DimensionalTimeSliceWorkerProgressState) partition.getProgressState().get(); + assertEquals(start, state.getStartTime()); + assertEquals(end, state.getEndTime()); + assertEquals(logType, state.getDimensionType()); + } + + @Test + void testExecutePartition() { + DimensionalTimeSliceWorkerProgressState state = new DimensionalTimeSliceWorkerProgressState(); + Buffer> buffer = mock(Buffer.class); + AcknowledgementSet ackSet = mock(AcknowledgementSet.class); + + crawler.executePartition(state, buffer, ackSet); + + verify(client).executePartition(eq(state), eq(buffer), eq(ackSet)); + } + + @Test + void testInitialize_ThrowsExceptionWhenCalledTwice() { + assertThrows(IllegalStateException.class, () -> { + crawler.initialize(LOG_TYPES); // Second call should throw + }); + } + + @Test + void testInitialize_ThrowsExceptionWithNullLogTypes() { + DimensionalTimeSliceCrawler newCrawler = new DimensionalTimeSliceCrawler(client, pluginMetrics); + assertThrows(NullPointerException.class, () -> { + newCrawler.initialize(null); + }); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java index 430b1606a3..a8ecf6dd84 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java @@ -20,6 +20,7 @@ import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasWorkerProgressState; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.PartitionFactory; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.CrowdStrikeWorkerProgressState; import java.util.Optional; @@ -119,6 +120,31 @@ void testDeserializeCrowdStrikeWorkerProgressState() throws Exception { assertNotNull(csState.getEndTime()); } + @Test + void testDeserializeDimensionalTimeSliceWorkerProgressState() throws Exception { + String json = "{\n" + + " \"@class\": \"org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState\",\n" + + " \"startTime\": 1729391235717,\n" + + " \"endTime\": 1729395235717,\n" + + " \"dimensionType\": \"Exchange\"\n" + + "}"; + + ObjectMapper mapper = new ObjectMapper(); + mapper.registerSubtypes(DimensionalTimeSliceWorkerProgressState.class); + mapper.registerModule(new JavaTimeModule()); + + SaasWorkerProgressState state = mapper.readValue(json, SaasWorkerProgressState.class); + + assertTrue(state instanceof DimensionalTimeSliceWorkerProgressState); + DimensionalTimeSliceWorkerProgressState dimensionalState = + (DimensionalTimeSliceWorkerProgressState) state; + + assertNotNull(dimensionalState.getStartTime()); + assertNotNull(dimensionalState.getEndTime()); + assertEquals("Exchange", dimensionalState.getDimensionType()); + } + + @Test void testEmptyProgressState() throws InterruptedException { WorkerScheduler workerScheduler = new WorkerScheduler(pluginName, buffer, diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java new file mode 100644 index 0000000000..9c784aeb38 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java @@ -0,0 +1,48 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Instant; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class DimensionalTimeSliceLeaderProgressStateTest { + + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper() + .registerModule(new JavaTimeModule()) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + void testDeserializeDimensionalTimeSliceLeaderProgressState_withTypeInfo() throws JsonProcessingException { + String json = "{\n" + + " \"@class\": \"org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceLeaderProgressState\",\n" + + " \"last_poll_time\": \"2024-10-20T02:27:15.717Z\",\n" + + " \"remaining_hours\": 24\n" + + "}"; + + DimensionalTimeSliceLeaderProgressState state = objectMapper.readValue(json, DimensionalTimeSliceLeaderProgressState.class); + assertEquals(Instant.parse("2024-10-20T02:27:15.717Z"), state.getLastPollTime()); + assertEquals(24, state.getRemainingHours()); + } + + @Test + void testConstructor_setsValuesCorrectly() { + Instant now = Instant.now(); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, 48); + + assertNotNull(state); + assertEquals(now, state.getLastPollTime()); + assertEquals(48, state.getRemainingHours()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceWorkerProgressStateTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceWorkerProgressStateTest.java new file mode 100644 index 0000000000..642c7fc4dd --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceWorkerProgressStateTest.java @@ -0,0 +1,51 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Instant; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class DimensionalTimeSliceWorkerProgressStateTest { + + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper() + .registerModule(new JavaTimeModule()) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + void testDefaultValues() throws JsonProcessingException { + DimensionalTimeSliceWorkerProgressState originalState = new DimensionalTimeSliceWorkerProgressState(); + String serializedState = objectMapper.writeValueAsString(originalState); + DimensionalTimeSliceWorkerProgressState state = objectMapper.readValue(serializedState, DimensionalTimeSliceWorkerProgressState.class); + + assertNull(state.getStartTime()); + assertNull(state.getEndTime()); + assertNull(state.getDimensionType()); + } + + @Test + void testDeserializeDimensionalTimeSliceWorkerProgressState_withTypeInfo() throws JsonProcessingException { + String json = "{\n" + + " \"@class\": \"org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState\",\n" + + " \"startTime\": \"2024-10-20T02:27:15.717Z\",\n" + + " \"endTime\": \"2024-10-20T03:27:15.717Z\",\n" + + " \"dimensionType\": \"Exchange\"\n" + + "}"; + + DimensionalTimeSliceWorkerProgressState state = objectMapper.readValue(json, DimensionalTimeSliceWorkerProgressState.class); + assertEquals(Instant.parse("2024-10-20T02:27:15.717Z"), state.getStartTime()); + assertEquals(Instant.parse("2024-10-20T03:27:15.717Z"), state.getEndTime()); + assertEquals("Exchange", state.getDimensionType()); + } +}