diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java index 29b40aad32..d72c947edb 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java @@ -93,7 +93,8 @@ public void start(Buffer> buffer) { @Override protected LeaderProgressState createLeaderProgressState() { - return new DimensionalTimeSliceLeaderProgressState(Instant.now(), office365SourceConfig.getLookBackHours()); + Instant lastPollTime = Instant.now(); + return new DimensionalTimeSliceLeaderProgressState(lastPollTime, office365SourceConfig.getLookBackDuration(lastPollTime)); } @Override diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java index f77de03130..ce2f299fee 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig; import 
java.time.Duration; +import java.time.Instant; /** * Configuration class for Office 365 source plugin. @@ -60,15 +61,16 @@ public class Office365SourceConfig implements CrawlerSourceConfig { private Duration range; /** - * Gets the look back range as hours for compatibility with existing crawler framework. + * Gets the look back duration as an Instant representing the start time for historical data collection. + * This method supports minute-level granularity for historical pulls. * - * @return the number of hours to look back, or 0 if no range is specified + * @return the Instant representing how far back to look, or {@code lastPollTime} itself if the range is unset, zero, or negative */ - public int getLookBackHours() { - if (range == null || range.toHours() <= 0) { - return 0; + public Instant getLookBackDuration(Instant lastPollTime) { + if (range == null || range.isZero() || range.isNegative()) { + return lastPollTime; } - return (int) range.toHours(); + return lastPollTime.minus(range); } @Override diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java index d19cd19265..ffac809356 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java @@ -63,12 +63,9 @@ public AuditLogsResponse searchAuditLogs(final String logType, return office365RestClient.searchAuditLogs(logType, startTime, endTime, nextPageUri); } - // Adjust start time based on configured lookback hours - Instant 
adjustedStartTime = startTime; - Instant lookBackHoursAgo = Instant.now().minus(Duration.ofHours(office365SourceConfig.getLookBackHours())); - if (startTime.isBefore(lookBackHoursAgo) && lookBackHoursAgo.isBefore(endTime)) { - adjustedStartTime = lookBackHoursAgo; - } + // Adjust start time based on configured lookback period (supports Instant-based granularity) + Instant lookBackStartTime = office365SourceConfig.getLookBackDuration(Instant.now()); + Instant adjustedStartTime = office365SourceConfig.getAdjustedStartTime(startTime, endTime, lookBackStartTime); AuditLogsResponse response = office365RestClient.searchAuditLogs(logType, adjustedStartTime, endTime, null); diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java index 0c75fe5b1c..14a6127427 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java @@ -20,6 +20,7 @@ import java.lang.reflect.Field; import java.time.Duration; +import java.time.Instant; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -79,7 +80,8 @@ void testGetters() { void testDefaultValues() { assertFalse(config.isAcknowledgments()); assertEquals(4, config.getNumberOfWorkers()); - assertEquals(0, config.getLookBackHours()); + Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + assertNotNull(lookBackDuration); } @Test @@ -99,7 +101,77 @@ 
void testNegativeDurationRange() throws Exception { Duration negativeDuration = Duration.ofDays(-1); setField(config, "range", negativeDuration); - assertEquals(0, config.getLookBackHours()); + Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + assertNotNull(lookBackDuration); + } + + @Test + void testGetLookBackDuration_withMinuteRange() throws Exception { + Duration fifteenMinutes = Duration.ofMinutes(15); + setField(config, "range", fifteenMinutes); + + Instant now = Instant.now(); + Instant lookBackDuration = config.getLookBackDuration(now); + // Verify the returned instant is 15 minutes before the reference instant passed in (same 'now' on both sides, so no tolerance is needed) + Duration actualDuration = Duration.between(lookBackDuration, now); + assertEquals(15, actualDuration.toMinutes()); + } + + @Test + void testGetLookBackDuration_with30MinuteRange() throws Exception { + Duration thirtyMinutes = Duration.ofMinutes(30); + setField(config, "range", thirtyMinutes); + + Instant now = Instant.now(); + Instant lookBackDuration = config.getLookBackDuration(now); + // Verify the returned instant is 30 minutes before the reference instant passed in + Duration actualDuration = Duration.between(lookBackDuration, now); + assertEquals(30, actualDuration.toMinutes()); + } + + @Test + void testGetLookBackDuration_with45MinuteRange() throws Exception { + Duration fortyFiveMinutes = Duration.ofMinutes(45); + setField(config, "range", fortyFiveMinutes); + + Instant now = Instant.now(); + Instant lookBackDuration = config.getLookBackDuration(now); + // Verify the returned instant is 45 minutes before the reference instant passed in + Duration actualDuration = Duration.between(lookBackDuration, now); + assertEquals(45, actualDuration.toMinutes()); + } + + @Test + void testGetLookBackDuration_withHourRange() throws Exception { + Duration twoHours = Duration.ofHours(2); + setField(config, "range", twoHours); + + Instant now = Instant.now(); + Instant lookBackDuration = config.getLookBackDuration(now); + Duration actualDuration = 
Duration.between(lookBackDuration, now); + assertEquals(2, actualDuration.toHours()); + assertEquals(120, actualDuration.toMinutes()); + } + + @Test + void testGetLookBackDuration_withDayRange() throws Exception { + Duration oneDay = Duration.ofDays(1); + setField(config, "range", oneDay); + + Instant now = Instant.now(); + Instant lookBackDuration = config.getLookBackDuration(now); + Duration actualDuration = Duration.between(lookBackDuration, now); + assertEquals(24, actualDuration.toHours()); + assertEquals(1440, actualDuration.toMinutes()); + } + + @Test + void testGetLookBackDuration_withZeroRange() throws Exception { + Duration zeroDuration = Duration.ZERO; + setField(config, "range", zeroDuration); + + Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + assertNotNull(lookBackDuration); } @Test diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java index 5e9c4f8991..87a7b4810d 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java @@ -40,6 +40,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -58,6 +59,8 @@ class Office365ServiceTest { @BeforeEach void setUp() { office365Service = new 
Office365Service(sourceConfig, office365RestClient, pluginMetrics); + lenient().when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(Instant.now().minus(Duration.ofDays(365))); + lenient().doCallRealMethod().when(sourceConfig).getAdjustedStartTime(any(Instant.class), any(Instant.class), any(Instant.class)); } @Test @@ -184,7 +187,7 @@ void testSearchAuditLogsError() { Instant endTime = Instant.now(); String logType = "Exchange"; - when(office365RestClient.searchAuditLogs( + lenient().when(office365RestClient.searchAuditLogs( any(), any(), any(), any() )).thenThrow(new RuntimeException("API Error")); @@ -287,9 +290,9 @@ void testSearchAuditLogs_WithRange_AdjustsStartTime() { Instant endTime = startTime.plus(Duration.ofHours(1)); String logType = "Exchange"; - int lookBackHours = 96; // Configure 4 days range limit - - when(sourceConfig.getLookBackHours()).thenReturn(lookBackHours); + long lookBackMinutes = 96 * 60; // Configure 4 days range limit + Instant lookBackDuration = now.minus(Duration.ofMinutes(lookBackMinutes)); + when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(lookBackDuration); when(office365RestClient.searchAuditLogs( any(String.class), any(Instant.class), @@ -315,6 +318,44 @@ void testSearchAuditLogs_WithRange_AdjustsStartTime() { "Adjusted start time should be exactly 96 hours ago"); } + @Test + void testSearchAuditLogs_WithSubHourRange_AdjustsStartTime() { + Clock fixedClock = Clock.fixed(Instant.parse("2025-11-09T21:30:00.00Z"), ZoneOffset.UTC); + Instant now = Instant.now(fixedClock); + + Instant startTime = now.minus(Duration.ofMinutes(45)); + Instant endTime = startTime.plus(Duration.ofMinutes(15)); + + String logType = "Exchange"; + long lookBackMinutes = 30L; + Instant lookBackDuration = now.minus(Duration.ofMinutes(lookBackMinutes)); + + when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(lookBackDuration); + when(office365RestClient.searchAuditLogs( + any(String.class), + 
any(Instant.class), + any(Instant.class), + any() + )).thenReturn(new AuditLogsResponse(new ArrayList<>(), null)); + + office365Service.searchAuditLogs(logType, startTime, endTime, null); + + // Capture the actual start time that was passed to the REST client + ArgumentCaptor startTimeCaptor = ArgumentCaptor.forClass(Instant.class); + verify(office365RestClient).searchAuditLogs( + eq(logType), + startTimeCaptor.capture(), + eq(endTime), + isNull() + ); + + // Verify that the service adjusted the start time correctly: + Instant capturedStartTime = startTimeCaptor.getValue(); + Duration actualLookback = Duration.between(capturedStartTime, now); + assertEquals(45, actualLookback.toMinutes(), + "Adjusted start time should be exactly 30 minutes ago"); + } + private Map createTestItem(String contentId, Instant contentCreated) { Map item = new HashMap<>(); item.put("contentId", contentId); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java index efee2cac3e..d32f8718ba 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java @@ -1,6 +1,7 @@ package org.opensearch.dataprepper.plugins.source.source_crawler.base; import java.time.Duration; +import java.time.Instant; /** * Marker interface to all the SAAS connectors configuration @@ -55,4 +56,24 @@ default Duration getDurationToDelayRetry() { default Duration getLeaseInterval() { return Duration.ofMinutes(1); } + + /** + * Adjusts the start time based on the configured lookback 
boundary. + * When the requested start time is before the lookback boundary and the lookback boundary + * falls within the time window, the start time is adjusted to the lookback boundary to + * respect API limitations (e.g., Office 365 API's maximum lookback period). + * + * @param startTime the requested start time for the query + * @param endTime the end time for the query + * @param lookBackStartTime the earliest time to query from (e.g., from getLookBackDuration(referenceTime)) + * @return the adjusted start time, or the original start time if no adjustment is needed + */ + default Instant getAdjustedStartTime(final Instant startTime, + final Instant endTime, + final Instant lookBackStartTime) { + if (startTime.isBefore(lookBackStartTime) && lookBackStartTime.isBefore(endTime)) { + return lookBackStartTime; + } + return startTime; + } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java index 82cf6ecf19..a4bf30f6fe 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java @@ -29,21 +29,19 @@ /** * A crawler implementation that partitions work along two dimensions: * 1. Dimension type (e.g., log types) - * 2. Time slices (hourly windows) - * + * 2. Time slices (configurable time windows) * This crawler supports both historical data ingestion and incremental updates, * creating separate partitions for each combination of dimension type and time window. 
+ * Supports minute-level granularity for historical pulls (e.g., PT15M, PT30M). */ @Named public class DimensionalTimeSliceCrawler implements Crawler { private static final Logger log = LoggerFactory.getLogger(DimensionalTimeSliceCrawler.class); - // delay five minutes for partition creation on latest time duration to ensure the newly generated events are queryable - // In general, newly generated events become queryable after 30 ~ 120 second - protected static final long WAIT_SECONDS_BEFORE_PARTITION_CREATION = 300; private static final String DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED = "dimensionalTimeSliceWorkerPartitionsCreated"; private static final String WORKER_PARTITION_WAIT_TIME = "workerPartitionWaitTime"; private static final String WORKER_PARTITION_PROCESS_LATENCY = "workerPartitionProcessLatency"; private static final Duration HOUR_DURATION = Duration.ofHours(1); + static final Duration WAIT_BEFORE_PARTITION_CREATION = Duration.ofMinutes(5); private final CrawlerClient client; private final Counter partitionsCreatedCounter; @@ -73,7 +71,7 @@ public void initialize(List dimensionTypes) { } /** - * Creates partitions for the current crawl cycle. For historical pulls, creates hourly partitions + * Creates partitions for the current crawl cycle. For historical pulls, creates time-based partitions * for each dimension type. For incremental sync, creates one partition per dimension type. 
*/ @Override @@ -89,7 +87,7 @@ public Instant crawl(LeaderPartition leaderPartition, EnhancedSourceCoordinator @Override public void executePartition(DimensionalTimeSliceWorkerProgressState state, Buffer> buffer, AcknowledgementSet acknowledgementSet) { - log.info("Processing partition - DimensionType: {}, TimeRange: {} to {}", + log.info("Processing partition - DimensionType: {}, TimeRange: {} to {}", state.getDimensionType(), state.getStartTime(), state.getEndTime()); partitionWaitTimeTimer.record(Duration.between(state.getPartitionCreationTime(), Instant.now())); partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet)); @@ -100,7 +98,9 @@ private Instant createPartitions(LeaderPartition leaderPartition, DimensionalTimeSliceLeaderProgressState leaderProgressState = (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); - if (leaderProgressState.getRemainingHours() == 0) { + Instant remainingDuration = leaderProgressState.getRemainingDuration(); + Instant lastPollTime = leaderProgressState.getLastPollTime(); + if (remainingDuration.equals(lastPollTime)) { return createPartitionsForIncrementalSync(leaderPartition, coordinator); } else { return createPartitionsForHistoricalPull(leaderPartition, coordinator); @@ -108,26 +108,55 @@ private Instant createPartitions(LeaderPartition leaderPartition, } /** - * Creates partitions for historical data pull. Creates hourly partitions + * Creates partitions for historical data pull. Creates time-based partitions * for each dimension type, working backwards from the current time. + * Supports both sub-hour (minute-based) and hour-based time ranges. 
*/ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartition, EnhancedSourceCoordinator coordinator) { DimensionalTimeSliceLeaderProgressState leaderProgressState = (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); - int remainingHours = leaderProgressState.getRemainingHours(); Instant initialTime = leaderProgressState.getLastPollTime(); + Instant latestModifiedTime = initialTime.minus(WAIT_BEFORE_PARTITION_CREATION); + Instant remainingDuration = leaderProgressState.getRemainingDuration(); + long remainingMinutes = Duration.between(remainingDuration, initialTime).toMinutes(); + + // For sub-hour time ranges (less than 60 minutes), create a single partition + if (remainingMinutes < HOUR_DURATION.toMinutes()) { + log.info("Creating partition for sub-hour historical pull: {} minutes", remainingMinutes); + Instant startTime = initialTime.minus(Duration.ofMinutes(remainingMinutes)); + Instant endTime; + if (remainingMinutes <= WAIT_BEFORE_PARTITION_CREATION.toMinutes()) { + // For very small ranges, skip the 5-minute delay to create a valid partition + endTime = initialTime; + } else { + endTime = latestModifiedTime; + } + + createWorkerPartitionsForDimensionTypes(startTime, endTime, coordinator); + updateLeaderProgressState(leaderPartition, endTime, coordinator); + return endTime; + } + + // For hour or longer time ranges, use hourly partitioning + long remainingHours = remainingMinutes / HOUR_DURATION.toMinutes(); + long extraMinutes = remainingMinutes % HOUR_DURATION.toMinutes(); Instant latestHour = initialTime.truncatedTo(ChronoUnit.HOURS); - for (int i = remainingHours; i > 1; i--) { + + // Create hourly partitions for complete hours + for (long i = remainingHours; i > 1; i--) { Instant startTime = latestHour.minus(Duration.ofHours(i)); - Instant endTime = startTime.plus(HOUR_DURATION); + // For the first partition, include any extra minutes + if (i == remainingHours && extraMinutes > 0) { + startTime = 
startTime.minus(Duration.ofMinutes(extraMinutes)); + } + Instant endTime = latestHour.minus(Duration.ofHours(i - 1)); createWorkerPartitionsForDimensionTypes(startTime, endTime, coordinator); } - Instant latestModifiedTime = initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); if (latestModifiedTime.isAfter(latestHour)) { - // if checkpointing time is after the latest hour, creat one partition for last hour + // if checkpointing time is after the latest hour, create one partition for last hour // and one from latest hour to checkpointing time createWorkerPartitionsForDimensionTypes(latestHour.minus(Duration.ofHours(1)), latestHour, coordinator); createWorkerPartitionsForDimensionTypes(latestHour, latestModifiedTime, coordinator); @@ -136,7 +165,7 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio createWorkerPartitionsForDimensionTypes(latestHour.minus(Duration.ofHours(1)), latestModifiedTime, coordinator); } - updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator); + updateLeaderProgressState(leaderPartition, latestModifiedTime, coordinator); return latestModifiedTime; } @@ -147,7 +176,7 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio */ private Instant createPartitionsForIncrementalSync(LeaderPartition leaderPartition, EnhancedSourceCoordinator coordinator) { - Instant latestModifiedTime = Instant.now().minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); + Instant latestModifiedTime = Instant.now().minus(WAIT_BEFORE_PARTITION_CREATION); LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); Instant lastPollTime = leaderProgressState.getLastPollTime(); @@ -155,7 +184,7 @@ private Instant createPartitionsForIncrementalSync(LeaderPartition leaderPartiti // Create one partition from lastPollTime to latestModifiedTime for each type createWorkerPartitionsForDimensionTypes(lastPollTime, latestModifiedTime, coordinator); - 
updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator); + updateLeaderProgressState(leaderPartition, latestModifiedTime, coordinator); return latestModifiedTime; } @@ -180,16 +209,15 @@ void createWorkerPartitionsForDimensionTypes(Instant startTime, Instant endTime, } /** - * Updates the leader progress state with the latest poll timestamp and remaining hours. + * Updates the leader progress state with the latest poll timestamp and remaining duration. * This method also persists the updated state in the source coordinator. */ private void updateLeaderProgressState(LeaderPartition leaderPartition, - int remainingHours, Instant updatedPollTime, EnhancedSourceCoordinator coordinator) { DimensionalTimeSliceLeaderProgressState state = (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); - state.setRemainingHours(remainingHours); + state.setRemainingDuration(updatedPollTime); state.setLastPollTime(updatedPollTime); leaderPartition.setLeaderProgressState(state); coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java index fb8ecb3bb9..2d7fa9b884 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java @@ -1,23 +1,107 @@ package 
org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSetter; import lombok.Data; import org.opensearch.dataprepper.plugins.source.source_crawler.base.LeaderProgressState; +import java.time.Duration; import java.time.Instant; +/** + * Leader progress state for dimensional time slice crawler. + * Uses Instant-based remainingDuration for precise time-based historical pulls. + * + *

<p>Backward Compatibility: This class supports deserialization of legacy formats:
+ * <ul>
+ *   <li>Preferred format: {@code remaining_duration} (Instant)</li>
+ *   <li>Legacy format: {@code remaining_hours} (int) - automatically converted to remainingDuration</li>
+ * </ul>
+ */ @Data public class DimensionalTimeSliceLeaderProgressState implements LeaderProgressState { + private static final long MINUTES_PER_HOUR = 60; + @JsonProperty("last_poll_time") private Instant lastPollTime; - @JsonProperty("remaining_hours") - private int remainingHours; + @JsonProperty("remaining_duration") + private Instant remainingDuration; + + + /** + * Primary constructor supporting both new and legacy formats. + * + * @param lastPollTime the last poll timestamp + * @param remainingDuration the remaining duration as Instant (preferred format) + */ + @JsonCreator + public DimensionalTimeSliceLeaderProgressState( + @JsonProperty("last_poll_time") final Instant lastPollTime, + @JsonProperty("remaining_duration") Instant remainingDuration) { + this.lastPollTime = lastPollTime; + + // Prefer remaining_duration (Instant) if provided + if (remainingDuration != null && remainingDuration.isBefore(lastPollTime)) { + this.remainingDuration = remainingDuration; + } else { + this.remainingDuration = lastPollTime; + } + } - public DimensionalTimeSliceLeaderProgressState(@JsonProperty("last_poll_time") final Instant lastPollTime, - @JsonProperty("remaining_hours") int remainingHours) { + /** + * Backward-compatible constructor for existing connectors that pass hours as int. + * This constructor is specifically for external connectors that use getLookBackHours() + * which returns an int representing hours. + * + * @param lastPollTime the last poll timestamp + * @param remainingHours the remaining hours for historical pull (will be converted to remainingDuration) + */ + public DimensionalTimeSliceLeaderProgressState(final Instant lastPollTime, int remainingHours) { this.lastPollTime = lastPollTime; - this.remainingHours = remainingHours; + this.remainingDuration = remainingDurationFromHours(lastPollTime, remainingHours); + } + + /** + * Backward compatibility setter for legacy remaining_hours field. 
+ * Converts hours to remainingDuration when deserializing old checkpoint data. + * + * @param remainingHours the remaining hours (will be converted to remainingDuration) + */ + @JsonSetter("remaining_hours") + public void setRemainingHours(int remainingHours) { + if (this.remainingDuration == null || this.remainingDuration.equals(this.lastPollTime)) { + this.remainingDuration = remainingDurationFromHours(this.lastPollTime, remainingHours); + } + } + + private Instant remainingDurationFromHours(Instant lastPollTime, int remainingHours) { + if (lastPollTime == null) { + return null; + } + long minutes = remainingHours * MINUTES_PER_HOUR; + if (minutes > 0) { + return lastPollTime.minus(Duration.ofMinutes(minutes)); + } + return lastPollTime; + } + + /** + * Provides backward compatible getter for remaining hours. + * + * @return the remaining time in hours (rounded down from minutes) + * @deprecated Use {@link #getRemainingDuration()} for Instant-based granularity + */ + @Deprecated + @JsonIgnore + public int getRemainingHours() { + if (remainingDuration != null && lastPollTime != null && remainingDuration.isBefore(lastPollTime)) { + long minutes = Duration.between(remainingDuration, lastPollTime).toMinutes(); + return (int) (minutes / MINUTES_PER_HOUR); + } + return 0; } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java index 288f4a6270..a37b88308e 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java +++ 
b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java @@ -27,10 +27,11 @@ import java.util.Arrays; import java.util.List; -import static org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler.WAIT_SECONDS_BEFORE_PARTITION_CREATION; +import static org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler.WAIT_BEFORE_PARTITION_CREATION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -82,9 +83,9 @@ void setUp() { } @Test - void testCrawl_withIncrementalSync_lastModificationTimeBefore5MinutesAgo() { + void crawl_withIncrementalSync_whenLastPollBeforeWaitWindow_shouldCreateOnePartitionPerLogType() { Instant lastPollTime = Instant.now().minusSeconds(400); - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, 0); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, Instant.now()); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -107,9 +108,9 @@ void testCrawl_withIncrementalSync_lastModificationTimeBefore5MinutesAgo() { } @Test - void testCrawl_withIncrementalSync_lastModificationTimeAfter5MinutesAgo() { + void crawl_withIncrementalSync_whenLastPollWithinWaitWindow_shouldNotCreatePartitions() { Instant lastPollTime = Instant.now().minusSeconds(10); - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, 0); + 
DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, Instant.now()); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -121,18 +122,18 @@ void testCrawl_withIncrementalSync_lastModificationTimeAfter5MinutesAgo() { } @Test - void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() { + void crawl_withHistoricalSync_whenInitialTimeInFirst5MinutesOfHour_shouldCreateTwoPartitionsPerLogType() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); - Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION -1); - int lookbackHours = 2; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackHours); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION).minusSeconds(1); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(120)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); assertNotNull(latest); - // Expecting (lookbackHours + 1) * LOG_TYPES.size() partitions - int expectedPartitions = (lookbackHours) * LOG_TYPES.size(); + // Expecting (lookbackMinutes/60) * LOG_TYPES.size() partitions, i.e. 2 per log type for a 120-minute lookback + int expectedPartitions = 2 * LOG_TYPES.size(); verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); @@ -144,8 +145,8 @@ void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() { for (int i = 0; i < LOG_TYPES.size(); i++) { DimensionalTimeSliceWorkerProgressState workerState = 
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); - assertEquals(latestHour.minus(Duration.ofHours(lookbackHours)), workerState.getStartTime()); - assertEquals(latestHour.minus(Duration.ofHours(lookbackHours - 1)), workerState.getEndTime()); + assertEquals(latestHour.minus(Duration.ofHours(2)), workerState.getStartTime()); + assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); } @@ -154,24 +155,24 @@ void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() { DimensionalTimeSliceWorkerProgressState workerState = (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getStartTime()); - assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i - LOG_TYPES.size()), workerState.getDimensionType()); } } @Test - void testCrawl_withHistoricalSync_initialTimeNotInTheFirst5MinutesOfTheHOur() { + void crawl_withHistoricalSync_whenInitialTimeAfterFirst5MinutesOfHour_shouldCreateThreePartitionsPerLogType() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); - Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); - int lookbackHours = 2; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackHours); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION).plusSeconds(1); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(120)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); Instant 
latest = crawler.crawl(leaderPartition, coordinator); assertNotNull(latest); - // Expecting (lookbackHours + 1) * LOG_TYPES.size() partitions - int expectedPartitions = (lookbackHours + 1) * LOG_TYPES.size(); + // Expecting (lookbackMinutes/60 + 1) * LOG_TYPES.size() partitions + int expectedPartitions = 3 * LOG_TYPES.size(); verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); @@ -183,8 +184,8 @@ void testCrawl_withHistoricalSync_initialTimeNotInTheFirst5MinutesOfTheHOur() { for (int i = 0; i < LOG_TYPES.size(); i++) { DimensionalTimeSliceWorkerProgressState workerState = (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); - assertEquals(latestHour.minus(Duration.ofHours(lookbackHours)), workerState.getStartTime()); - assertEquals(latestHour.minus(Duration.ofHours(lookbackHours - 1)), workerState.getEndTime()); + assertEquals(latestHour.minus(Duration.ofHours(2)), workerState.getStartTime()); + assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); } @@ -202,16 +203,15 @@ void testCrawl_withHistoricalSync_initialTimeNotInTheFirst5MinutesOfTheHOur() { DimensionalTimeSliceWorkerProgressState workerState = (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); assertEquals(latestHour, workerState.getStartTime()); - assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i - LOG_TYPES.size() * 2), workerState.getDimensionType()); } } @Test - void createWorkerPartitionsForDimensionTypes() { + void 
createWorkerPartitionsForDimensionTypes_shouldCreateOnePartitionPerLogTypeForTimeRange() { Instant start = Instant.parse("2024-10-30T00:00:00Z"); Instant end = start.plus(Duration.ofHours(1)); - String logType = "Exchange"; crawler.createWorkerPartitionsForDimensionTypes(start, end, coordinator); @@ -233,7 +233,7 @@ void createWorkerPartitionsForDimensionTypes() { } @Test - void testExecutePartition() { + void executePartition_shouldDelegateToClientAndRecordMetrics() { DimensionalTimeSliceWorkerProgressState state = new DimensionalTimeSliceWorkerProgressState(); state.setPartitionCreationTime(Instant.now().minusSeconds(1)); Buffer> buffer = mock(Buffer.class); @@ -254,17 +254,316 @@ void testExecutePartition() { } @Test - void testInitialize_ThrowsExceptionWhenCalledTwice() { + void initialize_whenCalledTwice_shouldThrowIllegalStateException() { assertThrows(IllegalStateException.class, () -> { crawler.initialize(LOG_TYPES); // Second call should throw }); } @Test - void testInitialize_ThrowsExceptionWithNullLogTypes() { + void initialize_whenLogTypesNull_shouldThrowNullPointerException() { DimensionalTimeSliceCrawler newCrawler = new DimensionalTimeSliceCrawler(client, pluginMetrics); assertThrows(NullPointerException.class, () -> { newCrawler.initialize(null); }); } + @Test + void crawl_withSubHourHistoricalSync_with15MinuteLookback_shouldCreateOnePartitionPerLogType() { + Instant initialTime = Instant.now(); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(15)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + + int expectedPartitions = LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, 
atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(initialTime.minus(Duration.ofMinutes(15)), workerState.getStartTime()); + assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void crawl_withSubHourHistoricalSync_with30MinuteLookback_shouldCreateOnePartitionPerLogType() { + Instant initialTime = Instant.now(); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(30)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + + int expectedPartitions = LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(initialTime.minus(Duration.ofMinutes(30)), workerState.getStartTime()); + 
assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void crawl_withSubHourHistoricalSync_with45MinuteLookback_shouldCreateOnePartitionPerLogType() { + Instant initialTime = Instant.now(); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(45)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + + int expectedPartitions = LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(initialTime.minus(Duration.ofMinutes(45)), workerState.getStartTime()); + assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void crawl_withIncrementalSync_whenLastPollExactly5MinutesAgo_shouldNotCreatePartitions() { + Instant lastPollTime = Instant.now().minus(WAIT_BEFORE_PARTITION_CREATION).plusSeconds(1); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, lastPollTime.plus(Duration.ofMinutes(4))); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = 
crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + + verify(coordinator, never()).createPartition(partitionCaptor.capture()); + verify(coordinator, never()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, never()).increment(); + assertEquals(lastPollTime, latest); + } + + @Test + void crawl_withHistoricalSync_with1HourLookback_shouldCreateTwoPartitionsPerLogType() { + Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION).plusSeconds(1); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(60)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + + final int expectedHourlyPartitionsPerLogType = 2; + int expectedPartitions = expectedHourlyPartitionsPerLogType * LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getStartTime()); + assertEquals(latestHour, workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + + for (int i = LOG_TYPES.size(); i < LOG_TYPES.size() * expectedHourlyPartitionsPerLogType; i++) { + 
DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(latestHour, workerState.getStartTime()); + assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i - LOG_TYPES.size()), workerState.getDimensionType()); + } + } + + @Test + void crawl_withHistoricalSync_with3HourLookback_shouldCreateFourPartitionsPerLogType() { + Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION).plusSeconds(1); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(180)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + final int expectedHourlyPartitionsPerLogType = 4; + int expectedPartitions = expectedHourlyPartitionsPerLogType * LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + } + + @Test + void crawl_withHistoricalSync_with2Hours5MinutesLookback_shouldCreateThreePartitionsPerLogType() { + Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION).plusSeconds(1); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(125)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); + 
LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + int expectedPartitions = 3 * LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + DimensionalTimeSliceWorkerProgressState firstWorkerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(0).getProgressState().get(); + assertEquals(latestHour.minus(Duration.ofHours(2)).minus(Duration.ofMinutes(5)), firstWorkerState.getStartTime()); + assertEquals(latestHour.minus(Duration.ofHours(1)), firstWorkerState.getEndTime()); + } + + @Test + void crawl_withSubHourHistoricalSync_withVerySmallRange_shouldCreateOnePartitionWithEndTimeAtInitialTime() { + Instant initialTime = Instant.now(); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(3)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + int expectedPartitions = LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState 
workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(initialTime.minus(Duration.ofMinutes(3)), workerState.getStartTime()); + assertEquals(initialTime, workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void crawl_withSubHourHistoricalSync_withExactly5MinuteLookback_shouldCreateOnePartitionWithEndTimeAtInitialTime() { + Instant initialTime = Instant.now(); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(5)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + int expectedPartitions = LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(initialTime.minus(Duration.ofMinutes(5)), workerState.getStartTime()); + assertEquals(initialTime, workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void crawl_withHistoricalSync_whenLatestModifiedTimeEqualsLatestHour_shouldCreateOnePartitionPerLogType() { + Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION); + Instant lookbackDuration = 
initialTime.minus(Duration.ofMinutes(60)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + int expectedPartitions = LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getStartTime()); + assertEquals(latestHour, workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void createWorkerPartitionsForDimensionTypes_shouldSetPartitionCreationTimeWithinNow() { + Instant start = Instant.parse("2024-10-30T00:00:00Z"); + Instant end = start.plus(Duration.ofHours(1)); + Instant beforeCreation = Instant.now(); + + crawler.createWorkerPartitionsForDimensionTypes(start, end, coordinator); + + verify(coordinator, times(LOG_TYPES.size())).createPartition(partitionCaptor.capture()); + List createdPartitions = partitionCaptor.getAllValues(); + + Instant afterCreation = Instant.now(); + + for (SaasSourcePartition partition : createdPartitions) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) partition.getProgressState().get(); + assertNotNull(workerState.getPartitionCreationTime()); + + 
assertTrue(workerState.getPartitionCreationTime().isAfter(beforeCreation.minusSeconds(1))); + assertTrue(workerState.getPartitionCreationTime().isBefore(afterCreation.plusSeconds(1))); + } + } + + + @Test + void initialize_withEmptyList_shouldSucceedButSecondCallShouldThrow() { + DimensionalTimeSliceCrawler newCrawler = new DimensionalTimeSliceCrawler(client, pluginMetrics); + List emptyList = Arrays.asList(); + newCrawler.initialize(emptyList); + assertThrows(IllegalStateException.class, () -> { + newCrawler.initialize(emptyList); + }); + } } \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java index 9c784aeb38..e45d41080f 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.time.Instant; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -28,21 +29,124 @@ void testDeserializeDimensionalTimeSliceLeaderProgressState_withTypeInfo() throw String json = "{\n" + " \"@class\": \"org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceLeaderProgressState\",\n" + " \"last_poll_time\": \"2024-10-20T02:27:15.717Z\",\n" + - " \"remaining_hours\": 24\n" + + " 
\"remaining_duration\": \"2024-10-19T02:27:15.717Z\"\n" + "}"; DimensionalTimeSliceLeaderProgressState state = objectMapper.readValue(json, DimensionalTimeSliceLeaderProgressState.class); assertEquals(Instant.parse("2024-10-20T02:27:15.717Z"), state.getLastPollTime()); - assertEquals(24, state.getRemainingHours()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(1440, remainingMinutes); } @Test void testConstructor_setsValuesCorrectly() { Instant now = Instant.now(); - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, 48); + Instant remainingDuration = now.minus(Duration.ofMinutes(2880)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingDuration); + + assertNotNull(state); + assertEquals(now, state.getLastPollTime()); + assertEquals(remainingDuration, state.getRemainingDuration()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(2880, remainingMinutes); + } + + @Test + void testConstructor_withSubHourMinutes() { + Instant now = Instant.now(); + Instant remainingDuration = now.minus(Duration.ofMinutes(15)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingDuration); + + assertNotNull(state); + assertEquals(now, state.getLastPollTime()); + assertEquals(remainingDuration, state.getRemainingDuration()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(15, remainingMinutes); + } + + @Test + void testConstructor_withZeroMinutes() { + Instant now = Instant.now(); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, now); + + assertNotNull(state); + assertEquals(now, state.getLastPollTime()); + assertEquals(now, 
state.getRemainingDuration()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(0, remainingMinutes); + } + + @Test + void testBackwardCompatibility_getRemainingHoursFromMinutes() { + Instant now = Instant.now(); + Instant remainingDuration = now.minus(Duration.ofMinutes(150)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingDuration); + + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(150, remainingMinutes); + assertEquals(2, state.getRemainingHours()); + } + + @Test + void testBackwardCompatibility_getRemainingHoursFromSubHourMinutes() { + Instant now = Instant.now(); + Instant remainingDuration = now.minus(Duration.ofMinutes(30)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingDuration); + + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(30, remainingMinutes); + assertEquals(0, state.getRemainingHours()); + } + + @Test + void testBackwardCompatibility_intHoursConstructor() { + Instant now = Instant.now(); + int hoursFromLegacyConnector = 2; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, hoursFromLegacyConnector); assertNotNull(state); assertEquals(now, state.getLastPollTime()); - assertEquals(48, state.getRemainingHours()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(120, remainingMinutes); + assertEquals(2, state.getRemainingHours()); + } + + @Test + void testBackwardCompatibility_intHoursConstructor_24Hours() { + Instant now = Instant.now(); + int hoursFromLegacyConnector = 24; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, 
hoursFromLegacyConnector); + + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(1440, remainingMinutes); + assertEquals(24, state.getRemainingHours()); + } + + @Test + void testBackwardCompatibility_intHoursConstructor_zeroHours() { + Instant now = Instant.now(); + int hoursFromLegacyConnector = 0; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, hoursFromLegacyConnector); + + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(0, remainingMinutes); + assertEquals(0, state.getRemainingHours()); + } + + @Test + void testConstructorOverloading_longMinutesVsIntHours() { + Instant now = Instant.now(); + + Instant remainingDurationFromMinutes = now.minus(Duration.ofMinutes(120)); + DimensionalTimeSliceLeaderProgressState stateFromMinutes = new DimensionalTimeSliceLeaderProgressState(now, remainingDurationFromMinutes); + long remainingMinutesFromMinutes = Duration.between(stateFromMinutes.getRemainingDuration(), stateFromMinutes.getLastPollTime()).toMinutes(); + assertEquals(120, remainingMinutesFromMinutes); + + int hoursValue = 2; + DimensionalTimeSliceLeaderProgressState stateFromHours = new DimensionalTimeSliceLeaderProgressState(now, hoursValue); + long remainingMinutesFromHours = Duration.between(stateFromHours.getRemainingDuration(), stateFromHours.getLastPollTime()).toMinutes(); + assertEquals(120, remainingMinutesFromHours); + + assertEquals(remainingMinutesFromMinutes, remainingMinutesFromHours); } } \ No newline at end of file