Skip to content

Commit 627a6b7

Browse files
authored
add S3 Enrich processor to merge ml batch job output with source inputs (#5992)
Signed-off-by: Xun Zhang <xunzh@amazon.com>
1 parent 2f8c836 commit 627a6b7

26 files changed

Lines changed: 2589 additions & 1 deletion

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml
1313
# output folder created when we run test cases
1414
**/out/
1515

16+
# Eclipse/IDE compiled output
17+
**/bin/
18+
1619
# Development tools
1720
.DS_Store
1821
.idea

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.fasterxml.jackson.databind.JsonNode;
1414

1515
import java.io.Serializable;
16+
import java.util.Collection;
1617
import java.util.List;
1718
import java.util.Map;
1819

@@ -146,6 +147,19 @@ public interface Event extends Serializable {
146147
*/
147148
void merge(Event other);
148149

150+
/**
151+
* Merges only the specified keys from another Event into the current Event.
152+
* Only the keys present in {@code keys} will be copied from {@code other} into this Event.
153+
* Values from {@code other} will overwrite existing values in this Event for matching keys.
154+
*
155+
* @param other the other Event to merge from
156+
* @param keys the list of keys to selectively merge
157+
* @throws IllegalArgumentException if the input event is not compatible to merge.
158+
* @throws UnsupportedOperationException if the current Event does not support merging.
159+
* @since 2.15
160+
*/
161+
void merge(Event other, Collection<String> keys);
162+
149163
/**
150164
* Generates a serialized Json string of the entire Event
151165
*

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.ObjectInputStream;
3333
import java.time.Instant;
3434
import java.util.ArrayList;
35+
import java.util.Collection;
3536
import java.util.Collections;
3637
import java.util.Deque;
3738
import java.util.HashMap;
@@ -467,6 +468,26 @@ public void merge(final Event other) {
467468
((ObjectNode) jsonNode).setAll(otherObjectNode);
468469
}
469470

471+
@Override
472+
public void merge(final Event other, final Collection<String> keys) {
473+
if (keys == null || keys.isEmpty()) {
474+
throw new IllegalArgumentException("Keys list must not be null or empty for selective merge.");
475+
}
476+
if (!(other instanceof JacksonEvent)) {
477+
throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent.");
478+
}
479+
if (!(jsonNode instanceof ObjectNode)) {
480+
throw new UnsupportedOperationException("Unable to merge the Event. The current Event must have object data.");
481+
}
482+
483+
for (final String key : keys) {
484+
final Object value = other.get(key, Object.class);
485+
if (value != null) {
486+
put(key, value);
487+
}
488+
}
489+
}
490+
470491
@Override
471492
public String toJsonString() {
472493
return jsonNode.toString();

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,71 @@ void merge_overrides_existing_values() {
695695
assertThat(event.get("info/ids/id", String.class), equalTo("idx"));
696696
}
697697

698+
@Test
699+
void merge_with_keys_only_copies_specified_keys() {
700+
final String jsonString = "{\"a\": \"alpha\", \"b\": \"beta\", \"c\": \"gamma\"}";
701+
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
702+
event.merge(otherEvent, List.of("a", "b"));
703+
704+
assertThat(event.get("a", Object.class), equalTo("alpha"));
705+
assertThat(event.get("b", Object.class), equalTo("beta"));
706+
assertThat(event.containsKey("c"), equalTo(false));
707+
}
708+
709+
@Test
710+
void merge_with_keys_overwrites_existing_values() {
711+
event.put("a", "original");
712+
final String jsonString = "{\"a\": \"updated\", \"b\": \"beta\"}";
713+
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
714+
event.merge(otherEvent, List.of("a"));
715+
716+
assertThat(event.get("a", Object.class), equalTo("updated"));
717+
assertThat(event.containsKey("b"), equalTo(false));
718+
}
719+
720+
@Test
721+
void merge_with_keys_skips_missing_keys_in_other() {
722+
event.put("existing", "value");
723+
final String jsonString = "{\"a\": \"alpha\"}";
724+
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
725+
event.merge(otherEvent, List.of("a", "nonexistent"));
726+
727+
assertThat(event.get("a", Object.class), equalTo("alpha"));
728+
assertThat(event.get("existing", Object.class), equalTo("value"));
729+
assertThat(event.containsKey("nonexistent"), equalTo(false));
730+
}
731+
732+
@Test
733+
void merge_with_keys_throws_when_keys_list_is_null() {
734+
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData("{}").build();
735+
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, null));
736+
}
737+
738+
@Test
739+
void merge_with_keys_throws_when_keys_list_is_empty() {
740+
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData("{}").build();
741+
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, List.of()));
742+
}
743+
744+
@Test
745+
void merge_with_keys_throws_when_other_is_not_JacksonEvent() {
746+
final Event otherEvent = mock(Event.class);
747+
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, List.of("a")));
748+
}
749+
750+
@Test
751+
void merge_with_keys_throws_when_current_event_has_array_data() {
752+
final JacksonEvent arrayEvent = JacksonEvent.builder()
753+
.withEventType(EventType.DOCUMENT.toString())
754+
.withData("[1, 2, 3]")
755+
.build();
756+
final Event otherEvent = JacksonEvent.builder()
757+
.withEventType(EventType.DOCUMENT.toString())
758+
.withData("{\"a\": \"alpha\"}")
759+
.build();
760+
assertThrows(UnsupportedOperationException.class, () -> arrayEvent.merge(otherEvent, List.of("a")));
761+
}
762+
698763
@ParameterizedTest
699764
@ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"})
700765
public void testDelete_withNonexistentKey(final String key) {

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/discovery/AwsCloudMapPeerListProviderTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,9 @@ void listener_gets_list_after_several_failed_attempts() {
347347

348348
waitUntilPeerListPopulated(objectUnderTest);
349349

350-
assertThat(listenerEndpoints.size(), equalTo(knownIpPeers.size()));
350+
await().atMost(5, TimeUnit.SECONDS)
351+
.pollDelay(100, TimeUnit.MILLISECONDS)
352+
.untilAsserted(() -> assertThat(listenerEndpoints.size(), equalTo(knownIpPeers.size())));
351353

352354
final Set<String> observedIps = listenerEndpoints.stream()
353355
.map(Endpoint::ipAddr)

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,18 @@ public class NdjsonInputConfig {
1818
@JsonProperty("include_empty_objects")
1919
private boolean includeEmptyObjects = false;
2020

21+
/**
22+
* Optional file extension used to identify enrichment source files.
23+
* Defaults to "jsonl".
24+
*/
25+
@JsonProperty("extension")
26+
private String extension = "jsonl";
27+
2128
public boolean isIncludeEmptyObjects() {
2229
return includeEmptyObjects;
2330
}
31+
32+
public String getExtension() {
33+
return extension;
34+
}
2435
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
2+
# S3 Enricher Processor
3+
4+
This plugin enables you to merge data from a S3 file with source data from your Data Prepper pipeline.
5+
6+
## Usage
7+
```aidl
8+
ml_merge-pipeline:
9+
...
10+
processor:
11+
- s3_enrich:
12+
# =============================================================================
13+
# S3 SOURCE BUCKET CONFIGURATION
14+
# Defines where to fetch the original/source data for enrichment
15+
# =============================================================================
16+
bucket:
17+
# The S3 bucket containing the source records to enrich from
18+
name: offlinebatch
19+
filter:
20+
# S3 prefix path where source files are located
21+
# The processor will look for source files under this prefix
22+
# Example: s3://offlinebatch/bedrockbatch/originsource/test_batch_50k.jsonl
23+
include_prefix: bedrockbatch/originsource/
24+
25+
# =============================================================================
26+
# DATA FORMAT CONFIGURATION
27+
# =============================================================================
28+
# Codec for parsing source S3 files
29+
# Options: ndjson, json, csv, etc.
30+
codec:
31+
ndjson:
32+
33+
# =============================================================================
34+
# AWS CONFIGURATION
35+
# =============================================================================
36+
# AWS account ID that owns the S3 bucket (for cross-account access)
37+
default_bucket_owner: 802041417063
38+
39+
aws:
40+
# AWS region where the S3 bucket is located
41+
region: us-east-1
42+
43+
# =============================================================================
44+
# S3 OBJECT SETTINGS
45+
# =============================================================================
46+
# Maximum size (in MB) of S3 source files to process
47+
# Files exceeding this limit will be skipped
48+
s3_object_size_limit: 100mb
49+
50+
# JSON path in the incoming pipeline event that contains the S3 object key
51+
# Used to determine which source file to fetch for enrichment
52+
# Example event: {"s3": {"bucket": "...", "key": "output/file.jsonl.out"}}
53+
s3_key_path: "s3/key"
54+
55+
# =============================================================================
56+
# SOURCE FILE NAME EXTRACTION
57+
# =============================================================================
58+
# Regex pattern to extract the base filename from the output S3 key
59+
# The first capture group (.*?) extracts the original source filename
60+
#
61+
# Example:
62+
# Input: test_batch_50k-2025-11-06T21-19-15Z-1762463955825635000-uuid.jsonl.out
63+
# Match: Group 1 = "test_batch_50k"
64+
# Result: Looks for source file "test_batch_50k.jsonl" in include_prefix path
65+
#
66+
# Pattern breakdown:
67+
# ^(.*?) - Capture base filename (non-greedy)
68+
# -\d{4}-\d{2}-\d{2} - Match date: -YYYY-MM-DD
69+
# T\d{2}-\d{2}-\d{2}Z - Match time: THH-MM-SSZ
70+
# -.* - Match remaining (job ID, UUID, etc.)
71+
# \.jsonl\.out$ - Match file extension
72+
s3_object_name_pattern: ^(.*?)-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}Z-.*\.jsonl\.out$
73+
74+
# =============================================================================
75+
# RECORD MATCHING & ENRICHMENT
76+
# =============================================================================
77+
# Field name used to correlate/match records between output and source files
78+
# Both the pipeline event and source records must contain this field
79+
# Records with matching correlation values will be merged
80+
correlation_key: "recordId"
81+
82+
# List of fields to copy from the source record into the pipeline event
83+
# Only these specified fields will be merged; all other source fields are ignored
84+
# If a field doesn't exist in source, it will be skipped
85+
keys_to_merge:
86+
- "field_A"
87+
- "field_B"
88+
- "field_C"
89+
90+
# =============================================================================
91+
# CONDITIONAL PROCESSING
92+
# =============================================================================
93+
# Data Prepper expression to conditionally apply enrichment
94+
# Only events matching this condition will be processed by the enricher
95+
# Events not matching will pass through unchanged
96+
enrich_when: /s3/key != null
97+
```
98+
`keys_to_merge` List of fields to copy from the source record into the pipeline event.
99+
`s3_object_name_pattern` as Regex pattern to extract the base filename from the output S3 key.
100+
`s3_key_path` as JSON path in the incoming pipeline event that contains the S3 object key
101+
`correlation_key` as the Field name used to correlate/match records between output and source files
102+
103+
## Metrics
104+
- 'numberOfRecordsEnrichedSuccessFromS3': Number of pipeline records successfully enriched from S3 source
105+
- 'numberOfRecordsEnrichedFailerFromS3': Number of pipeline records that failed enrichment from S3 source
106+
- 's3EnricherObjectsFailed': Number of S3 source objects successfully loaded for enrichment
107+
- 's3EnricherObjectsSucceeded': Number of S3 source objects that failed to load for enrichment
108+
109+
## Developer Guide
110+
111+
The integration tests for this plugin do not run as part of the Data Prepper build.
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
dependencies {
12+
implementation project(path: ':data-prepper-plugins:common')
13+
implementation project(':data-prepper-plugins:s3-common')
14+
implementation project(':data-prepper-plugins:aws-plugin-api')
15+
implementation 'software.amazon.awssdk:sdk-core'
16+
implementation 'software.amazon.awssdk:sts'
17+
implementation 'io.micrometer:micrometer-core'
18+
implementation 'org.json:json'
19+
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
20+
implementation 'org.projectlombok:lombok:1.18.22'
21+
implementation libs.parquet.common
22+
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
23+
implementation libs.caffeine
24+
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
25+
annotationProcessor 'org.projectlombok:lombok:1.18.20'
26+
implementation 'software.amazon.awssdk:s3'
27+
testImplementation project(':data-prepper-test:test-event')
28+
testImplementation testLibs.slf4j.simple
29+
}
30+
31+
sourceSets {
32+
integrationTest {
33+
java {
34+
compileClasspath += main.output + test.output
35+
runtimeClasspath += main.output + test.output
36+
srcDir file('src/integrationTest/java')
37+
}
38+
resources.srcDir file('src/integrationTest/resources')
39+
}
40+
}
41+
42+
configurations {
43+
integrationTestImplementation.extendsFrom testImplementation
44+
integrationTestRuntime.extendsFrom testRuntime
45+
}
46+
47+
task integrationTest(type: Test) {
48+
group = 'verification'
49+
testClassesDirs = sourceSets.integrationTest.output.classesDirs
50+
51+
useJUnitPlatform()
52+
53+
classpath = sourceSets.integrationTest.runtimeClasspath
54+
55+
systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'
56+
57+
filter {
58+
includeTestsMatching '*IT'
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.s3_enrich.processor;
12+
13+
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
14+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
15+
import org.opensearch.dataprepper.plugins.s3.common.config.AwsAuthenticationOptions;
16+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
17+
18+
public class AwsAuthenticationAdapter {
19+
private final AwsCredentialsSupplier awsCredentialsSupplier;
20+
private final S3EnrichProcessorConfig s3EnrichProcessorConfig;
21+
22+
23+
AwsAuthenticationAdapter(
24+
final AwsCredentialsSupplier awsCredentialsSupplier,
25+
final S3EnrichProcessorConfig s3EnricherProcessorConfig) {
26+
this.awsCredentialsSupplier = awsCredentialsSupplier;
27+
this.s3EnrichProcessorConfig = s3EnricherProcessorConfig;
28+
}
29+
30+
AwsCredentialsProvider getCredentialsProvider() {
31+
final AwsAuthenticationOptions awsAuthenticationOptions = s3EnrichProcessorConfig.getAwsAuthenticationOptions();
32+
33+
final AwsCredentialsOptions options = AwsCredentialsOptions.builder()
34+
.withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())
35+
.withRegion(awsAuthenticationOptions.getAwsRegion())
36+
.withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides())
37+
.withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId())
38+
.build();
39+
40+
return awsCredentialsSupplier.getProvider(options);
41+
}
42+
}

0 commit comments

Comments
 (0)