diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java index 28685e6e2c..98f59fde95 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java @@ -183,6 +183,9 @@ private boolean isKeyMatchedBetweenTimeRange(final LocalDateTime lastModifiedTim final LocalDateTime endDateTime, final boolean isFirstScan) { if (!isFirstScan && schedulingOptions != null) { + if (startDateTime != null) { + return lastModifiedTime.isAfter(startDateTime); + } return true; } else if (Objects.isNull(startDateTime) && Objects.isNull(endDateTime)) { return true; diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java index 2183f69d02..5214399d4b 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java @@ -296,7 +296,7 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio @Test - void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_does_not_filter_on_subsequent_scans() { + void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_filters_on_start_time_for_subsequent_scans() { schedulingOptions = mock(S3ScanSchedulingOptions.class); given(schedulingOptions.getCount()).willReturn(2); @@ -308,8 +308,10 @@ void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_do globalStateMap.put(notFirstScanBucket, "2024-09-07T20:43:34.384822Z"); globalStateMap.put(SCAN_COUNT, 0); - final LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(1725907846000L), ZoneId.systemDefault()); - final LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(1725907849100L), ZoneId.systemDefault()); + long startMillis = 1725907846000L; + long endMillis = 1725907849100L; + final LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(startMillis), ZoneId.systemDefault()); + final LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(endMillis), ZoneId.systemDefault()); final ScanOptions firstBucketScanOptions = mock(ScanOptions.class); final S3ScanBucketOption firstBucketScanBucketOption = mock(S3ScanBucketOption.class); @@ -330,12 +332,18 @@ void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_do final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class); final List s3ObjectsList = new ArrayList<>(); - final Instant objectNotBetweenStartAndEndTime = Instant.ofEpochMilli(1725907846000L).minus(500L, TimeUnit.SECONDS.toChronoUnit()); + final Instant objectAfterStartAndEndTime = Instant.ofEpochMilli(endMillis).plus(500L, TimeUnit.SECONDS.toChronoUnit()); final S3Object validObject = mock(S3Object.class); given(validObject.key()).willReturn("valid"); - given(validObject.lastModified()).willReturn(objectNotBetweenStartAndEndTime); + given(validObject.lastModified()).willReturn(objectAfterStartAndEndTime); s3ObjectsList.add(validObject); + final Instant objectBeforeStartTime = Instant.ofEpochMilli(startMillis).minus(500L, TimeUnit.SECONDS.toChronoUnit()); + final S3Object invalidObject = mock(S3Object.class); + given(invalidObject.key()).willReturn("invalid"); + given(invalidObject.lastModified()).willReturn(objectBeforeStartTime); + s3ObjectsList.add(invalidObject); + final List expectedPartitionIdentifiers = new ArrayList<>(); expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(notFirstScanBucket + "|" + validObject.key()).build());