66package org .opensearch .dataprepper .plugins .source .s3 ;
77
88import org .opensearch .dataprepper .model .source .coordinator .PartitionIdentifier ;
9+ import org .opensearch .dataprepper .model .source .coordinator .SourceCoordinator ;
910import org .opensearch .dataprepper .plugins .source .s3 .configuration .FolderPartitioningOptions ;
1011import org .opensearch .dataprepper .plugins .source .s3 .configuration .S3ScanKeyPathOption ;
1112import org .opensearch .dataprepper .plugins .source .s3 .configuration .S3ScanSchedulingOptions ;
@@ -53,19 +54,23 @@ public class S3ScanPartitionCreationSupplier implements Function<Map<String, Obj
5354
5455 private final boolean deleteS3ObjectsOnRead ;
5556
57+ private final SourceCoordinator <S3SourceProgressState > sourceCoordinator ;
58+
/**
 * Builds a partition creation supplier from the collaborators and scan settings it is handed.
 * Every argument is stored as-is in the matching field; no validation or copying is performed here.
 *
 * @param s3Client client used to page through bucket listings with {@code listObjectsV2}
 * @param bucketOwnerProvider bucket owner lookup, kept for later use by this supplier
 * @param scanOptionsList per-bucket scan settings, kept for later use by this supplier
 * @param schedulingOptions scan scheduling settings, kept for later use by this supplier
 * @param folderPartitioningOptions when non-null, listed objects are turned into folder (prefix)
 *                                  partitions rather than one partition per object
 * @param deleteS3ObjectsOnRead when {@code true}, the "last modified after the previous scan" filter
 *                              is bypassed while listing objects
 * @param sourceCoordinator coordinator that receives the partitions created for each listing page
 */
public S3ScanPartitionCreationSupplier(final S3Client s3Client,
                                       final BucketOwnerProvider bucketOwnerProvider,
                                       final List<ScanOptions> scanOptionsList,
                                       final S3ScanSchedulingOptions schedulingOptions,
                                       final FolderPartitioningOptions folderPartitioningOptions,
                                       final boolean deleteS3ObjectsOnRead,
                                       final SourceCoordinator<S3SourceProgressState> sourceCoordinator) {
    // Collaborators this supplier calls into.
    this.sourceCoordinator = sourceCoordinator;
    this.s3Client = s3Client;
    this.bucketOwnerProvider = bucketOwnerProvider;

    // Scan configuration.
    this.scanOptionsList = scanOptionsList;
    this.schedulingOptions = schedulingOptions;
    this.folderPartitioningOptions = folderPartitioningOptions;
    this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead;
}
7075
7176 @ Override
@@ -99,12 +104,13 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap)
99104 if (Objects .nonNull (s3ScanKeyPathOption ) && Objects .nonNull (s3ScanKeyPathOption .getS3scanIncludePrefixOptions ()))
100105 s3ScanKeyPathOption .getS3scanIncludePrefixOptions ().forEach (includePath -> {
101106 listObjectsV2Request .prefix (includePath );
102- objectsToProcess . addAll ( listFilteredS3ObjectsForBucket (excludeItems , listObjectsV2Request ,
103- bucketName , scanOptions .getUseStartDateTime (), scanOptions .getUseEndDateTime (), globalStateMap )) ;
107+ createFilteredS3ObjectPartitionsForBucket (excludeItems , listObjectsV2Request ,
108+ bucketName , scanOptions .getUseStartDateTime (), scanOptions .getUseEndDateTime (), globalStateMap );
104109 });
105110 else
106- objectsToProcess .addAll (listFilteredS3ObjectsForBucket (excludeItems , listObjectsV2Request ,
107- bucketName , scanOptions .getUseStartDateTime (), scanOptions .getUseEndDateTime (), globalStateMap ));
111+ createFilteredS3ObjectPartitionsForBucket (excludeItems , listObjectsV2Request ,
112+ bucketName , scanOptions .getUseStartDateTime (), scanOptions .getUseEndDateTime (), globalStateMap );
113+
108114 if (!bucketScanTime .containsKey (bucketName )) {
109115 bucketScanTime .put (bucketName , updatedScanTime .toString ());
110116 }
@@ -117,22 +123,23 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap)
117123 globalStateMap .put (SCAN_COUNT , (Integer ) globalStateMap .get (SCAN_COUNT ) + 1 );
118124 globalStateMap .put (LAST_SCAN_TIME , Instant .now ().toEpochMilli ());
119125
120- return objectsToProcess ;
126+ // Partitions are created by the supplier, so source coordinator does not need to attempt to create any partitions
127+ return Collections .emptyList ();
121128 }
122129
123- private List < PartitionIdentifier > listFilteredS3ObjectsForBucket (final List <String > excludeKeyPaths ,
130+ private void createFilteredS3ObjectPartitionsForBucket (final List <String > excludeKeyPaths ,
124131 final ListObjectsV2Request .Builder listObjectsV2Request ,
125132 final String bucket ,
126133 final LocalDateTime startDateTime ,
127134 final LocalDateTime endDateTime ,
128135 final Map <String , Object > globalStateMap ) {
129136 final Instant previousScanTime = globalStateMap .get (bucket ) != null ? Instant .parse ((String ) globalStateMap .get (bucket )) : null ;
130137 final boolean isFirstScan = previousScanTime == null ;
131- final List <PartitionIdentifier > allPartitionIdentifiers = new ArrayList <>();
132138 ListObjectsV2Response listObjectsV2Response = null ;
139+
133140 do {
134141 listObjectsV2Response = s3Client .listObjectsV2 (listObjectsV2Request .fetchOwner (true ).continuationToken (Objects .nonNull (listObjectsV2Response ) ? listObjectsV2Response .nextContinuationToken () : null ).build ());
135- allPartitionIdentifiers . addAll ( listObjectsV2Response .contents ().stream ()
142+ final List < PartitionIdentifier > partitionsForPage = listObjectsV2Response .contents ().stream ()
136143 .filter (s3Object -> deleteS3ObjectsOnRead || isLastModifiedTimeAfterMostRecentScanForBucket (previousScanTime , s3Object ))
137144 .map (s3Object -> Pair .of (s3Object .key (), instantToLocalDateTime (s3Object .lastModified ())))
138145 .filter (keyTimestampPair -> !keyTimestampPair .left ().endsWith ("/" ))
@@ -141,32 +148,17 @@ private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<Stri
141148 .filter (keyTimestampPair -> isKeyMatchedBetweenTimeRange (keyTimestampPair .right (), startDateTime , endDateTime , isFirstScan ))
142149 .map (Pair ::left )
143150 .map (objectKey -> PartitionIdentifier .builder ().withPartitionKey (String .format (BUCKET_OBJECT_PARTITION_KEY_FORMAT , bucket , objectKey )).build ())
144- .collect (Collectors .toList ()));
145-
151+ .collect (Collectors .toList ());
146152 LOG .info ("Found page of {} objects from bucket {}" , listObjectsV2Response .keyCount (), bucket );
147- } while (listObjectsV2Response .isTruncated ());
148-
149- if (folderPartitioningOptions != null ) {
150- final Set <PartitionIdentifier > folderPartitions = allPartitionIdentifiers .stream ()
151- .map (partitionIdentifier -> {
152- final String fullObjectKey = partitionIdentifier .getPartitionKey ();
153- final String prefix = getPrefixWithDepth (fullObjectKey );
154- if (prefix == null ) {
155- return null ;
156- }
157- return PartitionIdentifier .builder ().withPartitionKey (prefix ).build ();
158- })
159- .filter (Objects ::nonNull )
160- .collect (Collectors .toSet ());
161-
162- LOG .info ("Running in folder_partitions mode at depth {}, found {} unique prefixes from {} objects" , folderPartitioningOptions .getFolderDepth (), folderPartitions .size (), allPartitionIdentifiers .size ());
163-
164- return new ArrayList <>(folderPartitions );
165- } else {
166- LOG .info ("Returning partitions for {} S3 objects from bucket {}" , allPartitionIdentifiers .size (), bucket );
167- }
168153
169- return allPartitionIdentifiers ;
154+ if (folderPartitioningOptions != null ) {
155+ final List <PartitionIdentifier > folderPartitionsForPage = getFolderPartitionIdentifiers (partitionsForPage );
156+ sourceCoordinator .createPartitions (folderPartitionsForPage );
157+ } else {
158+ LOG .info ("Creating partitions for {} S3 objects from bucket {}" , partitionsForPage .size (), bucket );
159+ sourceCoordinator .createPartitions (partitionsForPage );
160+ }
161+ } while (listObjectsV2Response .isTruncated ());
170162 }
171163
172164 private LocalDateTime instantToLocalDateTime (final Instant instant ) {
@@ -264,4 +256,23 @@ private String getPrefixWithDepth(final String fullObjectKey) {
264256 int actualDepth = min (folderPartitioningOptions .getFolderDepth (), folders .length - 1 );
265257 return String .join ("/" , Arrays .copyOfRange (folders , 0 , actualDepth )) + "/" ;
266258 }
259+
260+ private List <PartitionIdentifier > getFolderPartitionIdentifiers (final List <PartitionIdentifier > objectPartitionIdentifiers ) {
261+
262+ final Set <PartitionIdentifier > folderPartitions = objectPartitionIdentifiers .stream ()
263+ .map (partitionIdentifier -> {
264+ final String fullObjectKey = partitionIdentifier .getPartitionKey ();
265+ final String prefix = getPrefixWithDepth (fullObjectKey );
266+ if (prefix == null ) {
267+ return null ;
268+ }
269+ return PartitionIdentifier .builder ().withPartitionKey (prefix ).build ();
270+ })
271+ .filter (Objects ::nonNull )
272+ .collect (Collectors .toSet ());
273+
274+ LOG .info ("Running in folder_partitions mode at depth {}, found {} unique prefixes from {} objects" , folderPartitioningOptions .getFolderDepth (), folderPartitions .size (), objectPartitionIdentifiers .size ());
275+
276+ return new ArrayList <>(folderPartitions );
277+ }
267278}
0 commit comments