Skip to content

Commit b92c00c

Browse files
committed
Add selective merge to Event API and refactor S3EnrichProcessor
Signed-off-by: Xun Zhang <xunzh@amazon.com>
1 parent 478e15c commit b92c00c

16 files changed

Lines changed: 181 additions & 89 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml
1313
# output folder created when we run test cases
1414
**/out/
1515

16+
# Eclipse/IDE compiled output
17+
**/bin/
18+
1619
# Development tools
1720
.DS_Store
1821
.idea

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,19 @@ public interface Event extends Serializable {
146146
*/
147147
void merge(Event other);
148148

149+
/**
150+
* Merges only the specified keys from another Event into the current Event.
151+
* Only the keys present in {@code keys} will be copied from {@code other} into this Event.
152+
* Values from {@code other} will overwrite existing values in this Event for matching keys.
153+
*
154+
* @param other the other Event to merge from
155+
* @param keys the list of keys to selectively merge
156+
* @throws IllegalArgumentException if the input event is not compatible to merge, or if {@code keys} is null or empty.
157+
* @throws UnsupportedOperationException if the current Event does not support merging.
158+
* @since 2.11
159+
*/
160+
void merge(Event other, List<String> keys);
161+
149162
/**
150163
* Generates a serialized Json string of the entire Event
151164
*

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,26 @@ public void merge(final Event other) {
446446
((ObjectNode) jsonNode).setAll(otherObjectNode);
447447
}
448448

449+
@Override
450+
public void merge(final Event other, final List<String> keys) {
451+
if (keys == null || keys.isEmpty()) {
452+
throw new IllegalArgumentException("Keys list must not be null or empty for selective merge.");
453+
}
454+
if (!(other instanceof JacksonEvent)) {
455+
throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent.");
456+
}
457+
if (!(jsonNode instanceof ObjectNode)) {
458+
throw new UnsupportedOperationException("Unable to merge the Event. The current Event must have object data.");
459+
}
460+
461+
for (final String key : keys) {
462+
final Object value = other.get(key, Object.class);
463+
if (value != null) {
464+
put(key, value);
465+
}
466+
}
467+
}
468+
449469
@Override
450470
public String toJsonString() {
451471
return jsonNode.toString();

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,71 @@ void merge_overrides_existing_values() {
682682
assertThat(event.get("info/ids/id", String.class), equalTo("idx"));
683683
}
684684

685+
@Test
686+
void merge_with_keys_only_copies_specified_keys() {
687+
final String jsonString = "{\"a\": \"alpha\", \"b\": \"beta\", \"c\": \"gamma\"}";
688+
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
689+
event.merge(otherEvent, List.of("a", "b"));
690+
691+
assertThat(event.get("a", Object.class), equalTo("alpha"));
692+
assertThat(event.get("b", Object.class), equalTo("beta"));
693+
assertThat(event.containsKey("c"), equalTo(false));
694+
}
695+
696+
@Test
697+
void merge_with_keys_overwrites_existing_values() {
698+
event.put("a", "original");
699+
final String jsonString = "{\"a\": \"updated\", \"b\": \"beta\"}";
700+
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
701+
event.merge(otherEvent, List.of("a"));
702+
703+
assertThat(event.get("a", Object.class), equalTo("updated"));
704+
assertThat(event.containsKey("b"), equalTo(false));
705+
}
706+
707+
@Test
708+
void merge_with_keys_skips_missing_keys_in_other() {
709+
event.put("existing", "value");
710+
final String jsonString = "{\"a\": \"alpha\"}";
711+
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
712+
event.merge(otherEvent, List.of("a", "nonexistent"));
713+
714+
assertThat(event.get("a", Object.class), equalTo("alpha"));
715+
assertThat(event.get("existing", Object.class), equalTo("value"));
716+
assertThat(event.containsKey("nonexistent"), equalTo(false));
717+
}
718+
719+
@Test
720+
void merge_with_keys_throws_when_keys_list_is_null() {
721+
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData("{}").build();
722+
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, null));
723+
}
724+
725+
@Test
726+
void merge_with_keys_throws_when_keys_list_is_empty() {
727+
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData("{}").build();
728+
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, List.of()));
729+
}
730+
731+
@Test
732+
void merge_with_keys_throws_when_other_is_not_JacksonEvent() {
733+
final Event otherEvent = mock(Event.class);
734+
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, List.of("a")));
735+
}
736+
737+
@Test
738+
void merge_with_keys_throws_when_current_event_has_array_data() {
739+
final JacksonEvent arrayEvent = JacksonEvent.builder()
740+
.withEventType(EventType.DOCUMENT.toString())
741+
.withData("[1, 2, 3]")
742+
.build();
743+
final Event otherEvent = JacksonEvent.builder()
744+
.withEventType(EventType.DOCUMENT.toString())
745+
.withData("{\"a\": \"alpha\"}")
746+
.build();
747+
assertThrows(UnsupportedOperationException.class, () -> arrayEvent.merge(otherEvent, List.of("a")));
748+
}
749+
685750
@ParameterizedTest
686751
@ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"})
687752
public void testDelete_withNonexistentKey(final String key) {

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/discovery/AwsCloudMapPeerListProviderTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,9 @@ void listener_gets_list_after_several_failed_attempts() {
347347

348348
waitUntilPeerListPopulated(objectUnderTest);
349349

350-
assertThat(listenerEndpoints.size(), equalTo(knownIpPeers.size()));
350+
await().atMost(5, TimeUnit.SECONDS)
351+
.pollDelay(100, TimeUnit.MILLISECONDS)
352+
.untilAsserted(() -> assertThat(listenerEndpoints.size(), equalTo(knownIpPeers.size())));
351353

352354
final Set<String> observedIps = listenerEndpoints.stream()
353355
.map(Endpoint::ipAddr)

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,18 @@ public class NdjsonInputConfig {
1818
@JsonProperty("include_empty_objects")
1919
private boolean includeEmptyObjects = false;
2020

21+
/**
22+
* Optional file extension used to identify enrichment source files.
23+
* Defaults to "jsonl".
24+
*/
25+
@JsonProperty("extension")
26+
private String extension = "jsonl";
27+
2128
public boolean isIncludeEmptyObjects() {
2229
return includeEmptyObjects;
2330
}
31+
32+
/**
 * Returns the file extension bound to the {@code extension} JSON property.
 *
 * @return the value of the {@code extension} field, which is initialized to {@code "jsonl"}
 */
public String getExtension() {
33+
return extension;
34+
}
2435
}

data-prepper-plugins/s3-enrich-processor/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ This plugin enables you to merge data from a S3 file with source data from your
88
ml_merge-pipeline:
99
...
1010
processor:
11-
- s3_enricher:
11+
- s3_enrich:
1212
# =============================================================================
1313
# S3 SOURCE BUCKET CONFIGURATION
1414
# Defines where to fetch the original/source data for enrichment
@@ -45,7 +45,7 @@ ml_merge-pipeline:
4545
# =============================================================================
4646
# Maximum size of S3 source files to process, given with a unit suffix (e.g. 100mb)
4747
# Files exceeding this limit will be skipped
48-
s3_object_size_limit_mb: 100
48+
s3_object_size_limit: 100mb
4949
5050
# JSON path in the incoming pipeline event that contains the S3 object key
5151
# Used to determine which source file to fetch for enrichment

data-prepper-plugins/s3-enrich-processor/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ dependencies {
1919
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
2020
implementation 'org.projectlombok:lombok:1.18.22'
2121
implementation libs.parquet.common
22-
implementation 'dev.failsafe:failsafe:3.3.2'
2322
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
2423
implementation libs.caffeine
24+
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
2525
annotationProcessor 'org.projectlombok:lombok:1.18.20'
2626
implementation 'software.amazon.awssdk:s3'
2727
testImplementation project(':data-prepper-test:test-event')

data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessor.java

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.ArrayList;
4444
import java.util.Collection;
4545
import java.util.List;
46-
import java.util.regex.Pattern;
4746
import java.util.stream.Collectors;
4847

4948
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
@@ -61,7 +60,6 @@ public class S3EnrichProcessor extends AbstractProcessor<Record<Event>, Record<E
6160
private final AwsCredentialsSupplier awsCredentialsSupplier;
6261
private final PluginSetting codecPluginSettings;
6362
private final PluginFactory pluginFactory;
64-
private final Pattern baseNamePattern;
6563
private final InputCodec codec;
6664
private final S3ObjectWorker s3ObjectWorker;
6765
protected final List<String> tagsOnFailure;
@@ -91,7 +89,6 @@ public S3EnrichProcessor(final S3EnrichProcessorConfig s3EnrichProcessorConfig,
9189
this.pluginFactory = pluginFactory;
9290
CacheFactory factory = new CacheFactory(s3EnrichProcessorConfig);
9391
this.cacheService = new S3EnricherCacheService(factory);
94-
this.baseNamePattern = Pattern.compile(s3EnrichProcessorConfig.getEnricherNamePattern());
9592
this.tagsOnFailure = s3EnrichProcessorConfig.getTagsOnFailure();
9693

9794
final AwsAuthenticationAdapter awsAuthenticationAdapter = new AwsAuthenticationAdapter(awsCredentialsSupplier, s3EnrichProcessorConfig);
@@ -224,52 +221,11 @@ private void mergeData(Event targetEvent, Event sourceEvent) {
224221
throw new IllegalArgumentException("No merge keys configured");
225222
}
226223

227-
List<String> failedKeys = new ArrayList<>();
228-
// Merge only specified keys
229-
for (EventKey eventKey : mergeKeys) {
230-
try {
231-
mergeKey(targetEvent, sourceEvent, eventKey);
232-
} catch (Exception e) {
233-
LOG.error("Failed to merge key '{}': {}", eventKey, e.getMessage(), e);
234-
failedKeys.add(eventKey.getKey());
235-
}
236-
}
237-
// Handle failures based on configuration or policy
238-
if (!failedKeys.isEmpty()) {
239-
if (failedKeys.size() == mergeKeys.size()) {
240-
// All failed - always throw
241-
throw new RuntimeException("All merge keys failed: " + failedKeys);
242-
} else {
243-
// Partial failure - log warning
244-
LOG.warn("Failed to merge {}/{} keys: {}",
245-
failedKeys.size(), mergeKeys.size(), failedKeys);
246-
}
247-
}
248-
}
249-
250-
/**
251-
* Merges a single key from source to target event.
252-
*/
253-
private void mergeKey(Event targetEvent, Event sourceEvent, EventKey eventKey) {
254-
String keyPath = eventKey.getKey();
255-
256-
// Check if source has this key
257-
if (!sourceEvent.containsKey(keyPath)) {
258-
LOG.debug("Source event does not contain key: {}", keyPath);
259-
throw new IllegalArgumentException("Source event does not contain key: " + keyPath);
260-
}
261-
262-
// Get value from source
263-
Object sourceValue = sourceEvent.get(keyPath, Object.class);
264-
265-
if (sourceValue == null) {
266-
LOG.debug("Source value is null for key: {}", keyPath);
267-
throw new IllegalArgumentException("Source value is null for key: " + keyPath);
268-
}
224+
final List<String> keyPaths = mergeKeys.stream()
225+
.map(EventKey::getKey)
226+
.collect(Collectors.toList());
269227

270-
// Put into target (will overwrite if exists)
271-
targetEvent.put(eventKey, sourceValue);
272-
LOG.trace("Merged key '{}' with value: {}", keyPath, sourceValue);
228+
targetEvent.merge(sourceEvent, keyPaths);
273229
}
274230

275231
/**

data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessorConfig.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import jakarta.validation.constraints.NotNull;
2121
import jakarta.validation.constraints.Size;
2222
import lombok.Getter;
23+
import org.hibernate.validator.constraints.time.DurationMax;
24+
import org.hibernate.validator.constraints.time.DurationMin;
2325
import org.opensearch.dataprepper.aws.validator.AwsAccountId;
2426
import org.opensearch.dataprepper.model.annotations.ExampleValues;
2527
import org.opensearch.dataprepper.model.configuration.PluginModel;
@@ -77,12 +79,14 @@ public class S3EnrichProcessorConfig {
7779
@ByteCountMax("300mb")
7880
private ByteCount enricherSizeLimit = ByteCount.parse(DEFAULT_ENRICHER_SIZE_LIMIT);
7981

80-
@JsonProperty(value = "cache_max_size", defaultValue="100000")
82+
@JsonProperty(value = "cache_max_count", defaultValue="200000")
8183
@Min(0)
82-
@Max(300000)
83-
private int cacheSizeLimit = DEFAULT_CACHE_SIZE_LIMIT;
84+
@Max(1000000)
85+
private int cacheCountLimit = DEFAULT_CACHE_SIZE_LIMIT;
8486

8587
@JsonProperty(value = "cache_ttl", defaultValue = "PT10M")
88+
@DurationMin(minutes = 1)
89+
@DurationMax(minutes = 120)
8690
@JsonPropertyDescription("The TTL for cache entries. Accepts ISO-8601 duration format (e.g., PT10M for 10 minutes, PT1H for 1 hour).")
8791
private Duration cacheTtl = DEFAULT_CACHE_TTL;
8892

@@ -118,6 +122,18 @@ public class S3EnrichProcessorConfig {
118122
"other parts of the configuration.")
119123
private List<String> tagsOnFailure = Collections.emptyList();
120124

125+
/**
126+
* Returns the file extension configured on the codec (e.g. "jsonl").
127+
* Reads the {@code extension} key from the codec plugin settings, falling back to {@code "jsonl"}.
128+
*/
129+
public String getCodecExtension() {
130+
if (codec == null || codec.getPluginSettings() == null) {
131+
return "jsonl";
132+
}
133+
final Object ext = codec.getPluginSettings().get("extension");
134+
return (ext instanceof String && !((String) ext).isBlank()) ? (String) ext : "jsonl";
135+
}
136+
121137
/**
122138
* Safely retrieves the S3 scan include prefix from the configuration chain.
123139
*

0 commit comments

Comments
 (0)