Skip to content

Commit 363af8f

Browse files
authored
Add S3 Scan processing condition evaluator to ensure object completeness (#6624)
Add S3 Scan processing condition evaluator to ensure S3 object completeness Signed-off-by: Xun Zhang <xunzh@amazon.com>
1 parent c11b644 commit 363af8f

11 files changed

Lines changed: 830 additions & 13 deletions

File tree

data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerIT.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,15 @@
3535
import org.opensearch.dataprepper.model.configuration.PluginModel;
3636
import org.opensearch.dataprepper.model.configuration.PluginSetting;
3737
import org.opensearch.dataprepper.model.event.Event;
38+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
3839
import org.opensearch.dataprepper.model.record.Record;
3940
import org.opensearch.dataprepper.model.source.SourceCoordinationStore;
4041
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
4142
import org.opensearch.dataprepper.core.parser.model.SourceCoordinationConfig;
4243
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
4344
import org.opensearch.dataprepper.plugins.s3.common.ownership.BucketOwnerProvider;
4445
import org.opensearch.dataprepper.plugins.s3.common.source.S3ObjectPluginMetrics;
46+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
4547
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOption;
4648
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOptions;
4749
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanScanOptions;
@@ -224,8 +226,11 @@ private ScanObjectWorker createObjectUnderTest(final RecordsGenerator recordsGen
224226
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
225227
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
226228

227-
return new ScanObjectWorker(s3Client, scanOptions, createObjectUnderTest(s3ObjectRequest)
228-
,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, 30000, pluginMetrics);
229+
final S3ScanProcessingConditionEvaluator conditionEvaluator =
230+
new S3ScanProcessingConditionEvaluator(s3Client, mock(ExpressionEvaluator.class), mock(PluginFactory.class), Collections.emptyList());
231+
return new ScanObjectWorker(s3Client, scanOptions, createObjectUnderTest(s3ObjectRequest),
232+
bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, 30000, pluginMetrics,
233+
conditionEvaluator, Collections.emptyMap());
229234
}
230235

231236
@ParameterizedTest
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
package org.opensearch.dataprepper.plugins.source.s3;
11+
12+
import com.fasterxml.jackson.core.type.TypeReference;
13+
import com.fasterxml.jackson.databind.ObjectMapper;
14+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
15+
import org.opensearch.dataprepper.model.codec.InputCodec;
16+
import org.opensearch.dataprepper.model.configuration.PluginModel;
17+
import org.opensearch.dataprepper.model.configuration.PluginSetting;
18+
import org.opensearch.dataprepper.model.event.Event;
19+
import org.opensearch.dataprepper.model.event.JacksonEvent;
20+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
21+
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanProcessingCondition;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
import software.amazon.awssdk.core.ResponseInputStream;
25+
import software.amazon.awssdk.services.s3.S3Client;
26+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
27+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
28+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
29+
30+
import java.io.IOException;
31+
import java.nio.charset.StandardCharsets;
32+
import java.util.Collection;
33+
import java.util.Collections;
34+
import java.util.IdentityHashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Optional;
38+
import java.util.concurrent.atomic.AtomicReference;
39+
40+
/**
41+
* Evaluates {@link S3ScanProcessingCondition} entries for a given S3 object before
42+
* the object is processed. For each applicable condition the evaluator downloads the
43+
* condition object co-located with the S3 object and evaluates the {@code when} expression
44+
* against its content. When a {@code codec} is configured on the condition the object is
45+
* parsed with that codec; otherwise the object is parsed as a JSON document.
46+
*/
47+
public class S3ScanProcessingConditionEvaluator {
48+
49+
private static final Logger LOG = LoggerFactory.getLogger(S3ScanProcessingConditionEvaluator.class);
50+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
51+
52+
private final S3Client s3Client;
53+
private final ExpressionEvaluator expressionEvaluator;
54+
private final Map<S3ScanProcessingCondition, InputCodec> codecCache;
55+
56+
public S3ScanProcessingConditionEvaluator(final S3Client s3Client,
57+
final ExpressionEvaluator expressionEvaluator,
58+
final PluginFactory pluginFactory,
59+
final Collection<S3ScanProcessingCondition> allConditions) {
60+
this.s3Client = s3Client;
61+
this.expressionEvaluator = expressionEvaluator;
62+
final IdentityHashMap<S3ScanProcessingCondition, InputCodec> cache = new IdentityHashMap<>();
63+
for (final S3ScanProcessingCondition condition : allConditions) {
64+
if (condition.getCodec() != null) {
65+
final PluginModel codecModel = condition.getCodec();
66+
final PluginSetting pluginSetting = new PluginSetting(
67+
codecModel.getPluginName(), codecModel.getPluginSettings());
68+
cache.put(condition, pluginFactory.loadPlugin(InputCodec.class, pluginSetting));
69+
}
70+
}
71+
this.codecCache = Collections.unmodifiableMap(cache);
72+
}
73+
74+
/**
75+
* Returns the first condition in {@code conditions} that is not yet satisfied for the given
76+
* object, or {@link Optional#empty()} if all applicable conditions are met. A condition is
77+
* applicable when {@code applicable_prefix} is absent or the object key starts with at least
78+
* one of the listed applicable prefixes.
79+
*/
80+
public Optional<S3ScanProcessingCondition> firstUnmetCondition(final String bucket,
81+
final String objectKey,
82+
final List<S3ScanProcessingCondition> conditions) {
83+
if (conditions == null || conditions.isEmpty()) {
84+
return Optional.empty();
85+
}
86+
for (final S3ScanProcessingCondition condition : conditions) {
87+
if (!isApplicable(objectKey, condition)) {
88+
continue;
89+
}
90+
final String conditionObjectKey = resolveConditionObjectKey(objectKey, condition.getObjectName());
91+
try {
92+
final Event event;
93+
final InputCodec codec = codecCache.get(condition);
94+
if (codec != null) {
95+
event = parseFirstEventWithCodec(bucket, conditionObjectKey, codec);
96+
} else {
97+
final String content = readS3ObjectAsString(bucket, conditionObjectKey);
98+
final Map<String, Object> data = OBJECT_MAPPER.readValue(
99+
content, new TypeReference<Map<String, Object>>() {});
100+
event = JacksonEvent.builder()
101+
.withEventType("event")
102+
.withData(data)
103+
.build();
104+
}
105+
if (event == null || !expressionEvaluator.evaluateConditional(condition.getWhen(), event)) {
106+
LOG.debug("Processing condition '{}' not satisfied for {}/{} using condition object {}",
107+
condition.getWhen(), bucket, objectKey, conditionObjectKey);
108+
return Optional.of(condition);
109+
}
110+
} catch (final NoSuchKeyException e) {
111+
LOG.debug("Object for condition {}/{} not found yet, condition not met", bucket, conditionObjectKey);
112+
return Optional.of(condition);
113+
} catch (final Exception e) {
114+
LOG.warn("Error reading or evaluating processing condition for {}/{}, processing object as-is",
115+
bucket, objectKey, e);
116+
return Optional.empty();
117+
}
118+
}
119+
return Optional.empty();
120+
}
121+
122+
private boolean isApplicable(final String objectKey, final S3ScanProcessingCondition condition) {
123+
final List<String> includePrefix = condition.getApplicablePrefix();
124+
return includePrefix == null || includePrefix.isEmpty() ||
125+
includePrefix.stream().anyMatch(objectKey::startsWith);
126+
}
127+
128+
private String resolveConditionObjectKey(final String objectKey, final String fileName) {
129+
final String directory = objectKey.contains("/")
130+
? objectKey.substring(0, objectKey.lastIndexOf('/') + 1)
131+
: "";
132+
return directory + fileName;
133+
}
134+
135+
private Event parseFirstEventWithCodec(final String bucket,
136+
final String key,
137+
final InputCodec codec) throws IOException {
138+
final AtomicReference<Event> firstEvent = new AtomicReference<>();
139+
final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build();
140+
try (final ResponseInputStream<GetObjectResponse> response = s3Client.getObject(request)) {
141+
codec.parse(response, record -> firstEvent.compareAndSet(null, record.getData()));
142+
}
143+
return firstEvent.get();
144+
}
145+
146+
private String readS3ObjectAsString(final String bucket, final String key) throws IOException {
147+
final GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(key).build();
148+
try (final ResponseInputStream<GetObjectResponse> response = s3Client.getObject(request)) {
149+
return new String(response.readAllBytes(), StandardCharsets.UTF_8);
150+
}
151+
}
152+
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanService.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,27 @@
44
*/
55
package org.opensearch.dataprepper.plugins.source.s3;
66

7+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
78
import org.opensearch.dataprepper.metrics.PluginMetrics;
89
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
910
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
11+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
1012
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
1113
import org.opensearch.dataprepper.plugins.s3.common.ownership.BucketOwnerProvider;
14+
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOption;
1215
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOptions;
16+
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanProcessingCondition;
1317

1418
import java.time.Duration;
1519
import java.time.LocalDateTime;
20+
import java.util.HashMap;
21+
import java.util.Map;
1622
import java.util.concurrent.TimeUnit;
1723
import java.util.ArrayList;
1824
import java.util.List;
1925
import java.util.concurrent.Executors;
2026
import java.util.concurrent.ExecutorService;
27+
import java.util.stream.Collectors;
2128

2229
/**
2330
* Class responsible for taking an {@link S3SourceConfig} and creating all the necessary {@link ScanOptions}
@@ -42,6 +49,8 @@ public class S3ScanService {
4249
private final AcknowledgementSetManager acknowledgementSetManager;
4350
private final S3ObjectDeleteWorker s3ObjectDeleteWorker;
4451
private final PluginMetrics pluginMetrics;
52+
private final ExpressionEvaluator expressionEvaluator;
53+
private final PluginFactory pluginFactory;
4554
private final ExecutorService executorService;
4655
private final List<ScanObjectWorker> workers;
4756

@@ -52,7 +61,9 @@ public S3ScanService(final S3SourceConfig s3SourceConfig,
5261
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
5362
final AcknowledgementSetManager acknowledgementSetManager,
5463
final S3ObjectDeleteWorker s3ObjectDeleteWorker,
55-
final PluginMetrics pluginMetrics) {
64+
final PluginMetrics pluginMetrics,
65+
final ExpressionEvaluator expressionEvaluator,
66+
final PluginFactory pluginFactory) {
5667
this.s3SourceConfig = s3SourceConfig;
5768
this.s3ScanBucketOptions = s3SourceConfig.getS3ScanScanOptions().getBuckets();
5869
this.s3ClientBuilderFactory = s3ClientBuilderFactory;
@@ -65,15 +76,35 @@ public S3ScanService(final S3SourceConfig s3SourceConfig,
6576
this.acknowledgementSetManager = acknowledgementSetManager;
6677
this.s3ObjectDeleteWorker = s3ObjectDeleteWorker;
6778
this.pluginMetrics = pluginMetrics;
79+
this.expressionEvaluator = expressionEvaluator;
80+
this.pluginFactory = pluginFactory;
6881
this.workers = new ArrayList<>();
6982
this.executorService = Executors.newFixedThreadPool(s3SourceConfig.getNumWorkers(), BackgroundThreadFactory.defaultExecutorThreadFactory("s3-source-scan"));
7083
}
7184

7285
public void start() {
7386
long backOffMs = s3SourceConfig.getBackOff().toMillis();
87+
final Map<String, List<S3ScanProcessingCondition>> bucketProcessingConditionsMap = new HashMap<>();
88+
if (s3ScanBucketOptions != null) {
89+
for (final S3ScanBucketOptions bucketOption : s3ScanBucketOptions) {
90+
final S3ScanBucketOption option = bucketOption.getS3ScanBucketOption();
91+
if (option != null && option.getProcessingConditions() != null) {
92+
bucketProcessingConditionsMap
93+
.computeIfAbsent(option.getName(), k -> new ArrayList<>())
94+
.addAll(option.getProcessingConditions());
95+
}
96+
}
97+
}
98+
final List<S3ScanProcessingCondition> allConditions = bucketProcessingConditionsMap.values().stream()
99+
.flatMap(List::stream)
100+
.collect(Collectors.toList());
101+
final S3ScanProcessingConditionEvaluator conditionEvaluator =
102+
new S3ScanProcessingConditionEvaluator(s3ClientBuilderFactory.getS3Client(), expressionEvaluator, pluginFactory, allConditions);
74103
for (int i = 0; i < s3SourceConfig.getNumWorkers(); i++) {
75104
ScanObjectWorker scanObjectWorker = new ScanObjectWorker(s3ClientBuilderFactory.getS3Client(),
76-
getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, backOffMs, pluginMetrics);
105+
getScanOptions(), s3ObjectHandler, bucketOwnerProvider, sourceCoordinator, s3SourceConfig,
106+
acknowledgementSetManager, s3ObjectDeleteWorker, backOffMs, pluginMetrics, conditionEvaluator,
107+
bucketProcessingConditionsMap);
77108
workers.add(scanObjectWorker);
78109
executorService.submit(new Thread(scanObjectWorker));
79110
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3Source.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.plugins.source.s3;
77

88
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
9+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
910
import org.opensearch.dataprepper.metrics.PluginMetrics;
1011
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
1112
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
@@ -47,6 +48,7 @@ public class S3Source implements Source<Record<Event>>, UsesSourceCoordination {
4748
private final AcknowledgementSetManager acknowledgementSetManager;
4849
private final AwsCredentialsSupplier awsCredentialsSupplier;
4950
private final boolean acknowledgementsEnabled;
51+
private final ExpressionEvaluator expressionEvaluator;
5052
private SourceCoordinator<S3SourceProgressState> sourceCoordinator;
5153

5254

@@ -56,14 +58,16 @@ public S3Source(
5658
final S3SourceConfig s3SourceConfig,
5759
final PluginFactory pluginFactory,
5860
final AcknowledgementSetManager acknowledgementSetManager,
59-
final AwsCredentialsSupplier awsCredentialsSupplier) {
61+
final AwsCredentialsSupplier awsCredentialsSupplier,
62+
final ExpressionEvaluator expressionEvaluator) {
6063
this.pluginMetrics = pluginMetrics;
6164
this.s3SourceConfig = s3SourceConfig;
6265
this.pluginFactory = pluginFactory;
6366
this.s3ScanScanOptional = Optional.ofNullable(s3SourceConfig.getS3ScanScanOptions());
6467
this.acknowledgementsEnabled = s3SourceConfig.getAcknowledgements();
6568
this.acknowledgementSetManager = acknowledgementSetManager;
6669
this.awsCredentialsSupplier = awsCredentialsSupplier;
70+
this.expressionEvaluator = expressionEvaluator;
6771
}
6872

6973
@Override
@@ -126,7 +130,7 @@ public void start(Buffer<Record<Event>> buffer) {
126130
sqsService.start();
127131
}
128132
if(s3ScanScanOptional.isPresent()) {
129-
s3ScanService = new S3ScanService(s3SourceConfig, s3ClientBuilderFactory, s3Handler, bucketOwnerProvider, sourceCoordinator, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics);
133+
s3ScanService = new S3ScanService(s3SourceConfig, s3ClientBuilderFactory, s3Handler, bucketOwnerProvider, sourceCoordinator, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics, expressionEvaluator, pluginFactory);
130134
s3ScanService.start();
131135
}
132136
}

0 commit comments

Comments
 (0)