Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml
# output folder created when we run test cases
**/out/

# Eclipse/IDE compiled output
**/bin/

# Development tools
.DS_Store
.idea
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.fasterxml.jackson.databind.JsonNode;

import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -146,6 +147,19 @@ public interface Event extends Serializable {
*/
void merge(Event other);

/**
* Merges only the specified keys from another Event into the current Event.
* Only the keys present in {@code keys} will be copied from {@code other} into this Event.
* Values from {@code other} will overwrite existing values in this Event for matching keys.
*
* @param other the other Event to merge from
* @param keys the list of keys to selectively merge
* @throws IllegalArgumentException if the input event is not compatible to merge.
* @throws UnsupportedOperationException if the current Event does not support merging.
* @since 2.15
*/
void merge(Event other, Collection<String> keys);

/**
* Generates a serialized Json string of the entire Event
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.ObjectInputStream;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
Expand Down Expand Up @@ -446,6 +447,26 @@ public void merge(final Event other) {
((ObjectNode) jsonNode).setAll(otherObjectNode);
}

@Override
public void merge(final Event other, final Collection<String> keys) {
if (keys == null || keys.isEmpty()) {
throw new IllegalArgumentException("Keys list must not be null or empty for selective merge.");
}
if (!(other instanceof JacksonEvent)) {
throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent.");
}
if (!(jsonNode instanceof ObjectNode)) {
throw new UnsupportedOperationException("Unable to merge the Event. The current Event must have object data.");
}

for (final String key : keys) {
final Object value = other.get(key, Object.class);
if (value != null) {
put(key, value);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the data has something like this { "key": null }, do you thing users will want it merged?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. For the S3 enrich use case(adding data from a lookup), skipping nulls makes sense — you don't want a missing/null enrichment field to overwrite a valid value in the target event. But in general, a user might explicitly want to merge a null to clear a field.

Given that the only current consumer is S3EnrichProcessor and the semantics are enrichment, I think we can keep skipping nulls. But I'll add a comment in the code in the next PR to make the decision explicit so in the future this can enhanced for other cases.

}
}
}

@Override
public String toJsonString() {
return jsonNode.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,71 @@ void merge_overrides_existing_values() {
assertThat(event.get("info/ids/id", String.class), equalTo("idx"));
}

@Test
void merge_with_keys_only_copies_specified_keys() {
final String jsonString = "{\"a\": \"alpha\", \"b\": \"beta\", \"c\": \"gamma\"}";
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
event.merge(otherEvent, List.of("a", "b"));

assertThat(event.get("a", Object.class), equalTo("alpha"));
assertThat(event.get("b", Object.class), equalTo("beta"));
assertThat(event.containsKey("c"), equalTo(false));
}

@Test
void merge_with_keys_overwrites_existing_values() {
event.put("a", "original");
final String jsonString = "{\"a\": \"updated\", \"b\": \"beta\"}";
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
event.merge(otherEvent, List.of("a"));

assertThat(event.get("a", Object.class), equalTo("updated"));
assertThat(event.containsKey("b"), equalTo(false));
}

@Test
void merge_with_keys_skips_missing_keys_in_other() {
event.put("existing", "value");
final String jsonString = "{\"a\": \"alpha\"}";
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
event.merge(otherEvent, List.of("a", "nonexistent"));

assertThat(event.get("a", Object.class), equalTo("alpha"));
assertThat(event.get("existing", Object.class), equalTo("value"));
assertThat(event.containsKey("nonexistent"), equalTo(false));
}

@Test
void merge_with_keys_throws_when_keys_list_is_null() {
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData("{}").build();
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, null));
}

@Test
void merge_with_keys_throws_when_keys_list_is_empty() {
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData("{}").build();
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, List.of()));
}

@Test
void merge_with_keys_throws_when_other_is_not_JacksonEvent() {
final Event otherEvent = mock(Event.class);
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, List.of("a")));
}

@Test
void merge_with_keys_throws_when_current_event_has_array_data() {
final JacksonEvent arrayEvent = JacksonEvent.builder()
.withEventType(EventType.DOCUMENT.toString())
.withData("[1, 2, 3]")
.build();
final Event otherEvent = JacksonEvent.builder()
.withEventType(EventType.DOCUMENT.toString())
.withData("{\"a\": \"alpha\"}")
.build();
assertThrows(UnsupportedOperationException.class, () -> arrayEvent.merge(otherEvent, List.of("a")));
}

@ParameterizedTest
@ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"})
public void testDelete_withNonexistentKey(final String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ void listener_gets_list_after_several_failed_attempts() {

waitUntilPeerListPopulated(objectUnderTest);

assertThat(listenerEndpoints.size(), equalTo(knownIpPeers.size()));
await().atMost(5, TimeUnit.SECONDS)
.pollDelay(100, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertThat(listenerEndpoints.size(), equalTo(knownIpPeers.size())));

final Set<String> observedIps = listenerEndpoints.stream()
.map(Endpoint::ipAddr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,18 @@ public class NdjsonInputConfig {
@JsonProperty("include_empty_objects")
private boolean includeEmptyObjects = false;

/**
* Optional file extension used to identify enrichment source files.
* Defaults to "jsonl".
*/
@JsonProperty("extension")
private String extension = "jsonl";

public boolean isIncludeEmptyObjects() {
return includeEmptyObjects;
}

public String getExtension() {
return extension;
}
}
111 changes: 111 additions & 0 deletions data-prepper-plugins/s3-enrich-processor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@

# S3 Enricher Processor

This plugin enables you to merge data from a S3 file with source data from your Data Prepper pipeline.

## Usage
```aidl
ml_merge-pipeline:
...
processor:
- s3_enrich:
# =============================================================================
# S3 SOURCE BUCKET CONFIGURATION
# Defines where to fetch the original/source data for enrichment
# =============================================================================
bucket:
# The S3 bucket containing the source records to enrich from
name: offlinebatch
filter:
# S3 prefix path where source files are located
# The processor will look for source files under this prefix
# Example: s3://offlinebatch/bedrockbatch/originsource/test_batch_50k.jsonl
include_prefix: bedrockbatch/originsource/

# =============================================================================
# DATA FORMAT CONFIGURATION
# =============================================================================
# Codec for parsing source S3 files
# Options: ndjson, json, csv, etc.
codec:
ndjson:

# =============================================================================
# AWS CONFIGURATION
# =============================================================================
# AWS account ID that owns the S3 bucket (for cross-account access)
default_bucket_owner: 802041417063

aws:
# AWS region where the S3 bucket is located
region: us-east-1

# =============================================================================
# S3 OBJECT SETTINGS
# =============================================================================
# Maximum size (in MB) of S3 source files to process
# Files exceeding this limit will be skipped
s3_object_size_limit: 100mb

# JSON path in the incoming pipeline event that contains the S3 object key
# Used to determine which source file to fetch for enrichment
# Example event: {"s3": {"bucket": "...", "key": "output/file.jsonl.out"}}
s3_key_path: "s3/key"

# =============================================================================
# SOURCE FILE NAME EXTRACTION
# =============================================================================
# Regex pattern to extract the base filename from the output S3 key
# The first capture group (.*?) extracts the original source filename
#
# Example:
# Input: test_batch_50k-2025-11-06T21-19-15Z-1762463955825635000-uuid.jsonl.out
# Match: Group 1 = "test_batch_50k"
# Result: Looks for source file "test_batch_50k.jsonl" in include_prefix path
#
# Pattern breakdown:
# ^(.*?) - Capture base filename (non-greedy)
# -\d{4}-\d{2}-\d{2} - Match date: -YYYY-MM-DD
# T\d{2}-\d{2}-\d{2}Z - Match time: THH-MM-SSZ
# -.* - Match remaining (job ID, UUID, etc.)
# \.jsonl\.out$ - Match file extension
s3_object_name_pattern: ^(.*?)-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}Z-.*\.jsonl\.out$

# =============================================================================
# RECORD MATCHING & ENRICHMENT
# =============================================================================
# Field name used to correlate/match records between output and source files
# Both the pipeline event and source records must contain this field
# Records with matching correlation values will be merged
correlation_key: "recordId"

# List of fields to copy from the source record into the pipeline event
# Only these specified fields will be merged; all other source fields are ignored
# If a field doesn't exist in source, it will be skipped
keys_to_merge:
- "field_A"
- "field_B"
- "field_C"

# =============================================================================
# CONDITIONAL PROCESSING
# =============================================================================
# Data Prepper expression to conditionally apply enrichment
# Only events matching this condition will be processed by the enricher
# Events not matching will pass through unchanged
enrich_when: /s3/key != null
```
`keys_to_merge` List of fields to copy from the source record into the pipeline event.
`s3_object_name_pattern` as Regex pattern to extract the base filename from the output S3 key.
`s3_key_path` as JSON path in the incoming pipeline event that contains the S3 object key
`correlation_key` as the Field name used to correlate/match records between output and source files

## Metrics
- 'numberOfRecordsEnrichedSuccessFromS3': Number of pipeline records successfully enriched from S3 source
- 'numberOfRecordsEnrichedFailerFromS3': Number of pipeline records that failed enrichment from S3 source
- 's3EnricherObjectsFailed': Number of S3 source objects successfully loaded for enrichment
- 's3EnricherObjectsSucceeded': Number of S3 source objects that failed to load for enrichment

## Developer Guide

The integration tests for this plugin do not run as part of the Data Prepper build.
60 changes: 60 additions & 0 deletions data-prepper-plugins/s3-enrich-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

dependencies {
implementation project(path: ':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:s3-common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'software.amazon.awssdk:sdk-core'
implementation 'software.amazon.awssdk:sts'
implementation 'io.micrometer:micrometer-core'
implementation 'org.json:json'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'org.projectlombok:lombok:1.18.22'
implementation libs.parquet.common
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
implementation libs.caffeine
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation 'software.amazon.awssdk:s3'
testImplementation project(':data-prepper-test:test-event')
testImplementation testLibs.slf4j.simple
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'

filter {
includeTestsMatching '*IT'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.s3_enrich.processor;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.s3.common.config.AwsAuthenticationOptions;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

public class AwsAuthenticationAdapter {
private final AwsCredentialsSupplier awsCredentialsSupplier;
private final S3EnrichProcessorConfig s3EnrichProcessorConfig;


AwsAuthenticationAdapter(
final AwsCredentialsSupplier awsCredentialsSupplier,
final S3EnrichProcessorConfig s3EnricherProcessorConfig) {
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.s3EnrichProcessorConfig = s3EnricherProcessorConfig;
}

AwsCredentialsProvider getCredentialsProvider() {
final AwsAuthenticationOptions awsAuthenticationOptions = s3EnrichProcessorConfig.getAwsAuthenticationOptions();

final AwsCredentialsOptions options = AwsCredentialsOptions.builder()
.withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())
.withRegion(awsAuthenticationOptions.getAwsRegion())
.withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides())
.withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId())
.build();

return awsCredentialsSupplier.getProvider(options);
}
}
Loading
Loading