From 0bdd0530c1e572976d2e30de950d113495202327 Mon Sep 17 00:00:00 2001 From: enugraju Date: Fri, 26 Dec 2025 12:47:27 +0530 Subject: [PATCH 1/4] Add minute range support to Dimensional TimeSlice crawler framework Signed-off-by: enugraju --- .../microsoft_office365/Office365Source.java | 2 +- .../Office365SourceConfig.java | 15 + .../service/Office365Service.java | 8 +- .../Office365SourceConfigTest.java | 62 +++- .../service/Office365ServiceTest.java | 41 ++- .../base/DimensionalTimeSliceCrawler.java | 58 +++- ...mensionalTimeSliceLeaderProgressState.java | 99 +++++- .../base/DimensionalTimeSliceCrawlerTest.java | 322 +++++++++++++++++- ...ionalTimeSliceLeaderProgressStateTest.java | 96 +++++- 9 files changed, 660 insertions(+), 43 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java index 29b40aad32..09f3a14c8f 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java @@ -93,7 +93,7 @@ public void start(Buffer> buffer) { @Override protected LeaderProgressState createLeaderProgressState() { - return new DimensionalTimeSliceLeaderProgressState(Instant.now(), office365SourceConfig.getLookBackHours()); + return new DimensionalTimeSliceLeaderProgressState(Instant.now(), office365SourceConfig.getLookBackMinutes()); } @Override diff --git 
a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java index f77de03130..96fc692121 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java @@ -59,11 +59,26 @@ public class Office365SourceConfig implements CrawlerSourceConfig { @DurationMax(days = 7, message = "Range cannot exceed 7 days due to Office 365 API limitation") private Duration range; + /** + * Gets the look back range as minutes for the crawler framework. + * This method supports minute-level granularity for historical pulls. + * + * @return the number of minutes to look back, or 0 if no range is specified + */ + public long getLookBackMinutes() { + if (range == null || range.isZero() || range.isNegative()) { + return 0; + } + return range.toMinutes(); + } + /** * Gets the look back range as hours for compatibility with existing crawler framework. 
* * @return the number of hours to look back, or 0 if no range is specified + * @deprecated Use {@link #getLookBackMinutes()} for minute-level granularity support */ + @Deprecated public int getLookBackHours() { if (range == null || range.toHours() <= 0) { return 0; diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java index d19cd19265..67d6850d77 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java @@ -63,11 +63,11 @@ public AuditLogsResponse searchAuditLogs(final String logType, return office365RestClient.searchAuditLogs(logType, startTime, endTime, nextPageUri); } - // Adjust start time based on configured lookback hours + // Adjust start time based on configured lookback period (supports minute-level granularity) Instant adjustedStartTime = startTime; - Instant lookBackHoursAgo = Instant.now().minus(Duration.ofHours(office365SourceConfig.getLookBackHours())); - if (startTime.isBefore(lookBackHoursAgo) && lookBackHoursAgo.isBefore(endTime)) { - adjustedStartTime = lookBackHoursAgo; + Instant lookBackTimeAgo = Instant.now().minus(Duration.ofMinutes(office365SourceConfig.getLookBackMinutes())); + if (startTime.isBefore(lookBackTimeAgo) && lookBackTimeAgo.isBefore(endTime)) { + adjustedStartTime = lookBackTimeAgo; } AuditLogsResponse response = diff --git 
a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java index 0c75fe5b1c..aaef7cec98 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java @@ -79,7 +79,7 @@ void testGetters() { void testDefaultValues() { assertFalse(config.isAcknowledgments()); assertEquals(4, config.getNumberOfWorkers()); - assertEquals(0, config.getLookBackHours()); + assertEquals(0, config.getLookBackMinutes()); } @Test @@ -99,7 +99,67 @@ void testNegativeDurationRange() throws Exception { Duration negativeDuration = Duration.ofDays(-1); setField(config, "range", negativeDuration); + assertEquals(0, config.getLookBackMinutes()); + } + + @Test + void testGetLookBackMinutes_withMinuteRange() throws Exception { + Duration fifteenMinutes = Duration.ofMinutes(15); + setField(config, "range", fifteenMinutes); + + // getLookBackHours should return 0 for sub-hour range + assertEquals(0, config.getLookBackHours()); + // getLookBackMinutes should return 15 + assertEquals(15, config.getLookBackMinutes()); + } + + @Test + void testGetLookBackMinutes_with30MinuteRange() throws Exception { + Duration thirtyMinutes = Duration.ofMinutes(30); + setField(config, "range", thirtyMinutes); + + // getLookBackHours should return 0 for sub-hour range + assertEquals(0, config.getLookBackHours()); + // getLookBackMinutes should return 30 + assertEquals(30, config.getLookBackMinutes()); + } + + @Test + void 
testGetLookBackMinutes_with45MinuteRange() throws Exception { + Duration fortyFiveMinutes = Duration.ofMinutes(45); + setField(config, "range", fortyFiveMinutes); + + // getLookBackHours should return 0 for sub-hour range + assertEquals(0, config.getLookBackHours()); + // getLookBackMinutes should return 45 + assertEquals(45, config.getLookBackMinutes()); + } + + @Test + void testGetLookBackMinutes_withHourRange() throws Exception { + Duration twoHours = Duration.ofHours(2); + setField(config, "range", twoHours); + + assertEquals(2, config.getLookBackHours()); + assertEquals(120, config.getLookBackMinutes()); + } + + @Test + void testGetLookBackMinutes_withDayRange() throws Exception { + Duration oneDay = Duration.ofDays(1); + setField(config, "range", oneDay); + + assertEquals(24, config.getLookBackHours()); + assertEquals(1440, config.getLookBackMinutes()); + } + + @Test + void testGetLookBackMinutes_withZeroRange() throws Exception { + Duration zeroDuration = Duration.ZERO; + setField(config, "range", zeroDuration); + assertEquals(0, config.getLookBackHours()); + assertEquals(0, config.getLookBackMinutes()); } @Test diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java index 5e9c4f8991..440776f3ce 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java @@ -287,9 +287,9 @@ void testSearchAuditLogs_WithRange_AdjustsStartTime() { Instant 
endTime = startTime.plus(Duration.ofHours(1)); String logType = "Exchange"; - int lookBackHours = 96; // Configure 4 days range limit + long lookBackMinutes = 96 * 60; // Configure 4 days range limit - when(sourceConfig.getLookBackHours()).thenReturn(lookBackHours); + when(sourceConfig.getLookBackMinutes()).thenReturn(lookBackMinutes); when(office365RestClient.searchAuditLogs( any(String.class), any(Instant.class), @@ -315,6 +315,43 @@ void testSearchAuditLogs_WithRange_AdjustsStartTime() { "Adjusted start time should be exactly 96 hours ago"); } + @Test + void testSearchAuditLogs_WithSubHourRange_AdjustsStartTime() { + Clock fixedClock = Clock.fixed(Instant.parse("2025-11-09T21:30:00.00Z"), ZoneOffset.UTC); + Instant now = Instant.now(fixedClock); + + Instant startTime = now.minus(Duration.ofMinutes(45)); + Instant endTime = startTime.plus(Duration.ofMinutes(15)); + + String logType = "Exchange"; + long lookBackMinutes = 30L; + + when(sourceConfig.getLookBackMinutes()).thenReturn(lookBackMinutes); + when(office365RestClient.searchAuditLogs( + any(String.class), + any(Instant.class), + any(Instant.class), + any() + )).thenReturn(new AuditLogsResponse(new ArrayList<>(), null)); + + office365Service.searchAuditLogs(logType, startTime, endTime, null); + + // Capture the actual start time that was passed to the REST client + ArgumentCaptor startTimeCaptor = ArgumentCaptor.forClass(Instant.class); + verify(office365RestClient).searchAuditLogs( + eq(logType), + startTimeCaptor.capture(), + eq(endTime), + isNull() + ); + + // Verify that the service adjusted the start time correctly: + Instant capturedStartTime = startTimeCaptor.getValue(); + Duration actualLookback = Duration.between(capturedStartTime, now); + assertEquals(45, actualLookback.toMinutes(), + "Adjusted start time should be exactly 30 minutes ago"); + } + private Map createTestItem(String contentId, Instant contentCreated) { Map item = new HashMap<>(); item.put("contentId", contentId); diff --git 
a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java index 82cf6ecf19..3034d7e564 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java @@ -29,10 +29,10 @@ /** * A crawler implementation that partitions work along two dimensions: * 1. Dimension type (e.g., log types) - * 2. Time slices (hourly windows) - * + * 2. Time slices (configurable time windows) * This crawler supports both historical data ingestion and incremental updates, * creating separate partitions for each combination of dimension type and time window. + * Supports minute-level granularity for historical pulls (e.g., PT15M, PT30M). */ @Named public class DimensionalTimeSliceCrawler implements Crawler { @@ -44,6 +44,8 @@ public class DimensionalTimeSliceCrawler implements Crawler dimensionTypes) { } /** - * Creates partitions for the current crawl cycle. For historical pulls, creates hourly partitions + * Creates partitions for the current crawl cycle. For historical pulls, creates time-based partitions * for each dimension type. For incremental sync, creates one partition per dimension type. 
*/ @Override @@ -89,7 +91,7 @@ public Instant crawl(LeaderPartition leaderPartition, EnhancedSourceCoordinator @Override public void executePartition(DimensionalTimeSliceWorkerProgressState state, Buffer> buffer, AcknowledgementSet acknowledgementSet) { - log.info("Processing partition - DimensionType: {}, TimeRange: {} to {}", + log.info("Processing partition - DimensionType: {}, TimeRange: {} to {}", state.getDimensionType(), state.getStartTime(), state.getEndTime()); partitionWaitTimeTimer.record(Duration.between(state.getPartitionCreationTime(), Instant.now())); partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet)); @@ -100,7 +102,7 @@ private Instant createPartitions(LeaderPartition leaderPartition, DimensionalTimeSliceLeaderProgressState leaderProgressState = (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); - if (leaderProgressState.getRemainingHours() == 0) { + if (leaderProgressState.getRemainingMinutes() == 0) { return createPartitionsForIncrementalSync(leaderPartition, coordinator); } else { return createPartitionsForHistoricalPull(leaderPartition, coordinator); @@ -108,26 +110,54 @@ private Instant createPartitions(LeaderPartition leaderPartition, } /** - * Creates partitions for historical data pull. Creates hourly partitions + * Creates partitions for historical data pull. Creates time-based partitions * for each dimension type, working backwards from the current time. + * Supports both sub-hour (minute-based) and hour-based time ranges. 
*/ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartition, EnhancedSourceCoordinator coordinator) { DimensionalTimeSliceLeaderProgressState leaderProgressState = (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); - int remainingHours = leaderProgressState.getRemainingHours(); + long remainingMinutes = leaderProgressState.getRemainingMinutes(); Instant initialTime = leaderProgressState.getLastPollTime(); + Instant latestModifiedTime = initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); + + // For sub-hour time ranges (less than 60 minutes), create a single partition + if (remainingMinutes < MINUTES_PER_HOUR) { + log.info("Creating partition for sub-hour historical pull: {} minutes", remainingMinutes); + Instant startTime = initialTime.minus(Duration.ofMinutes(remainingMinutes)); + Instant endTime; + if (remainingMinutes <= WAIT_MINUTES_BEFORE_PARTITION_CREATION) { + // For very small ranges, skip the 5-minute delay to create a valid partition + endTime = initialTime; + } else { + endTime = latestModifiedTime; + } + + createWorkerPartitionsForDimensionTypes(startTime, endTime, coordinator); + updateLeaderProgressState(leaderPartition, 0, endTime, coordinator); + return endTime; + } + + // For hour or longer time ranges, use hourly partitioning + long remainingHours = remainingMinutes / MINUTES_PER_HOUR; + long extraMinutes = remainingMinutes % MINUTES_PER_HOUR; Instant latestHour = initialTime.truncatedTo(ChronoUnit.HOURS); - for (int i = remainingHours; i > 1; i--) { + + // Create hourly partitions for complete hours + for (long i = remainingHours; i > 1; i--) { Instant startTime = latestHour.minus(Duration.ofHours(i)); - Instant endTime = startTime.plus(HOUR_DURATION); + // For the first partition, include any extra minutes + if (i == remainingHours && extraMinutes > 0) { + startTime = startTime.minus(Duration.ofMinutes(extraMinutes)); + } + Instant endTime = 
latestHour.minus(Duration.ofHours(i - 1)); createWorkerPartitionsForDimensionTypes(startTime, endTime, coordinator); } - Instant latestModifiedTime = initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); if (latestModifiedTime.isAfter(latestHour)) { - // if checkpointing time is after the latest hour, creat one partition for last hour + // if checkpointing time is after the latest hour, create one partition for last hour // and one from latest hour to checkpointing time createWorkerPartitionsForDimensionTypes(latestHour.minus(Duration.ofHours(1)), latestHour, coordinator); createWorkerPartitionsForDimensionTypes(latestHour, latestModifiedTime, coordinator); @@ -180,16 +210,16 @@ void createWorkerPartitionsForDimensionTypes(Instant startTime, Instant endTime, } /** - * Updates the leader progress state with the latest poll timestamp and remaining hours. + * Updates the leader progress state with the latest poll timestamp and remaining minutes. * This method also persists the updated state in the source coordinator. 
*/ private void updateLeaderProgressState(LeaderPartition leaderPartition, - int remainingHours, + long remainingMinutes, Instant updatedPollTime, EnhancedSourceCoordinator coordinator) { DimensionalTimeSliceLeaderProgressState state = (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); - state.setRemainingHours(remainingHours); + state.setRemainingMinutes(remainingMinutes); state.setLastPollTime(updatedPollTime); leaderPartition.setLeaderProgressState(state); coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java index fb8ecb3bb9..57480bac33 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java @@ -1,23 +1,112 @@ package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSetter; import lombok.Data; import org.opensearch.dataprepper.plugins.source.source_crawler.base.LeaderProgressState; import java.time.Instant; +/** + * Leader progress state for dimensional time slice crawler. + * Supports minute-level granularity for historical pulls. 
+ * + *

<p>Backward Compatibility: This class supports deserialization of both: + * <ul> + * <li>New format: {@code remaining_minutes} (long) - preferred</li> + * <li>Old format: {@code remaining_hours} (int) - automatically converted to minutes</li> + * </ul>
+ */ @Data public class DimensionalTimeSliceLeaderProgressState implements LeaderProgressState { + private static final long MINUTES_PER_HOUR = 60; + @JsonProperty("last_poll_time") private Instant lastPollTime; - @JsonProperty("remaining_hours") - private int remainingHours; + @JsonProperty("remaining_minutes") + private long remainingMinutes; + - public DimensionalTimeSliceLeaderProgressState(@JsonProperty("last_poll_time") final Instant lastPollTime, - @JsonProperty("remaining_hours") int remainingHours) { + /** + * Primary constructor supporting both new and legacy formats. + * + * @param lastPollTime the last poll timestamp + * @param remainingMinutes the remaining minutes for historical pull (new format) + * @param remainingHours the remaining hours for historical pull (legacy format, converted to minutes) + */ + @JsonCreator + public DimensionalTimeSliceLeaderProgressState( + @JsonProperty("last_poll_time") final Instant lastPollTime, + @JsonProperty("remaining_minutes") Long remainingMinutes, + @JsonProperty("remaining_hours") Integer remainingHours) { this.lastPollTime = lastPollTime; - this.remainingHours = remainingHours; + + // Prefer remaining_minutes if provided, otherwise convert from remaining_hours + if (remainingMinutes != null) { + this.remainingMinutes = remainingMinutes; + } else if (remainingHours != null) { + // Backward compatibility: convert hours to minutes + this.remainingMinutes = remainingHours * MINUTES_PER_HOUR; + } else { + this.remainingMinutes = 0; + } + } + + /** + * Convenience constructor for creating new state with minutes. + * + * @param lastPollTime the last poll timestamp + * @param remainingMinutes the remaining minutes for historical pull + */ + public DimensionalTimeSliceLeaderProgressState(final Instant lastPollTime, long remainingMinutes) { + this(lastPollTime, remainingMinutes, null); + } + + /** + * Backward-compatible constructor for existing connectors that pass hours as int. 
+ * This constructor is specifically for external connectors that use getLookBackHours() + * which returns an int representing hours. + * + *

Note: Java would auto-promote int to long for the minutes constructor, + * so we need this explicit int constructor to maintain backward compatibility + * with connectors passing hours. + * + * @param lastPollTime the last poll timestamp + * @param remainingHours the remaining hours for historical pull (will be converted to minutes) + * @deprecated Use the long constructor with minutes directly for new implementations + */ + @Deprecated + public DimensionalTimeSliceLeaderProgressState(final Instant lastPollTime, int remainingHours) { + this(lastPollTime, null, remainingHours); + } + + /** + * Backward compatibility setter for legacy remaining_hours field. + * Converts hours to minutes when deserializing old checkpoint data. + * + * @param remainingHours the remaining hours (will be converted to minutes) + */ + @JsonSetter("remaining_hours") + public void setRemainingHours(int remainingHours) { + // Only set if remainingMinutes hasn't been set yet (prefer minutes to hours) + if (this.remainingMinutes == 0) { + this.remainingMinutes = remainingHours * MINUTES_PER_HOUR; + } + } + + /** + * Provides backward compatible getter for remaining hours. 
+ * + * @return the remaining time in hours (rounded down from minutes) + * @deprecated Use {@link #getRemainingMinutes()} for minute-level granularity + */ + @Deprecated + @JsonIgnore + public int getRemainingHours() { + return (int) (remainingMinutes / MINUTES_PER_HOUR); } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java index 288f4a6270..4146201250 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java @@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -124,15 +125,15 @@ void testCrawl_withIncrementalSync_lastModificationTimeAfter5MinutesAgo() { void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION -1); - int lookbackHours = 2; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackHours); + long lookbackMinutes = 120; + DimensionalTimeSliceLeaderProgressState state = new 
DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); assertNotNull(latest); - // Expecting (lookbackHours + 1) * LOG_TYPES.size() partitions - int expectedPartitions = (lookbackHours) * LOG_TYPES.size(); + // Expecting (lookbackMinutes/60 + 1) * LOG_TYPES.size() partitions + int expectedPartitions = 2 * LOG_TYPES.size(); verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); @@ -144,8 +145,8 @@ void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() { for (int i = 0; i < LOG_TYPES.size(); i++) { DimensionalTimeSliceWorkerProgressState workerState = (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); - assertEquals(latestHour.minus(Duration.ofHours(lookbackHours)), workerState.getStartTime()); - assertEquals(latestHour.minus(Duration.ofHours(lookbackHours - 1)), workerState.getEndTime()); + assertEquals(latestHour.minus(Duration.ofHours(2)), workerState.getStartTime()); + assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); } @@ -163,15 +164,15 @@ void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() { void testCrawl_withHistoricalSync_initialTimeNotInTheFirst5MinutesOfTheHOur() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); - int lookbackHours = 2; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackHours); + long lookbackMinutes = 120; + DimensionalTimeSliceLeaderProgressState state = 
new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); assertNotNull(latest); - // Expecting (lookbackHours + 1) * LOG_TYPES.size() partitions - int expectedPartitions = (lookbackHours + 1) * LOG_TYPES.size(); + // Expecting (lookbackMinutes/60 + 1) * LOG_TYPES.size() partitions + int expectedPartitions = 3 * LOG_TYPES.size(); verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); @@ -183,8 +184,8 @@ void testCrawl_withHistoricalSync_initialTimeNotInTheFirst5MinutesOfTheHOur() { for (int i = 0; i < LOG_TYPES.size(); i++) { DimensionalTimeSliceWorkerProgressState workerState = (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); - assertEquals(latestHour.minus(Duration.ofHours(lookbackHours)), workerState.getStartTime()); - assertEquals(latestHour.minus(Duration.ofHours(lookbackHours - 1)), workerState.getEndTime()); + assertEquals(latestHour.minus(Duration.ofHours(2)), workerState.getStartTime()); + assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); } @@ -267,4 +268,301 @@ void testInitialize_ThrowsExceptionWithNullLogTypes() { newCrawler.initialize(null); }); } + @Test + void testCrawl_withSubHourHistoricalSync_15Minutes() { + Instant initialTime = Instant.now(); + long lookbackMinutes = 15; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + + int expectedPartitions 
= LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(initialTime.minus(Duration.ofMinutes(15)), workerState.getStartTime()); + assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void testCrawl_withSubHourHistoricalSync_30Minutes() { + Instant initialTime = Instant.now(); + long lookbackMinutes = 30; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + + int expectedPartitions = LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(initialTime.minus(Duration.ofMinutes(30)), 
workerState.getStartTime()); + assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void testCrawl_withSubHourHistoricalSync_45Minutes() { + Instant initialTime = Instant.now(); + long lookbackMinutes = 45; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + + int expectedPartitions = LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(initialTime.minus(Duration.ofMinutes(45)), workerState.getStartTime()); + assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void testCrawl_withIncrementalSync_exactly5MinutesAgo() { + Instant lastPollTime = Instant.now().minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, 0); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + + verify(coordinator, 
never()).createPartition(partitionCaptor.capture()); + verify(coordinator, never()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, never()).increment(); + assertEquals(lastPollTime, latest); + } + + @Test + void testCrawl_withHistoricalSync_exactly1Hour() { + Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); + Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); + long lookbackMinutes = 60; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + + int expectedPartitions = 2 * LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getStartTime()); + assertEquals(latestHour, workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + + for (int i = LOG_TYPES.size(); i < LOG_TYPES.size() * 2; i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(latestHour, workerState.getStartTime()); + assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), 
workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i - LOG_TYPES.size()), workerState.getDimensionType()); + } + } + + @Test + void testCrawl_withHistoricalSync_3Hours() { + Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); + Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); + long lookbackMinutes = 180; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + int expectedPartitions = 4 * LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + } + + @Test + void testCrawl_withHistoricalSync_withExtraMinutes() { + Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); + Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); + long lookbackMinutes = 125; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + int expectedPartitions = 3 * LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = 
partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + DimensionalTimeSliceWorkerProgressState firstWorkerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(0).getProgressState().get(); + assertEquals(latestHour.minus(Duration.ofHours(2)).minus(Duration.ofMinutes(5)), firstWorkerState.getStartTime()); + assertEquals(latestHour.minus(Duration.ofHours(1)), firstWorkerState.getEndTime()); + } + + @Test + void testCrawl_withSubHourHistoricalSync_verySmallRange() { + Instant initialTime = Instant.now(); + long lookbackMinutes = 3; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + int expectedPartitions = LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(initialTime.minus(Duration.ofMinutes(3)), workerState.getStartTime()); + assertEquals(initialTime, workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void testCrawl_withSubHourHistoricalSync_exactly5Minutes() { + Instant initialTime = Instant.now(); + long lookbackMinutes = 5; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + 
LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + int expectedPartitions = LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(initialTime.minus(Duration.ofMinutes(5)), workerState.getStartTime()); + assertEquals(initialTime, workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void testCrawl_withHistoricalSync_latestModifiedTimeEqualsLatestHour() { + Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); + Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); + long lookbackMinutes = 60; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + LeaderPartition leaderPartition = new LeaderPartition(state); + + Instant latest = crawler.crawl(leaderPartition, coordinator); + + assertNotNull(latest); + int expectedPartitions = LOG_TYPES.size(); + verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); + verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); + verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); + + List createdPartitions = partitionCaptor.getAllValues(); + assertEquals(expectedPartitions, createdPartitions.size()); + + for (int 
i = 0; i < LOG_TYPES.size(); i++) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); + assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getStartTime()); + assertEquals(latestHour, workerState.getEndTime()); + assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); + } + } + + @Test + void testCreateWorkerPartitionsForDimensionTypes_setsPartitionCreationTime() { + Instant start = Instant.parse("2024-10-30T00:00:00Z"); + Instant end = start.plus(Duration.ofHours(1)); + Instant beforeCreation = Instant.now(); + + crawler.createWorkerPartitionsForDimensionTypes(start, end, coordinator); + + verify(coordinator, times(LOG_TYPES.size())).createPartition(partitionCaptor.capture()); + List createdPartitions = partitionCaptor.getAllValues(); + + Instant afterCreation = Instant.now(); + + for (SaasSourcePartition partition : createdPartitions) { + DimensionalTimeSliceWorkerProgressState workerState = + (DimensionalTimeSliceWorkerProgressState) partition.getProgressState().get(); + assertNotNull(workerState.getPartitionCreationTime()); + + assertTrue(workerState.getPartitionCreationTime().isAfter(beforeCreation.minusSeconds(1))); + assertTrue(workerState.getPartitionCreationTime().isBefore(afterCreation.plusSeconds(1))); + } + } + + + @Test + void testInitialize_withEmptyList() { + DimensionalTimeSliceCrawler newCrawler = new DimensionalTimeSliceCrawler(client, pluginMetrics); + List emptyList = Arrays.asList(); + newCrawler.initialize(emptyList); + assertThrows(IllegalStateException.class, () -> { + newCrawler.initialize(emptyList); + }); + } } \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java 
b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java index 9c784aeb38..a2d30ce896 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java @@ -28,21 +28,109 @@ void testDeserializeDimensionalTimeSliceLeaderProgressState_withTypeInfo() throw String json = "{\n" + " \"@class\": \"org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceLeaderProgressState\",\n" + " \"last_poll_time\": \"2024-10-20T02:27:15.717Z\",\n" + - " \"remaining_hours\": 24\n" + + " \"remaining_minutes\": 1440\n" + "}"; DimensionalTimeSliceLeaderProgressState state = objectMapper.readValue(json, DimensionalTimeSliceLeaderProgressState.class); assertEquals(Instant.parse("2024-10-20T02:27:15.717Z"), state.getLastPollTime()); - assertEquals(24, state.getRemainingHours()); + assertEquals(1440, state.getRemainingMinutes()); } @Test void testConstructor_setsValuesCorrectly() { Instant now = Instant.now(); - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, 48); + long remainingMinutes = 2880; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingMinutes); + + assertNotNull(state); + assertEquals(now, state.getLastPollTime()); + assertEquals(2880, state.getRemainingMinutes()); + } + + @Test + void testConstructor_withSubHourMinutes() { + Instant now = Instant.now(); + long remainingMinutes = 15; + DimensionalTimeSliceLeaderProgressState state = new 
DimensionalTimeSliceLeaderProgressState(now, remainingMinutes); + + assertNotNull(state); + assertEquals(now, state.getLastPollTime()); + assertEquals(15, state.getRemainingMinutes()); + } + + @Test + void testConstructor_withZeroMinutes() { + Instant now = Instant.now(); + long remainingMinutes = 0; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingMinutes); + + assertNotNull(state); + assertEquals(now, state.getLastPollTime()); + assertEquals(0, state.getRemainingMinutes()); + } + + @Test + void testBackwardCompatibility_getRemainingHoursFromMinutes() { + Instant now = Instant.now(); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, 150L); + + assertEquals(150, state.getRemainingMinutes()); + assertEquals(2, state.getRemainingHours()); + } + + @Test + void testBackwardCompatibility_getRemainingHoursFromSubHourMinutes() { + Instant now = Instant.now(); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, 30L); + + assertEquals(30, state.getRemainingMinutes()); + assertEquals(0, state.getRemainingHours()); + } + + @Test + void testBackwardCompatibility_intHoursConstructor() { + Instant now = Instant.now(); + int hoursFromLegacyConnector = 2; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, hoursFromLegacyConnector); assertNotNull(state); assertEquals(now, state.getLastPollTime()); - assertEquals(48, state.getRemainingHours()); + assertEquals(120, state.getRemainingMinutes()); + assertEquals(2, state.getRemainingHours()); + } + + @Test + void testBackwardCompatibility_intHoursConstructor_24Hours() { + Instant now = Instant.now(); + int hoursFromLegacyConnector = 24; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, hoursFromLegacyConnector); + + assertEquals(1440, state.getRemainingMinutes()); + assertEquals(24, 
state.getRemainingHours()); + } + + @Test + void testBackwardCompatibility_intHoursConstructor_zeroHours() { + Instant now = Instant.now(); + int hoursFromLegacyConnector = 0; + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, hoursFromLegacyConnector); + + assertEquals(0, state.getRemainingMinutes()); + assertEquals(0, state.getRemainingHours()); + } + + @Test + void testConstructorOverloading_longMinutesVsIntHours() { + Instant now = Instant.now(); + + long minutesValue = 120L; + DimensionalTimeSliceLeaderProgressState stateFromMinutes = new DimensionalTimeSliceLeaderProgressState(now, minutesValue); + assertEquals(120, stateFromMinutes.getRemainingMinutes()); + + int hoursValue = 2; + DimensionalTimeSliceLeaderProgressState stateFromHours = new DimensionalTimeSliceLeaderProgressState(now, hoursValue); + assertEquals(120, stateFromHours.getRemainingMinutes()); + + assertEquals(stateFromMinutes.getRemainingMinutes(), stateFromHours.getRemainingMinutes()); } } \ No newline at end of file From 838067c082b5b47b43c73387db11f3bf9037adac Mon Sep 17 00:00:00 2001 From: enugraju Date: Wed, 21 Jan 2026 11:50:18 +0530 Subject: [PATCH 2/4] Changed return type of getLookBackMinutes to Instant Signed-off-by: enugraju --- .../microsoft_office365/Office365Source.java | 3 +- .../Office365SourceConfig.java | 25 ++---- .../service/Office365Service.java | 8 +- .../Office365SourceConfigTest.java | 64 +++++++++------ .../service/Office365ServiceTest.java | 11 ++- .../base/DimensionalTimeSliceCrawler.java | 18 +++-- ...mensionalTimeSliceLeaderProgressState.java | 79 +++++++++---------- .../base/DimensionalTimeSliceCrawlerTest.java | 53 ++++++------- ...ionalTimeSliceLeaderProgressStateTest.java | 62 +++++++++------ 9 files changed, 169 insertions(+), 154 deletions(-) diff --git 
a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java index 09f3a14c8f..d72c947edb 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365Source.java @@ -93,7 +93,8 @@ public void start(Buffer> buffer) { @Override protected LeaderProgressState createLeaderProgressState() { - return new DimensionalTimeSliceLeaderProgressState(Instant.now(), office365SourceConfig.getLookBackMinutes()); + Instant lastPollTime = Instant.now(); + return new DimensionalTimeSliceLeaderProgressState(lastPollTime, office365SourceConfig.getLookBackDuration(lastPollTime)); } @Override diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java index 96fc692121..ce2f299fee 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfig.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig; import 
java.time.Duration; +import java.time.Instant; /** * Configuration class for Office 365 source plugin. @@ -60,30 +61,16 @@ public class Office365SourceConfig implements CrawlerSourceConfig { private Duration range; /** - * Gets the look back range as minutes for the crawler framework. + * Gets the look back duration as an Instant representing the start time for historical data collection. * This method supports minute-level granularity for historical pulls. * - * @return the number of minutes to look back, or 0 if no range is specified + * @return the Instant representing how far back to look, or current time if no range is specified */ - public long getLookBackMinutes() { + public Instant getLookBackDuration(Instant lastPollTime) { if (range == null || range.isZero() || range.isNegative()) { - return 0; + return lastPollTime; } - return range.toMinutes(); - } - - /** - * Gets the look back range as hours for compatibility with existing crawler framework. - * - * @return the number of hours to look back, or 0 if no range is specified - * @deprecated Use {@link #getLookBackMinutes()} for minute-level granularity support - */ - @Deprecated - public int getLookBackHours() { - if (range == null || range.toHours() <= 0) { - return 0; - } - return (int) range.toHours(); + return lastPollTime.minus(range); } @Override diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java index 67d6850d77..961ea0e0d0 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java +++ 
b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java @@ -63,11 +63,11 @@ public AuditLogsResponse searchAuditLogs(final String logType, return office365RestClient.searchAuditLogs(logType, startTime, endTime, nextPageUri); } - // Adjust start time based on configured lookback period (supports minute-level granularity) + // Adjust start time based on configured lookback period (supports Instant-based granularity) Instant adjustedStartTime = startTime; - Instant lookBackTimeAgo = Instant.now().minus(Duration.ofMinutes(office365SourceConfig.getLookBackMinutes())); - if (startTime.isBefore(lookBackTimeAgo) && lookBackTimeAgo.isBefore(endTime)) { - adjustedStartTime = lookBackTimeAgo; + Instant lookBackDuration = office365SourceConfig.getLookBackDuration(Instant.now()); + if (startTime.isBefore(lookBackDuration) && lookBackDuration.isBefore(endTime)) { + adjustedStartTime = lookBackDuration; } AuditLogsResponse response = diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java index aaef7cec98..4766dad366 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java @@ -20,6 +20,7 @@ import java.lang.reflect.Field; import java.time.Duration; +import java.time.Instant; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertFalse; @@ -79,7 +80,8 @@ void testGetters() { void testDefaultValues() { assertFalse(config.isAcknowledgments()); assertEquals(4, config.getNumberOfWorkers()); - assertEquals(0, config.getLookBackMinutes()); + Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + assertNotNull(lookBackDuration); } @Test @@ -99,67 +101,77 @@ void testNegativeDurationRange() throws Exception { Duration negativeDuration = Duration.ofDays(-1); setField(config, "range", negativeDuration); - assertEquals(0, config.getLookBackMinutes()); + Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + assertNotNull(lookBackDuration); } @Test - void testGetLookBackMinutes_withMinuteRange() throws Exception { + void testGetLookBackDuration_withMinuteRange() throws Exception { Duration fifteenMinutes = Duration.ofMinutes(15); setField(config, "range", fifteenMinutes); - // getLookBackHours should return 0 for sub-hour range - assertEquals(0, config.getLookBackHours()); - // getLookBackMinutes should return 15 - assertEquals(15, config.getLookBackMinutes()); + Instant now = Instant.now(); + Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + // Verify the duration is approximately 15 minutes before now (within 1 second tolerance) + Duration actualDuration = Duration.between(lookBackDuration, now); + assertEquals(15, actualDuration.toMinutes()); } @Test - void testGetLookBackMinutes_with30MinuteRange() throws Exception { + void testGetLookBackDuration_with30MinuteRange() throws Exception { Duration thirtyMinutes = Duration.ofMinutes(30); setField(config, "range", thirtyMinutes); - // getLookBackHours should return 0 for sub-hour range - assertEquals(0, config.getLookBackHours()); - // getLookBackMinutes should return 30 - assertEquals(30, config.getLookBackMinutes()); + Instant now = Instant.now(); + Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + // Verify the duration is 
approximately 30 minutes before now + Duration actualDuration = Duration.between(lookBackDuration, now); + assertEquals(30, actualDuration.toMinutes()); } @Test - void testGetLookBackMinutes_with45MinuteRange() throws Exception { + void testGetLookBackDuration_with45MinuteRange() throws Exception { Duration fortyFiveMinutes = Duration.ofMinutes(45); setField(config, "range", fortyFiveMinutes); - // getLookBackHours should return 0 for sub-hour range - assertEquals(0, config.getLookBackHours()); - // getLookBackMinutes should return 45 - assertEquals(45, config.getLookBackMinutes()); + Instant now = Instant.now(); + Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + // Verify the duration is approximately 45 minutes before now + Duration actualDuration = Duration.between(lookBackDuration, now); + assertEquals(45, actualDuration.toMinutes()); } @Test - void testGetLookBackMinutes_withHourRange() throws Exception { + void testGetLookBackDuration_withHourRange() throws Exception { Duration twoHours = Duration.ofHours(2); setField(config, "range", twoHours); - assertEquals(2, config.getLookBackHours()); - assertEquals(120, config.getLookBackMinutes()); + Instant now = Instant.now(); + Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + Duration actualDuration = Duration.between(lookBackDuration, now); + assertEquals(2, actualDuration.toHours()); + assertEquals(120, actualDuration.toMinutes()); } @Test - void testGetLookBackMinutes_withDayRange() throws Exception { + void testGetLookBackDuration_withDayRange() throws Exception { Duration oneDay = Duration.ofDays(1); setField(config, "range", oneDay); - assertEquals(24, config.getLookBackHours()); - assertEquals(1440, config.getLookBackMinutes()); + Instant now = Instant.now(); + Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + Duration actualDuration = Duration.between(lookBackDuration, now); + assertEquals(24, actualDuration.toHours()); + assertEquals(1440, 
actualDuration.toMinutes()); } @Test - void testGetLookBackMinutes_withZeroRange() throws Exception { + void testGetLookBackDuration_withZeroRange() throws Exception { Duration zeroDuration = Duration.ZERO; setField(config, "range", zeroDuration); - assertEquals(0, config.getLookBackHours()); - assertEquals(0, config.getLookBackMinutes()); + Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + assertNotNull(lookBackDuration); } @Test diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java index 440776f3ce..998fcd4dee 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java @@ -40,6 +40,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -58,6 +59,7 @@ class Office365ServiceTest { @BeforeEach void setUp() { office365Service = new Office365Service(sourceConfig, office365RestClient, pluginMetrics); + lenient().when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(Instant.now().minus(Duration.ofDays(365))); } @Test @@ -184,7 +186,7 @@ void testSearchAuditLogsError() { Instant endTime = Instant.now(); String logType = "Exchange"; - when(office365RestClient.searchAuditLogs( + 
lenient().when(office365RestClient.searchAuditLogs( any(), any(), any(), any() )).thenThrow(new RuntimeException("API Error")); @@ -288,8 +290,8 @@ void testSearchAuditLogs_WithRange_AdjustsStartTime() { String logType = "Exchange"; long lookBackMinutes = 96 * 60; // Configure 4 days range limit - - when(sourceConfig.getLookBackMinutes()).thenReturn(lookBackMinutes); + Instant lookBackDuration = now.minus(Duration.ofMinutes(lookBackMinutes)); + when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(lookBackDuration); when(office365RestClient.searchAuditLogs( any(String.class), any(Instant.class), @@ -325,8 +327,9 @@ void testSearchAuditLogs_WithSubHourRange_AdjustsStartTime() { String logType = "Exchange"; long lookBackMinutes = 30L; + Instant lookBackDuration = now.minus(Duration.ofMinutes(lookBackMinutes)); - when(sourceConfig.getLookBackMinutes()).thenReturn(lookBackMinutes); + when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(lookBackDuration); when(office365RestClient.searchAuditLogs( any(String.class), any(Instant.class), diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java index 3034d7e564..5e0ac8fbe9 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java @@ -102,7 +102,9 @@ private Instant createPartitions(LeaderPartition leaderPartition, DimensionalTimeSliceLeaderProgressState leaderProgressState = (DimensionalTimeSliceLeaderProgressState) 
leaderPartition.getProgressState().get(); - if (leaderProgressState.getRemainingMinutes() == 0) { + Instant remainingDuration = leaderProgressState.getRemainingDuration(); + Instant lastPollTime = leaderProgressState.getLastPollTime(); + if (remainingDuration.equals(lastPollTime)) { return createPartitionsForIncrementalSync(leaderPartition, coordinator); } else { return createPartitionsForHistoricalPull(leaderPartition, coordinator); @@ -118,9 +120,10 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio EnhancedSourceCoordinator coordinator) { DimensionalTimeSliceLeaderProgressState leaderProgressState = (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); - long remainingMinutes = leaderProgressState.getRemainingMinutes(); Instant initialTime = leaderProgressState.getLastPollTime(); Instant latestModifiedTime = initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); + Instant remainingDuration = leaderProgressState.getRemainingDuration(); + long remainingMinutes = Duration.between(remainingDuration, initialTime).toMinutes(); // For sub-hour time ranges (less than 60 minutes), create a single partition if (remainingMinutes < MINUTES_PER_HOUR) { @@ -135,7 +138,7 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio } createWorkerPartitionsForDimensionTypes(startTime, endTime, coordinator); - updateLeaderProgressState(leaderPartition, 0, endTime, coordinator); + updateLeaderProgressState(leaderPartition, endTime, coordinator); return endTime; } @@ -166,7 +169,7 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio createWorkerPartitionsForDimensionTypes(latestHour.minus(Duration.ofHours(1)), latestModifiedTime, coordinator); } - updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator); + updateLeaderProgressState(leaderPartition, latestModifiedTime, coordinator); return latestModifiedTime; } @@ -185,7 +188,7 @@ 
private Instant createPartitionsForIncrementalSync(LeaderPartition leaderPartiti // Create one partition from lastPollTime to latestModifiedTime for each type createWorkerPartitionsForDimensionTypes(lastPollTime, latestModifiedTime, coordinator); - updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator); + updateLeaderProgressState(leaderPartition, latestModifiedTime, coordinator); return latestModifiedTime; } @@ -210,16 +213,15 @@ void createWorkerPartitionsForDimensionTypes(Instant startTime, Instant endTime, } /** - * Updates the leader progress state with the latest poll timestamp and remaining minutes. + * Updates the leader progress state with the latest poll timestamp and remaining duration. * This method also persists the updated state in the source coordinator. */ private void updateLeaderProgressState(LeaderPartition leaderPartition, - long remainingMinutes, Instant updatedPollTime, EnhancedSourceCoordinator coordinator) { DimensionalTimeSliceLeaderProgressState state = (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); - state.setRemainingMinutes(remainingMinutes); + state.setRemainingDuration(updatedPollTime); state.setLastPollTime(updatedPollTime); leaderPartition.setLeaderProgressState(state); coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java index 57480bac33..7c2075cc4e 100644 --- 
a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java @@ -7,16 +7,17 @@ import lombok.Data; import org.opensearch.dataprepper.plugins.source.source_crawler.base.LeaderProgressState; +import java.time.Duration; import java.time.Instant; /** * Leader progress state for dimensional time slice crawler. - * Supports minute-level granularity for historical pulls. + * Uses Instant-based remainingDuration for precise time-based historical pulls. * - *

Backward Compatibility: This class supports deserialization of both: + *

Backward Compatibility: This class supports deserialization of legacy formats: *

    - *
  • New format: {@code remaining_minutes} (long) - preferred
  • - *
  • Old format: {@code remaining_hours} (int) - automatically converted to minutes
  • + *
  • Preferred format: {@code remaining_duration} (Instant)
  • + *
  • Legacy format: {@code remaining_hours} (int) - automatically converted to remainingDuration
  • *
*/ @Data @@ -27,74 +28,64 @@ public class DimensionalTimeSliceLeaderProgressState implements LeaderProgressSt @JsonProperty("last_poll_time") private Instant lastPollTime; - @JsonProperty("remaining_minutes") - private long remainingMinutes; + @JsonProperty("remaining_duration") + private Instant remainingDuration; /** * Primary constructor supporting both new and legacy formats. * * @param lastPollTime the last poll timestamp - * @param remainingMinutes the remaining minutes for historical pull (new format) - * @param remainingHours the remaining hours for historical pull (legacy format, converted to minutes) + * @param remainingDuration the remaining duration as Instant (preferred format) */ @JsonCreator public DimensionalTimeSliceLeaderProgressState( @JsonProperty("last_poll_time") final Instant lastPollTime, - @JsonProperty("remaining_minutes") Long remainingMinutes, - @JsonProperty("remaining_hours") Integer remainingHours) { + @JsonProperty("remaining_duration") Instant remainingDuration) { this.lastPollTime = lastPollTime; - // Prefer remaining_minutes if provided, otherwise convert from remaining_hours - if (remainingMinutes != null) { - this.remainingMinutes = remainingMinutes; - } else if (remainingHours != null) { - // Backward compatibility: convert hours to minutes - this.remainingMinutes = remainingHours * MINUTES_PER_HOUR; + // Prefer remaining_duration (Instant) if provided + if (remainingDuration != null && remainingDuration.isBefore(lastPollTime)) { + this.remainingDuration = remainingDuration; } else { - this.remainingMinutes = 0; + this.remainingDuration = lastPollTime; } } - /** - * Convenience constructor for creating new state with minutes. 
- * - * @param lastPollTime the last poll timestamp - * @param remainingMinutes the remaining minutes for historical pull - */ - public DimensionalTimeSliceLeaderProgressState(final Instant lastPollTime, long remainingMinutes) { - this(lastPollTime, remainingMinutes, null); - } - /** * Backward-compatible constructor for existing connectors that pass hours as int. * This constructor is specifically for external connectors that use getLookBackHours() * which returns an int representing hours. * - *

Note: Java would auto-promote int to long for the minutes constructor, - * so we need this explicit int constructor to maintain backward compatibility - * with connectors passing hours. - * * @param lastPollTime the last poll timestamp - * @param remainingHours the remaining hours for historical pull (will be converted to minutes) - * @deprecated Use the long constructor with minutes directly for new implementations + * @param remainingHours the remaining hours for historical pull (will be converted to remainingDuration) */ - @Deprecated public DimensionalTimeSliceLeaderProgressState(final Instant lastPollTime, int remainingHours) { - this(lastPollTime, null, remainingHours); + this.lastPollTime = lastPollTime; + long minutes = remainingHours * MINUTES_PER_HOUR; + if (minutes > 0) { + this.remainingDuration = lastPollTime.minus(Duration.ofMinutes(minutes)); + } else { + this.remainingDuration = lastPollTime; + } } /** * Backward compatibility setter for legacy remaining_hours field. - * Converts hours to minutes when deserializing old checkpoint data. + * Converts hours to remainingDuration when deserializing old checkpoint data. 
* - * @param remainingHours the remaining hours (will be converted to minutes) + * @param remainingHours the remaining hours (will be converted to remainingDuration) */ @JsonSetter("remaining_hours") public void setRemainingHours(int remainingHours) { - // Only set if remainingMinutes hasn't been set yet (prefer minutes to hours) - if (this.remainingMinutes == 0) { - this.remainingMinutes = remainingHours * MINUTES_PER_HOUR; + if (this.remainingDuration == null || this.remainingDuration.equals(this.lastPollTime)) { + // Only set if remainingDuration hasn't been set yet (prefer remainingDuration to hours) + long minutes = remainingHours * MINUTES_PER_HOUR; + if (minutes > 0 && this.lastPollTime != null) { + this.remainingDuration = this.lastPollTime.minus(Duration.ofMinutes(minutes)); + } else if (this.lastPollTime != null) { + this.remainingDuration = this.lastPollTime; + } } } @@ -102,11 +93,15 @@ public void setRemainingHours(int remainingHours) { * Provides backward compatible getter for remaining hours. 
* * @return the remaining time in hours (rounded down from minutes) - * @deprecated Use {@link #getRemainingMinutes()} for minute-level granularity + * @deprecated Use {@link #getRemainingDuration()} for Instant-based granularity */ @Deprecated @JsonIgnore public int getRemainingHours() { - return (int) (remainingMinutes / MINUTES_PER_HOUR); + if (remainingDuration != null && lastPollTime != null && remainingDuration.isBefore(lastPollTime)) { + long minutes = Duration.between(remainingDuration, lastPollTime).toMinutes(); + return (int) (minutes / MINUTES_PER_HOUR); + } + return 0; } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java index 4146201250..38944a02ff 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java @@ -85,7 +85,7 @@ void setUp() { @Test void testCrawl_withIncrementalSync_lastModificationTimeBefore5MinutesAgo() { Instant lastPollTime = Instant.now().minusSeconds(400); - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, 0); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, Instant.now()); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -110,7 +110,7 @@ void testCrawl_withIncrementalSync_lastModificationTimeBefore5MinutesAgo() { @Test void 
testCrawl_withIncrementalSync_lastModificationTimeAfter5MinutesAgo() { Instant lastPollTime = Instant.now().minusSeconds(10); - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, 0); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, Instant.now()); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -125,8 +125,8 @@ void testCrawl_withIncrementalSync_lastModificationTimeAfter5MinutesAgo() { void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION -1); - long lookbackMinutes = 120; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(120)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -164,8 +164,8 @@ void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() { void testCrawl_withHistoricalSync_initialTimeNotInTheFirst5MinutesOfTheHOur() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); - long lookbackMinutes = 120; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(120)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition 
leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -212,7 +212,6 @@ void testCrawl_withHistoricalSync_initialTimeNotInTheFirst5MinutesOfTheHOur() { void createWorkerPartitionsForDimensionTypes() { Instant start = Instant.parse("2024-10-30T00:00:00Z"); Instant end = start.plus(Duration.ofHours(1)); - String logType = "Exchange"; crawler.createWorkerPartitionsForDimensionTypes(start, end, coordinator); @@ -271,8 +270,8 @@ void testInitialize_ThrowsExceptionWithNullLogTypes() { @Test void testCrawl_withSubHourHistoricalSync_15Minutes() { Instant initialTime = Instant.now(); - long lookbackMinutes = 15; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(15)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -299,8 +298,8 @@ void testCrawl_withSubHourHistoricalSync_15Minutes() { @Test void testCrawl_withSubHourHistoricalSync_30Minutes() { Instant initialTime = Instant.now(); - long lookbackMinutes = 30; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(30)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -327,8 +326,8 @@ void testCrawl_withSubHourHistoricalSync_30Minutes() { @Test void testCrawl_withSubHourHistoricalSync_45Minutes() { Instant initialTime = Instant.now(); - long lookbackMinutes = 45; - 
DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(45)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -354,8 +353,8 @@ void testCrawl_withSubHourHistoricalSync_45Minutes() { @Test void testCrawl_withIncrementalSync_exactly5MinutesAgo() { - Instant lastPollTime = Instant.now().minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, 0); + Instant lastPollTime = Instant.now().minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION - 1); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, lastPollTime.plus(Duration.ofMinutes(4))); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -372,8 +371,8 @@ void testCrawl_withIncrementalSync_exactly5MinutesAgo() { void testCrawl_withHistoricalSync_exactly1Hour() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); - long lookbackMinutes = 60; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(60)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -409,8 +408,8 @@ void testCrawl_withHistoricalSync_exactly1Hour() { void 
testCrawl_withHistoricalSync_3Hours() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); - long lookbackMinutes = 180; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(180)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -429,8 +428,8 @@ void testCrawl_withHistoricalSync_3Hours() { void testCrawl_withHistoricalSync_withExtraMinutes() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); - long lookbackMinutes = 125; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(125)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -453,8 +452,8 @@ void testCrawl_withHistoricalSync_withExtraMinutes() { @Test void testCrawl_withSubHourHistoricalSync_verySmallRange() { Instant initialTime = Instant.now(); - long lookbackMinutes = 3; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(3)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new 
LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -480,8 +479,8 @@ void testCrawl_withSubHourHistoricalSync_verySmallRange() { @Test void testCrawl_withSubHourHistoricalSync_exactly5Minutes() { Instant initialTime = Instant.now(); - long lookbackMinutes = 5; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(5)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); @@ -508,8 +507,8 @@ void testCrawl_withSubHourHistoricalSync_exactly5Minutes() { void testCrawl_withHistoricalSync_latestModifiedTimeEqualsLatestHour() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); - long lookbackMinutes = 60; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackMinutes); + Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(60)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); Instant latest = crawler.crawl(leaderPartition, coordinator); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java index a2d30ce896..e45d41080f 100644 --- 
a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressStateTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.time.Instant; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -28,62 +29,72 @@ void testDeserializeDimensionalTimeSliceLeaderProgressState_withTypeInfo() throw String json = "{\n" + " \"@class\": \"org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceLeaderProgressState\",\n" + " \"last_poll_time\": \"2024-10-20T02:27:15.717Z\",\n" + - " \"remaining_minutes\": 1440\n" + + " \"remaining_duration\": \"2024-10-19T02:27:15.717Z\"\n" + "}"; DimensionalTimeSliceLeaderProgressState state = objectMapper.readValue(json, DimensionalTimeSliceLeaderProgressState.class); assertEquals(Instant.parse("2024-10-20T02:27:15.717Z"), state.getLastPollTime()); - assertEquals(1440, state.getRemainingMinutes()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(1440, remainingMinutes); } @Test void testConstructor_setsValuesCorrectly() { Instant now = Instant.now(); - long remainingMinutes = 2880; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingMinutes); + Instant remainingDuration = now.minus(Duration.ofMinutes(2880)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingDuration); assertNotNull(state); assertEquals(now, state.getLastPollTime()); - assertEquals(2880, state.getRemainingMinutes()); + 
assertEquals(remainingDuration, state.getRemainingDuration()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(2880, remainingMinutes); } @Test void testConstructor_withSubHourMinutes() { Instant now = Instant.now(); - long remainingMinutes = 15; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingMinutes); + Instant remainingDuration = now.minus(Duration.ofMinutes(15)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingDuration); assertNotNull(state); assertEquals(now, state.getLastPollTime()); - assertEquals(15, state.getRemainingMinutes()); + assertEquals(remainingDuration, state.getRemainingDuration()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(15, remainingMinutes); } @Test void testConstructor_withZeroMinutes() { Instant now = Instant.now(); - long remainingMinutes = 0; - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingMinutes); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, now); assertNotNull(state); assertEquals(now, state.getLastPollTime()); - assertEquals(0, state.getRemainingMinutes()); + assertEquals(now, state.getRemainingDuration()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(0, remainingMinutes); } @Test void testBackwardCompatibility_getRemainingHoursFromMinutes() { Instant now = Instant.now(); - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, 150L); + Instant remainingDuration = now.minus(Duration.ofMinutes(150)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingDuration); - 
assertEquals(150, state.getRemainingMinutes()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(150, remainingMinutes); assertEquals(2, state.getRemainingHours()); } @Test void testBackwardCompatibility_getRemainingHoursFromSubHourMinutes() { Instant now = Instant.now(); - DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, 30L); + Instant remainingDuration = now.minus(Duration.ofMinutes(30)); + DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, remainingDuration); - assertEquals(30, state.getRemainingMinutes()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(30, remainingMinutes); assertEquals(0, state.getRemainingHours()); } @@ -95,7 +106,8 @@ void testBackwardCompatibility_intHoursConstructor() { assertNotNull(state); assertEquals(now, state.getLastPollTime()); - assertEquals(120, state.getRemainingMinutes()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(120, remainingMinutes); assertEquals(2, state.getRemainingHours()); } @@ -105,7 +117,8 @@ void testBackwardCompatibility_intHoursConstructor_24Hours() { int hoursFromLegacyConnector = 24; DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, hoursFromLegacyConnector); - assertEquals(1440, state.getRemainingMinutes()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(1440, remainingMinutes); assertEquals(24, state.getRemainingHours()); } @@ -115,7 +128,8 @@ void testBackwardCompatibility_intHoursConstructor_zeroHours() { int hoursFromLegacyConnector = 0; DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, 
hoursFromLegacyConnector); - assertEquals(0, state.getRemainingMinutes()); + long remainingMinutes = Duration.between(state.getRemainingDuration(), state.getLastPollTime()).toMinutes(); + assertEquals(0, remainingMinutes); assertEquals(0, state.getRemainingHours()); } @@ -123,14 +137,16 @@ void testBackwardCompatibility_intHoursConstructor_zeroHours() { void testConstructorOverloading_longMinutesVsIntHours() { Instant now = Instant.now(); - long minutesValue = 120L; - DimensionalTimeSliceLeaderProgressState stateFromMinutes = new DimensionalTimeSliceLeaderProgressState(now, minutesValue); - assertEquals(120, stateFromMinutes.getRemainingMinutes()); + Instant remainingDurationFromMinutes = now.minus(Duration.ofMinutes(120)); + DimensionalTimeSliceLeaderProgressState stateFromMinutes = new DimensionalTimeSliceLeaderProgressState(now, remainingDurationFromMinutes); + long remainingMinutesFromMinutes = Duration.between(stateFromMinutes.getRemainingDuration(), stateFromMinutes.getLastPollTime()).toMinutes(); + assertEquals(120, remainingMinutesFromMinutes); int hoursValue = 2; DimensionalTimeSliceLeaderProgressState stateFromHours = new DimensionalTimeSliceLeaderProgressState(now, hoursValue); - assertEquals(120, stateFromHours.getRemainingMinutes()); + long remainingMinutesFromHours = Duration.between(stateFromHours.getRemainingDuration(), stateFromHours.getLastPollTime()).toMinutes(); + assertEquals(120, remainingMinutesFromHours); - assertEquals(stateFromMinutes.getRemainingMinutes(), stateFromHours.getRemainingMinutes()); + assertEquals(remainingMinutesFromMinutes, remainingMinutesFromHours); } } \ No newline at end of file From 8cf5caabfce5bb222f18432838ac0d0865168b15 Mon Sep 17 00:00:00 2001 From: enugraju Date: Wed, 11 Feb 2026 16:04:43 +0530 Subject: [PATCH 3/4] Review comment fixes Signed-off-by: enugraju --- .../base/DimensionalTimeSliceCrawler.java | 18 ++--- ...mensionalTimeSliceLeaderProgressState.java | 26 +++---- 
.../base/DimensionalTimeSliceCrawlerTest.java | 76 ++++++++++--------- 3 files changed, 59 insertions(+), 61 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java index 5e0ac8fbe9..a4bf30f6fe 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java @@ -37,15 +37,11 @@ @Named public class DimensionalTimeSliceCrawler implements Crawler { private static final Logger log = LoggerFactory.getLogger(DimensionalTimeSliceCrawler.class); - // delay five minutes for partition creation on latest time duration to ensure the newly generated events are queryable - // In general, newly generated events become queryable after 30 ~ 120 second - protected static final long WAIT_SECONDS_BEFORE_PARTITION_CREATION = 300; private static final String DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED = "dimensionalTimeSliceWorkerPartitionsCreated"; private static final String WORKER_PARTITION_WAIT_TIME = "workerPartitionWaitTime"; private static final String WORKER_PARTITION_PROCESS_LATENCY = "workerPartitionProcessLatency"; private static final Duration HOUR_DURATION = Duration.ofHours(1); - private static final long MINUTES_PER_HOUR = 60; - private static final long WAIT_MINUTES_BEFORE_PARTITION_CREATION = WAIT_SECONDS_BEFORE_PARTITION_CREATION / 60; // 5 minutes + static final Duration WAIT_BEFORE_PARTITION_CREATION = Duration.ofMinutes(5); private final CrawlerClient client; private final Counter 
partitionsCreatedCounter; @@ -121,16 +117,16 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio DimensionalTimeSliceLeaderProgressState leaderProgressState = (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); Instant initialTime = leaderProgressState.getLastPollTime(); - Instant latestModifiedTime = initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); + Instant latestModifiedTime = initialTime.minus(WAIT_BEFORE_PARTITION_CREATION); Instant remainingDuration = leaderProgressState.getRemainingDuration(); long remainingMinutes = Duration.between(remainingDuration, initialTime).toMinutes(); // For sub-hour time ranges (less than 60 minutes), create a single partition - if (remainingMinutes < MINUTES_PER_HOUR) { + if (remainingMinutes < HOUR_DURATION.toMinutes()) { log.info("Creating partition for sub-hour historical pull: {} minutes", remainingMinutes); Instant startTime = initialTime.minus(Duration.ofMinutes(remainingMinutes)); Instant endTime; - if (remainingMinutes <= WAIT_MINUTES_BEFORE_PARTITION_CREATION) { + if (remainingMinutes <= WAIT_BEFORE_PARTITION_CREATION.toMinutes()) { // For very small ranges, skip the 5-minute delay to create a valid partition endTime = initialTime; } else { @@ -143,8 +139,8 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio } // For hour or longer time ranges, use hourly partitioning - long remainingHours = remainingMinutes / MINUTES_PER_HOUR; - long extraMinutes = remainingMinutes % MINUTES_PER_HOUR; + long remainingHours = remainingMinutes / HOUR_DURATION.toMinutes(); + long extraMinutes = remainingMinutes % HOUR_DURATION.toMinutes(); Instant latestHour = initialTime.truncatedTo(ChronoUnit.HOURS); // Create hourly partitions for complete hours @@ -180,7 +176,7 @@ private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartitio */ private Instant createPartitionsForIncrementalSync(LeaderPartition 
leaderPartition, EnhancedSourceCoordinator coordinator) { - Instant latestModifiedTime = Instant.now().minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); + Instant latestModifiedTime = Instant.now().minus(WAIT_BEFORE_PARTITION_CREATION); LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); Instant lastPollTime = leaderProgressState.getLastPollTime(); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java index 7c2075cc4e..2d7fa9b884 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/DimensionalTimeSliceLeaderProgressState.java @@ -62,12 +62,7 @@ public DimensionalTimeSliceLeaderProgressState( */ public DimensionalTimeSliceLeaderProgressState(final Instant lastPollTime, int remainingHours) { this.lastPollTime = lastPollTime; - long minutes = remainingHours * MINUTES_PER_HOUR; - if (minutes > 0) { - this.remainingDuration = lastPollTime.minus(Duration.ofMinutes(minutes)); - } else { - this.remainingDuration = lastPollTime; - } + this.remainingDuration = remainingDurationFromHours(lastPollTime, remainingHours); } /** @@ -79,14 +74,19 @@ public DimensionalTimeSliceLeaderProgressState(final Instant lastPollTime, int r @JsonSetter("remaining_hours") public void setRemainingHours(int remainingHours) { if (this.remainingDuration == null || this.remainingDuration.equals(this.lastPollTime)) { - // Only set 
if remainingDuration hasn't been set yet (prefer remainingDuration to hours) - long minutes = remainingHours * MINUTES_PER_HOUR; - if (minutes > 0 && this.lastPollTime != null) { - this.remainingDuration = this.lastPollTime.minus(Duration.ofMinutes(minutes)); - } else if (this.lastPollTime != null) { - this.remainingDuration = this.lastPollTime; - } + this.remainingDuration = remainingDurationFromHours(this.lastPollTime, remainingHours); + } + } + + private Instant remainingDurationFromHours(Instant lastPollTime, int remainingHours) { + if (lastPollTime == null) { + return null; + } + long minutes = remainingHours * MINUTES_PER_HOUR; + if (minutes > 0) { + return lastPollTime.minus(Duration.ofMinutes(minutes)); } + return lastPollTime; } /** diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java index 38944a02ff..a37b88308e 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java @@ -27,7 +27,7 @@ import java.util.Arrays; import java.util.List; -import static org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler.WAIT_SECONDS_BEFORE_PARTITION_CREATION; +import static org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler.WAIT_BEFORE_PARTITION_CREATION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static 
org.junit.jupiter.api.Assertions.assertThrows; @@ -83,7 +83,7 @@ void setUp() { } @Test - void testCrawl_withIncrementalSync_lastModificationTimeBefore5MinutesAgo() { + void crawl_withIncrementalSync_whenLastPollBeforeWaitWindow_shouldCreateOnePartitionPerLogType() { Instant lastPollTime = Instant.now().minusSeconds(400); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, Instant.now()); LeaderPartition leaderPartition = new LeaderPartition(state); @@ -108,7 +108,7 @@ void testCrawl_withIncrementalSync_lastModificationTimeBefore5MinutesAgo() { } @Test - void testCrawl_withIncrementalSync_lastModificationTimeAfter5MinutesAgo() { + void crawl_withIncrementalSync_whenLastPollWithinWaitWindow_shouldNotCreatePartitions() { Instant lastPollTime = Instant.now().minusSeconds(10); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, Instant.now()); LeaderPartition leaderPartition = new LeaderPartition(state); @@ -122,9 +122,9 @@ void testCrawl_withIncrementalSync_lastModificationTimeAfter5MinutesAgo() { } @Test - void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() { + void crawl_withHistoricalSync_whenInitialTimeInFirst5MinutesOfHour_shouldCreateTwoPartitionsPerLogType() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); - Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION -1); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION).minusSeconds(1); Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(120)); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); @@ -155,15 +155,15 @@ void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() { DimensionalTimeSliceWorkerProgressState workerState = 
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getStartTime()); - assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i - LOG_TYPES.size()), workerState.getDimensionType()); } } @Test - void testCrawl_withHistoricalSync_initialTimeNotInTheFirst5MinutesOfTheHOur() { + void crawl_withHistoricalSync_whenInitialTimeAfterFirst5MinutesOfHour_shouldCreateThreePartitionsPerLogType() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); - Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION).plusSeconds(1); Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(120)); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); @@ -203,13 +203,13 @@ void testCrawl_withHistoricalSync_initialTimeNotInTheFirst5MinutesOfTheHOur() { DimensionalTimeSliceWorkerProgressState workerState = (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); assertEquals(latestHour, workerState.getStartTime()); - assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i - LOG_TYPES.size() * 2), workerState.getDimensionType()); } } @Test - void createWorkerPartitionsForDimensionTypes() { + void createWorkerPartitionsForDimensionTypes_shouldCreateOnePartitionPerLogTypeForTimeRange() { Instant start = Instant.parse("2024-10-30T00:00:00Z"); Instant end = 
start.plus(Duration.ofHours(1)); @@ -233,7 +233,7 @@ void createWorkerPartitionsForDimensionTypes() { } @Test - void testExecutePartition() { + void executePartition_shouldDelegateToClientAndRecordMetrics() { DimensionalTimeSliceWorkerProgressState state = new DimensionalTimeSliceWorkerProgressState(); state.setPartitionCreationTime(Instant.now().minusSeconds(1)); Buffer> buffer = mock(Buffer.class); @@ -254,21 +254,21 @@ void testExecutePartition() { } @Test - void testInitialize_ThrowsExceptionWhenCalledTwice() { + void initialize_whenCalledTwice_shouldThrowIllegalStateException() { assertThrows(IllegalStateException.class, () -> { crawler.initialize(LOG_TYPES); // Second call should throw }); } @Test - void testInitialize_ThrowsExceptionWithNullLogTypes() { + void initialize_whenLogTypesNull_shouldThrowNullPointerException() { DimensionalTimeSliceCrawler newCrawler = new DimensionalTimeSliceCrawler(client, pluginMetrics); assertThrows(NullPointerException.class, () -> { newCrawler.initialize(null); }); } @Test - void testCrawl_withSubHourHistoricalSync_15Minutes() { + void crawl_withSubHourHistoricalSync_with15MinuteLookback_shouldCreateOnePartitionPerLogType() { Instant initialTime = Instant.now(); Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(15)); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); @@ -290,13 +290,13 @@ void testCrawl_withSubHourHistoricalSync_15Minutes() { DimensionalTimeSliceWorkerProgressState workerState = (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); assertEquals(initialTime.minus(Duration.ofMinutes(15)), workerState.getStartTime()); - assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); } } @Test 
- void testCrawl_withSubHourHistoricalSync_30Minutes() { + void crawl_withSubHourHistoricalSync_with30MinuteLookback_shouldCreateOnePartitionPerLogType() { Instant initialTime = Instant.now(); Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(30)); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); @@ -318,13 +318,13 @@ void testCrawl_withSubHourHistoricalSync_30Minutes() { DimensionalTimeSliceWorkerProgressState workerState = (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); assertEquals(initialTime.minus(Duration.ofMinutes(30)), workerState.getStartTime()); - assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); } } @Test - void testCrawl_withSubHourHistoricalSync_45Minutes() { + void crawl_withSubHourHistoricalSync_with45MinuteLookback_shouldCreateOnePartitionPerLogType() { Instant initialTime = Instant.now(); Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(45)); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); @@ -346,14 +346,14 @@ void testCrawl_withSubHourHistoricalSync_45Minutes() { DimensionalTimeSliceWorkerProgressState workerState = (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); assertEquals(initialTime.minus(Duration.ofMinutes(45)), workerState.getStartTime()); - assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); } } @Test - void testCrawl_withIncrementalSync_exactly5MinutesAgo() 
{ - Instant lastPollTime = Instant.now().minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION - 1); + void crawl_withIncrementalSync_whenLastPollExactly5MinutesAgo_shouldNotCreatePartitions() { + Instant lastPollTime = Instant.now().minus(WAIT_BEFORE_PARTITION_CREATION).plusSeconds(1); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, lastPollTime.plus(Duration.ofMinutes(4))); LeaderPartition leaderPartition = new LeaderPartition(state); @@ -368,9 +368,9 @@ void testCrawl_withIncrementalSync_exactly5MinutesAgo() { } @Test - void testCrawl_withHistoricalSync_exactly1Hour() { + void crawl_withHistoricalSync_with1HourLookback_shouldCreateTwoPartitionsPerLogType() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); - Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION).plusSeconds(1); Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(60)); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); @@ -379,7 +379,8 @@ void testCrawl_withHistoricalSync_exactly1Hour() { assertNotNull(latest); - int expectedPartitions = 2 * LOG_TYPES.size(); + final int expectedHourlyPartitionsPerLogType = 2; + int expectedPartitions = expectedHourlyPartitionsPerLogType * LOG_TYPES.size(); verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); @@ -395,19 +396,19 @@ void testCrawl_withHistoricalSync_exactly1Hour() { assertEquals(LOG_TYPES.get(i), workerState.getDimensionType()); } - for (int i = LOG_TYPES.size(); i < LOG_TYPES.size() * 2; i++) { + for (int i = 
LOG_TYPES.size(); i < LOG_TYPES.size() * expectedHourlyPartitionsPerLogType; i++) { DimensionalTimeSliceWorkerProgressState workerState = (DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get(); assertEquals(latestHour, workerState.getStartTime()); - assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime()); + assertEquals(initialTime.minus(WAIT_BEFORE_PARTITION_CREATION), workerState.getEndTime()); assertEquals(LOG_TYPES.get(i - LOG_TYPES.size()), workerState.getDimensionType()); } } @Test - void testCrawl_withHistoricalSync_3Hours() { + void crawl_withHistoricalSync_with3HourLookback_shouldCreateFourPartitionsPerLogType() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); - Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION).plusSeconds(1); Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(180)); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); @@ -415,7 +416,8 @@ void testCrawl_withHistoricalSync_3Hours() { Instant latest = crawler.crawl(leaderPartition, coordinator); assertNotNull(latest); - int expectedPartitions = 4 * LOG_TYPES.size(); + final int expectedHourlyPartitionsPerLogType = 4; + int expectedPartitions = expectedHourlyPartitionsPerLogType * LOG_TYPES.size(); verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture()); verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any()); verify(partitionsCreatedCounter, times(expectedPartitions)).increment(); @@ -425,9 +427,9 @@ void testCrawl_withHistoricalSync_3Hours() { } @Test - void testCrawl_withHistoricalSync_withExtraMinutes() { + void 
crawl_withHistoricalSync_with2Hours5MinutesLookback_shouldCreateThreePartitionsPerLogType() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); - Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION).plusSeconds(1); Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(125)); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); @@ -450,7 +452,7 @@ void testCrawl_withHistoricalSync_withExtraMinutes() { } @Test - void testCrawl_withSubHourHistoricalSync_verySmallRange() { + void crawl_withSubHourHistoricalSync_withVerySmallRange_shouldCreateOnePartitionWithEndTimeAtInitialTime() { Instant initialTime = Instant.now(); Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(3)); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); @@ -477,7 +479,7 @@ void testCrawl_withSubHourHistoricalSync_verySmallRange() { } @Test - void testCrawl_withSubHourHistoricalSync_exactly5Minutes() { + void crawl_withSubHourHistoricalSync_withExactly5MinuteLookback_shouldCreateOnePartitionWithEndTimeAtInitialTime() { Instant initialTime = Instant.now(); Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(5)); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); @@ -504,9 +506,9 @@ void testCrawl_withSubHourHistoricalSync_exactly5Minutes() { } @Test - void testCrawl_withHistoricalSync_latestModifiedTimeEqualsLatestHour() { + void crawl_withHistoricalSync_whenLatestModifiedTimeEqualsLatestHour_shouldCreateOnePartitionPerLogType() { Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS); - Instant initialTime = 
latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION); + Instant initialTime = latestHour.plus(WAIT_BEFORE_PARTITION_CREATION); Instant lookbackDuration = initialTime.minus(Duration.ofMinutes(60)); DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackDuration); LeaderPartition leaderPartition = new LeaderPartition(state); @@ -532,7 +534,7 @@ void testCrawl_withHistoricalSync_latestModifiedTimeEqualsLatestHour() { } @Test - void testCreateWorkerPartitionsForDimensionTypes_setsPartitionCreationTime() { + void createWorkerPartitionsForDimensionTypes_shouldSetPartitionCreationTimeWithinNow() { Instant start = Instant.parse("2024-10-30T00:00:00Z"); Instant end = start.plus(Duration.ofHours(1)); Instant beforeCreation = Instant.now(); @@ -556,7 +558,7 @@ void testCreateWorkerPartitionsForDimensionTypes_setsPartitionCreationTime() { @Test - void testInitialize_withEmptyList() { + void initialize_withEmptyList_shouldSucceedButSecondCallShouldThrow() { DimensionalTimeSliceCrawler newCrawler = new DimensionalTimeSliceCrawler(client, pluginMetrics); List emptyList = Arrays.asList(); newCrawler.initialize(emptyList); From 70581f24d1fa1e5b94f90d87af3eede4f692cc15 Mon Sep 17 00:00:00 2001 From: enugraju Date: Thu, 19 Feb 2026 12:41:28 +0530 Subject: [PATCH 4/4] Created a generic method to centralize the logic for wrapping adjustedStartTime Signed-off-by: enugraju --- .../service/Office365Service.java | 7 ++----- .../Office365SourceConfigTest.java | 10 ++++----- .../service/Office365ServiceTest.java | 1 + .../base/CrawlerSourceConfig.java | 21 +++++++++++++++++++ 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java 
b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java index 961ea0e0d0..ffac809356 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365Service.java @@ -64,11 +64,8 @@ public AuditLogsResponse searchAuditLogs(final String logType, } // Adjust start time based on configured lookback period (supports Instant-based granularity) - Instant adjustedStartTime = startTime; - Instant lookBackDuration = office365SourceConfig.getLookBackDuration(Instant.now()); - if (startTime.isBefore(lookBackDuration) && lookBackDuration.isBefore(endTime)) { - adjustedStartTime = lookBackDuration; - } + Instant lookBackStartTime = office365SourceConfig.getLookBackDuration(Instant.now()); + Instant adjustedStartTime = office365SourceConfig.getAdjustedStartTime(startTime, endTime, lookBackStartTime); AuditLogsResponse response = office365RestClient.searchAuditLogs(logType, adjustedStartTime, endTime, null); diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java index 4766dad366..14a6127427 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java +++ 
b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java @@ -111,7 +111,7 @@ void testGetLookBackDuration_withMinuteRange() throws Exception { setField(config, "range", fifteenMinutes); Instant now = Instant.now(); - Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + Instant lookBackDuration = config.getLookBackDuration(now); // Verify the duration is approximately 15 minutes before now (within 1 second tolerance) Duration actualDuration = Duration.between(lookBackDuration, now); assertEquals(15, actualDuration.toMinutes()); @@ -123,7 +123,7 @@ void testGetLookBackDuration_with30MinuteRange() throws Exception { setField(config, "range", thirtyMinutes); Instant now = Instant.now(); - Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + Instant lookBackDuration = config.getLookBackDuration(now); // Verify the duration is approximately 30 minutes before now Duration actualDuration = Duration.between(lookBackDuration, now); assertEquals(30, actualDuration.toMinutes()); @@ -135,7 +135,7 @@ void testGetLookBackDuration_with45MinuteRange() throws Exception { setField(config, "range", fortyFiveMinutes); Instant now = Instant.now(); - Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + Instant lookBackDuration = config.getLookBackDuration(now); // Verify the duration is approximately 45 minutes before now Duration actualDuration = Duration.between(lookBackDuration, now); assertEquals(45, actualDuration.toMinutes()); @@ -147,7 +147,7 @@ void testGetLookBackDuration_withHourRange() throws Exception { setField(config, "range", twoHours); Instant now = Instant.now(); - Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + Instant lookBackDuration = config.getLookBackDuration(now); Duration actualDuration = Duration.between(lookBackDuration, now); assertEquals(2, 
actualDuration.toHours()); assertEquals(120, actualDuration.toMinutes()); @@ -159,7 +159,7 @@ void testGetLookBackDuration_withDayRange() throws Exception { setField(config, "range", oneDay); Instant now = Instant.now(); - Instant lookBackDuration = config.getLookBackDuration(Instant.now()); + Instant lookBackDuration = config.getLookBackDuration(now); Duration actualDuration = Duration.between(lookBackDuration, now); assertEquals(24, actualDuration.toHours()); assertEquals(1440, actualDuration.toMinutes()); diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java index 998fcd4dee..87a7b4810d 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/service/Office365ServiceTest.java @@ -60,6 +60,7 @@ class Office365ServiceTest { void setUp() { office365Service = new Office365Service(sourceConfig, office365RestClient, pluginMetrics); lenient().when(sourceConfig.getLookBackDuration(any(Instant.class))).thenReturn(Instant.now().minus(Duration.ofDays(365))); + lenient().doCallRealMethod().when(sourceConfig).getAdjustedStartTime(any(Instant.class), any(Instant.class), any(Instant.class)); } @Test diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java 
b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java index efee2cac3e..d32f8718ba 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java @@ -1,6 +1,7 @@ package org.opensearch.dataprepper.plugins.source.source_crawler.base; import java.time.Duration; +import java.time.Instant; /** * Marker interface to all the SAAS connectors configuration @@ -55,4 +56,24 @@ default Duration getDurationToDelayRetry() { default Duration getLeaseInterval() { return Duration.ofMinutes(1); } + + /** + * Adjusts the start time based on the configured lookback boundary. + * When the requested start time is before the lookback boundary and the lookback boundary + * falls within the time window, the start time is adjusted to the lookback boundary to + * respect API limitations (e.g., Office 365 API's maximum lookback period). + * + * @param startTime the requested start time for the query + * @param endTime the end time for the query + * @param lookBackStartTime the earliest time to query from (e.g., from getLookBackDuration(referenceTime)) + * @return the adjusted start time, or the original start time if no adjustment is needed + */ + default Instant getAdjustedStartTime(final Instant startTime, + final Instant endTime, + final Instant lookBackStartTime) { + if (startTime.isBefore(lookBackStartTime) && lookBackStartTime.isBefore(endTime)) { + return lookBackStartTime; + } + return startTime; + } }