Skip to content

Commit 84049a0

Browse files
authored
S3 scheduled scan should consider start time on subsequent scans (#5920)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent: c6f072a · commit: 84049a0

2 files changed

Lines changed: 16 additions & 5 deletions

File tree

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -183,6 +183,9 @@ private boolean isKeyMatchedBetweenTimeRange(final LocalDateTime lastModifiedTim
183183
final LocalDateTime endDateTime,
184184
final boolean isFirstScan) {
185185
if (!isFirstScan && schedulingOptions != null) {
186+
if (startDateTime != null) {
187+
return lastModifiedTime.isAfter(startDateTime);
188+
}
186189
return true;
187190
} else if (Objects.isNull(startDateTime) && Objects.isNull(endDateTime)) {
188191
return true;

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -296,7 +296,7 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio
296296

297297

298298
@Test
299-
void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_does_not_filter_on_subsequent_scans() {
299+
void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_filters_on_start_time_for_subsequent_scans() {
300300
schedulingOptions = mock(S3ScanSchedulingOptions.class);
301301
given(schedulingOptions.getCount()).willReturn(2);
302302

@@ -308,8 +308,10 @@ void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_do
308308
globalStateMap.put(notFirstScanBucket, "2024-09-07T20:43:34.384822Z");
309309
globalStateMap.put(SCAN_COUNT, 0);
310310

311-
final LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(1725907846000L), ZoneId.systemDefault());
312-
final LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(1725907849100L), ZoneId.systemDefault());
311+
long startMillis = 1725907846000L;
312+
long endMillis = 1725907849100L;
313+
final LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(startMillis), ZoneId.systemDefault());
314+
final LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(endMillis), ZoneId.systemDefault());
313315

314316
final ScanOptions firstBucketScanOptions = mock(ScanOptions.class);
315317
final S3ScanBucketOption firstBucketScanBucketOption = mock(S3ScanBucketOption.class);
@@ -330,12 +332,18 @@ void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_do
330332
final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class);
331333
final List<S3Object> s3ObjectsList = new ArrayList<>();
332334

333-
final Instant objectNotBetweenStartAndEndTime = Instant.ofEpochMilli(1725907846000L).minus(500L, TimeUnit.SECONDS.toChronoUnit());
335+
final Instant objectAfterStartAndEndTime = Instant.ofEpochMilli(endMillis).plus(500L, TimeUnit.SECONDS.toChronoUnit());
334336
final S3Object validObject = mock(S3Object.class);
335337
given(validObject.key()).willReturn("valid");
336-
given(validObject.lastModified()).willReturn(objectNotBetweenStartAndEndTime);
338+
given(validObject.lastModified()).willReturn(objectAfterStartAndEndTime);
337339
s3ObjectsList.add(validObject);
338340

341+
final Instant objectBeforeStartTime = Instant.ofEpochMilli(startMillis).minus(500L, TimeUnit.SECONDS.toChronoUnit());
342+
final S3Object invalidObject = mock(S3Object.class);
343+
given(invalidObject.key()).willReturn("invalid");
344+
given(invalidObject.lastModified()).willReturn(objectBeforeStartTime);
345+
s3ObjectsList.add(invalidObject);
346+
339347
final List<PartitionIdentifier> expectedPartitionIdentifiers = new ArrayList<>();
340348
expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(notFirstScanBucket + "|" + validObject.key()).build());
341349

0 commit comments

Comments
 (0)