Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ private boolean isKeyMatchedBetweenTimeRange(final LocalDateTime lastModifiedTim
final LocalDateTime endDateTime,
final boolean isFirstScan) {
if (!isFirstScan && schedulingOptions != null) {
if (startDateTime != null) {
return lastModifiedTime.isAfter(startDateTime);
}
return true;
} else if (Objects.isNull(startDateTime) && Objects.isNull(endDateTime)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio


@Test
void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_does_not_filter_on_subsequent_scans() {
void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_filters_on_start_time_for_subsequent_scans() {
schedulingOptions = mock(S3ScanSchedulingOptions.class);
given(schedulingOptions.getCount()).willReturn(2);

Expand All @@ -308,8 +308,10 @@ void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_do
globalStateMap.put(notFirstScanBucket, "2024-09-07T20:43:34.384822Z");
globalStateMap.put(SCAN_COUNT, 0);

final LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(1725907846000L), ZoneId.systemDefault());
final LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(1725907849100L), ZoneId.systemDefault());
long startMillis = 1725907846000L;
long endMillis = 1725907849100L;
final LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(startMillis), ZoneId.systemDefault());
final LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(endMillis), ZoneId.systemDefault());

final ScanOptions firstBucketScanOptions = mock(ScanOptions.class);
final S3ScanBucketOption firstBucketScanBucketOption = mock(S3ScanBucketOption.class);
Expand All @@ -330,12 +332,18 @@ void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_do
final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class);
final List<S3Object> s3ObjectsList = new ArrayList<>();

final Instant objectNotBetweenStartAndEndTime = Instant.ofEpochMilli(1725907846000L).minus(500L, TimeUnit.SECONDS.toChronoUnit());
final Instant objectAfterStartAndEndTime = Instant.ofEpochMilli(endMillis).plus(500L, TimeUnit.SECONDS.toChronoUnit());
final S3Object validObject = mock(S3Object.class);
given(validObject.key()).willReturn("valid");
given(validObject.lastModified()).willReturn(objectNotBetweenStartAndEndTime);
given(validObject.lastModified()).willReturn(objectAfterStartAndEndTime);
s3ObjectsList.add(validObject);

final Instant objectBeforeStartTime = Instant.ofEpochMilli(startMillis).minus(500L, TimeUnit.SECONDS.toChronoUnit());
final S3Object invalidObject = mock(S3Object.class);
given(invalidObject.key()).willReturn("invalid");
given(invalidObject.lastModified()).willReturn(objectBeforeStartTime);
s3ObjectsList.add(invalidObject);

final List<PartitionIdentifier> expectedPartitionIdentifiers = new ArrayList<>();
expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(notFirstScanBucket + "|" + validObject.key()).build());

Expand Down
Loading