Skip to content

Commit 932b73c

Browse files
committed
add S3 Scan processing condition evaluator to ensure S3 object completeness
Signed-off-by: Xun Zhang <xunzh@amazon.com>
1 parent 3888138 commit 932b73c

11 files changed

Lines changed: 651 additions & 12 deletions

File tree

data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
4343
import org.opensearch.dataprepper.plugins.s3.common.ownership.BucketOwnerProvider;
4444
import org.opensearch.dataprepper.plugins.s3.common.source.S3ObjectPluginMetrics;
45+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
4546
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOption;
4647
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOptions;
4748
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanScanOptions;
@@ -224,8 +225,10 @@ private ScanObjectWorker createObjectUnderTest(final RecordsGenerator recordsGen
224225
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
225226
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
226227

228+
final S3ScanProcessingConditionEvaluator conditionEvaluator =
229+
new S3ScanProcessingConditionEvaluator(s3Client, mock(ExpressionEvaluator.class));
227230
return new ScanObjectWorker(s3Client, scanOptions, createObjectUnderTest(s3ObjectRequest)
228-
,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, 30000, pluginMetrics);
231+
,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, 30000, pluginMetrics, conditionEvaluator);
229232
}
230233

231234
@ParameterizedTest
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
package org.opensearch.dataprepper.plugins.source.s3;
11+
12+
import com.fasterxml.jackson.core.type.TypeReference;
13+
import com.fasterxml.jackson.databind.ObjectMapper;
14+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
15+
import org.opensearch.dataprepper.model.event.JacksonEvent;
16+
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanProcessingCondition;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
import software.amazon.awssdk.core.ResponseInputStream;
20+
import software.amazon.awssdk.services.s3.S3Client;
21+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
22+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
23+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
24+
25+
import java.io.IOException;
26+
import java.nio.charset.StandardCharsets;
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
/**
31+
* Evaluates {@link S3ScanProcessingCondition} entries for a given S3 object before
32+
* the object is processed. For each applicable condition the evaluator downloads the
33+
* manifest file co-located with the object and evaluates the {@code when} expression
34+
* against its JSON content.
35+
*/
36+
public class S3ScanProcessingConditionEvaluator {
37+
38+
private static final Logger LOG = LoggerFactory.getLogger(S3ScanProcessingConditionEvaluator.class);
39+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
40+
41+
private final S3Client s3Client;
42+
private final ExpressionEvaluator expressionEvaluator;
43+
44+
public S3ScanProcessingConditionEvaluator(final S3Client s3Client,
45+
final ExpressionEvaluator expressionEvaluator) {
46+
this.s3Client = s3Client;
47+
this.expressionEvaluator = expressionEvaluator;
48+
}
49+
50+
/**
51+
* Returns {@code true} if every applicable condition in {@code conditions} is satisfied
52+
* for the given object. A condition is applicable when {@code include_prefix} is absent
53+
* or the object key starts with at least one of the listed prefixes.
54+
*/
55+
public boolean allConditionsMet(final String bucket,
56+
final String objectKey,
57+
final List<S3ScanProcessingCondition> conditions) {
58+
if (conditions == null || conditions.isEmpty()) {
59+
return true;
60+
}
61+
for (final S3ScanProcessingCondition condition : conditions) {
62+
if (!isApplicable(objectKey, condition)) {
63+
continue;
64+
}
65+
final String manifestKey = resolveManifestKey(objectKey, condition.getFileName());
66+
try {
67+
final String content = readS3ObjectAsString(bucket, manifestKey);
68+
final Map<String, Object> data = OBJECT_MAPPER.readValue(
69+
content, new TypeReference<Map<String, Object>>() {});
70+
final JacksonEvent event = JacksonEvent.builder()
71+
.withEventType("event")
72+
.withData(data)
73+
.build();
74+
if (!expressionEvaluator.evaluateConditional(condition.getWhen(), event)) {
75+
LOG.debug("Processing condition '{}' not satisfied for {}/{} using manifest {}",
76+
condition.getWhen(), bucket, objectKey, manifestKey);
77+
return false;
78+
}
79+
} catch (final NoSuchKeyException e) {
80+
LOG.debug("Manifest file {}/{} not found yet, condition not met", bucket, manifestKey);
81+
return false;
82+
} catch (final Exception e) {
83+
LOG.error("Error evaluating processing condition for {}/{}", bucket, objectKey, e);
84+
return false;
85+
}
86+
}
87+
return true;
88+
}
89+
90+
/**
91+
* Returns the first condition in {@code conditions} that is applicable to the given
92+
* object key, or {@code null} if none match.
93+
*/
94+
public S3ScanProcessingCondition findFirstMatching(final String objectKey,
95+
final List<S3ScanProcessingCondition> conditions) {
96+
if (conditions == null) {
97+
return null;
98+
}
99+
for (final S3ScanProcessingCondition condition : conditions) {
100+
if (isApplicable(objectKey, condition)) {
101+
return condition;
102+
}
103+
}
104+
return null;
105+
}
106+
107+
private boolean isApplicable(final String objectKey, final S3ScanProcessingCondition condition) {
108+
final List<String> includePrefix = condition.getIncludePrefix();
109+
return includePrefix == null || includePrefix.isEmpty() ||
110+
includePrefix.stream().anyMatch(objectKey::startsWith);
111+
}
112+
113+
private String resolveManifestKey(final String objectKey, final String fileName) {
114+
final String directory = objectKey.contains("/")
115+
? objectKey.substring(0, objectKey.lastIndexOf('/') + 1)
116+
: "";
117+
return directory + fileName;
118+
}
119+
120+
private String readS3ObjectAsString(final String bucket, final String key) throws IOException {
121+
final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build();
122+
try (final ResponseInputStream<GetObjectResponse> response = s3Client.getObject(request)) {
123+
return new String(response.readAllBytes(), StandardCharsets.UTF_8);
124+
}
125+
}
126+
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55
package org.opensearch.dataprepper.plugins.source.s3;
66

7+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
78
import org.opensearch.dataprepper.metrics.PluginMetrics;
89
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
910
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -42,6 +43,7 @@ public class S3ScanService {
4243
private final AcknowledgementSetManager acknowledgementSetManager;
4344
private final S3ObjectDeleteWorker s3ObjectDeleteWorker;
4445
private final PluginMetrics pluginMetrics;
46+
private final ExpressionEvaluator expressionEvaluator;
4547
private final ExecutorService executorService;
4648
private final List<ScanObjectWorker> workers;
4749

@@ -52,7 +54,8 @@ public S3ScanService(final S3SourceConfig s3SourceConfig,
5254
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
5355
final AcknowledgementSetManager acknowledgementSetManager,
5456
final S3ObjectDeleteWorker s3ObjectDeleteWorker,
55-
final PluginMetrics pluginMetrics) {
57+
final PluginMetrics pluginMetrics,
58+
final ExpressionEvaluator expressionEvaluator) {
5659
this.s3SourceConfig = s3SourceConfig;
5760
this.s3ScanBucketOptions = s3SourceConfig.getS3ScanScanOptions().getBuckets();
5861
this.s3ClientBuilderFactory = s3ClientBuilderFactory;
@@ -65,15 +68,18 @@ public S3ScanService(final S3SourceConfig s3SourceConfig,
6568
this.acknowledgementSetManager = acknowledgementSetManager;
6669
this.s3ObjectDeleteWorker = s3ObjectDeleteWorker;
6770
this.pluginMetrics = pluginMetrics;
71+
this.expressionEvaluator = expressionEvaluator;
6872
this.workers = new ArrayList<>();
6973
this.executorService = Executors.newFixedThreadPool(s3SourceConfig.getNumWorkers(), BackgroundThreadFactory.defaultExecutorThreadFactory("s3-source-scan"));
7074
}
7175

7276
public void start() {
7377
long backOffMs = s3SourceConfig.getBackOff().toMillis();
78+
final S3ScanProcessingConditionEvaluator conditionEvaluator =
79+
new S3ScanProcessingConditionEvaluator(s3ClientBuilderFactory.getS3Client(), expressionEvaluator);
7480
for (int i = 0; i < s3SourceConfig.getNumWorkers(); i++) {
7581
ScanObjectWorker scanObjectWorker = new ScanObjectWorker(s3ClientBuilderFactory.getS3Client(),
76-
getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, backOffMs, pluginMetrics);
82+
getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, backOffMs, pluginMetrics, conditionEvaluator);
7783
workers.add(scanObjectWorker);
7884
executorService.submit(new Thread(scanObjectWorker));
7985
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3Source.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.plugins.source.s3;
77

88
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
9+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
910
import org.opensearch.dataprepper.metrics.PluginMetrics;
1011
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
1112
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
@@ -47,6 +48,7 @@ public class S3Source implements Source<Record<Event>>, UsesSourceCoordination {
4748
private final AcknowledgementSetManager acknowledgementSetManager;
4849
private final AwsCredentialsSupplier awsCredentialsSupplier;
4950
private final boolean acknowledgementsEnabled;
51+
private final ExpressionEvaluator expressionEvaluator;
5052
private SourceCoordinator<S3SourceProgressState> sourceCoordinator;
5153

5254

@@ -56,14 +58,16 @@ public S3Source(
5658
final S3SourceConfig s3SourceConfig,
5759
final PluginFactory pluginFactory,
5860
final AcknowledgementSetManager acknowledgementSetManager,
59-
final AwsCredentialsSupplier awsCredentialsSupplier) {
61+
final AwsCredentialsSupplier awsCredentialsSupplier,
62+
final ExpressionEvaluator expressionEvaluator) {
6063
this.pluginMetrics = pluginMetrics;
6164
this.s3SourceConfig = s3SourceConfig;
6265
this.pluginFactory = pluginFactory;
6366
this.s3ScanScanOptional = Optional.ofNullable(s3SourceConfig.getS3ScanScanOptions());
6467
this.acknowledgementsEnabled = s3SourceConfig.getAcknowledgements();
6568
this.acknowledgementSetManager = acknowledgementSetManager;
6669
this.awsCredentialsSupplier = awsCredentialsSupplier;
70+
this.expressionEvaluator = expressionEvaluator;
6771
}
6872

6973
@Override
@@ -126,7 +130,7 @@ public void start(Buffer<Record<Event>> buffer) {
126130
sqsService.start();
127131
}
128132
if(s3ScanScanOptional.isPresent()) {
129-
s3ScanService = new S3ScanService(s3SourceConfig, s3ClientBuilderFactory, s3Handler, bucketOwnerProvider, sourceCoordinator, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics);
133+
s3ScanService = new S3ScanService(s3SourceConfig, s3ClientBuilderFactory, s3Handler, bucketOwnerProvider, sourceCoordinator, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics, expressionEvaluator);
130134
s3ScanService.start();
131135
}
132136
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOptions;
2323
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOption;
2424
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption;
25+
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanProcessingCondition;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
2728
import software.amazon.awssdk.services.s3.S3Client;
@@ -34,8 +35,8 @@
3435
import java.time.Duration;
3536
import java.time.Instant;
3637
import java.util.ArrayList;
37-
import java.util.HashSet;
3838
import java.util.HashMap;
39+
import java.util.HashSet;
3940
import java.util.List;
4041
import java.util.Map;
4142
import java.util.Objects;
@@ -104,6 +105,10 @@ public class ScanObjectWorker implements Runnable {
104105
private final Map<String, AtomicInteger> acknowledgmentsRemainingForPartitions;
105106
private final Map<String, Map<String, S3DataSelection>> bucketDataSelectionMap;
106107

108+
private final Map<String, List<S3ScanProcessingCondition>> bucketProcessingConditionsMap;
109+
110+
private final S3ScanProcessingConditionEvaluator processingConditionEvaluator;
111+
107112
private final Duration acknowledgmentSetTimeout;
108113

109114
public ScanObjectWorker(final S3Client s3Client,
@@ -115,7 +120,8 @@ public ScanObjectWorker(final S3Client s3Client,
115120
final AcknowledgementSetManager acknowledgementSetManager,
116121
final S3ObjectDeleteWorker s3ObjectDeleteWorker,
117122
final long backOffMs,
118-
final PluginMetrics pluginMetrics){
123+
final PluginMetrics pluginMetrics,
124+
final S3ScanProcessingConditionEvaluator processingConditionEvaluator){
119125
this.s3Client = s3Client;
120126
this.backOffMs = backOffMs;
121127
this.scanOptionsBuilderList = scanOptionsBuilderList;
@@ -144,6 +150,18 @@ public ScanObjectWorker(final S3Client s3Client,
144150
}
145151
}
146152
}
153+
this.bucketProcessingConditionsMap = new HashMap<>();
154+
if (s3SourceConfig.getS3ScanScanOptions().getBuckets() != null) {
155+
for (S3ScanBucketOptions bucketOption : s3SourceConfig.getS3ScanScanOptions().getBuckets()) {
156+
final S3ScanBucketOption s3ScanBucketOption = bucketOption.getS3ScanBucketOption();
157+
if (s3ScanBucketOption != null && s3ScanBucketOption.getProcessingConditions() != null) {
158+
bucketProcessingConditionsMap
159+
.computeIfAbsent(s3ScanBucketOption.getName(), k -> new ArrayList<>())
160+
.addAll(s3ScanBucketOption.getProcessingConditions());
161+
}
162+
}
163+
}
164+
this.processingConditionEvaluator = processingConditionEvaluator;
147165
this.endToEndAcknowledgementsEnabled = s3SourceConfig.getAcknowledgements();
148166
this.acknowledgementSetManager = acknowledgementSetManager;
149167
this.deleteS3ObjectsOnRead = s3SourceConfig.isDeleteS3ObjectsOnRead();
@@ -225,6 +243,17 @@ private void startProcessingObject(final long waitTimeMillis) {
225243
final String bucket = objectToProcess.get().getPartitionKey().split("\\|")[0];
226244
final String objectKey = objectToProcess.get().getPartitionKey().split("\\|")[1];
227245

246+
final List<S3ScanProcessingCondition> conditions = bucketProcessingConditionsMap.get(bucket);
247+
if (!processingConditionEvaluator.allConditionsMet(bucket, objectKey, conditions)) {
248+
final S3ScanProcessingCondition failedCondition = processingConditionEvaluator.findFirstMatching(objectKey, conditions);
249+
final Duration retryDelay = failedCondition != null ? failedCondition.getRetryDelay() : Duration.ofMinutes(5);
250+
final int maxRetry = failedCondition != null ? failedCondition.getMaxRetry() : 10;
251+
LOG.info("Processing conditions not met for {}/{}, rescheduling after {}", bucket, objectKey, retryDelay);
252+
sourceCoordinator.giveUpPartition(objectToProcess.get().getPartitionKey(), Instant.now().plus(retryDelay), maxRetry);
253+
partitionKeys.remove(objectToProcess.get().getPartitionKey());
254+
return;
255+
}
256+
228257
try {
229258
AcknowledgementSet acknowledgementSet = null;
230259

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/S3ScanBucketOption.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
import com.fasterxml.jackson.annotation.JsonProperty;
88
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
99
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
10+
import jakarta.validation.Valid;
1011
import jakarta.validation.constraints.AssertFalse;
1112
import jakarta.validation.constraints.AssertTrue;
1213
import jakarta.validation.constraints.NotEmpty;
1314
import jakarta.validation.constraints.Size;
1415

1516
import java.time.Duration;
1617
import java.time.LocalDateTime;
18+
import java.util.List;
1719
import java.util.Objects;
1820
import java.util.stream.Stream;
1921

@@ -45,6 +47,10 @@ public class S3ScanBucketOption {
4547
@JsonProperty("filter")
4648
private S3ScanKeyPathOption s3ScanFilter;
4749

50+
@JsonProperty("processing_conditions")
51+
@Valid
52+
private List<S3ScanProcessingCondition> processingConditions;
53+
4854
@AssertTrue(message = "At most two options from start_time, end_time and range can be specified at the same time")
4955
public boolean hasValidTimeOptions() {
5056
return Stream.of(startTime, endTime, range).filter(Objects::nonNull).count() < 3;
@@ -81,4 +87,8 @@ public Duration getRange() {
8187
/**
 * Returns the key-path filter bound to the {@code filter} property of this bucket option.
 *
 * @return the configured filter; presumably {@code null} when {@code filter} is omitted
 *         (no default is visible in this excerpt, so confirm against the full class)
 */
public S3ScanKeyPathOption getS3ScanFilter() {
8288
return s3ScanFilter;
8389
}
90+
91+
/**
 * Returns the conditions bound to the {@code processing_conditions} property of this
 * bucket option.
 *
 * @return the configured conditions, returned as the stored list without copying;
 *         presumably {@code null} when {@code processing_conditions} is omitted
 *         (the field has no initializer in this excerpt), so callers should null-check
 */
public List<S3ScanProcessingCondition> getProcessingConditions() {
92+
return processingConditions;
93+
}
8494
}

0 commit comments

Comments
 (0)