Skip to content

Commit 79455e6

Browse files
Add wrap_entries processor to mutate-event-processors (#6665)
* Add map_entries processor to mutate-event-processors Adds a new map_entries processor that wraps each element of a primitive array into an object using a configured key name. This enables downstream processors like add_entries and delete_entries with iterate_on, which require List of Map and cannot operate on primitive arrays. Example: ["alice", "bob"] -> [{"name": "alice"}, {"name": "bob"}] Configuration options: - source (required): key of the primitive array to transform - target (optional): key to write result to (defaults to source) - key (required): key name in each resulting object - exclude_null_empty_values: filter out null/empty elements - append_if_target_exists: append to existing target array - map_entries_when: conditional expression - tags_on_failure: tags on processing failure Includes 21 unit tests and 3 config tests. Signed-off-by: Nishant Kadivar <nimahesx@amazon.com> * fix(map_entries): Improve edge case handling for missing keys and empty filtered results - Change missing source key from warn+tags_on_failure to silent debug log, consistent with other mutate-event processors - Remove early return when all elements are filtered out by exclude_null_empty_values, ensuring consistent List<Map> output type - Update test and README to reflect new behavior Signed-off-by: Nishant Kadivar <nimahesx@amazon.com> * Refactor map_entries validation into config and add integration test Signed-off-by: Nishant Kadivar <nimahesx@amazon.com> * Add chained wrap_entries processor test for nested listsgit Signed-off-by: Nishant Kadivar <nimahesx@amazon.com> --------- Signed-off-by: Nishant Kadivar <nimahesx@amazon.com>
1 parent 2ef318d commit 79455e6

7 files changed

Lines changed: 865 additions & 0 deletions

File tree

data-prepper-plugins/mutate-event-processors/README.md

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,116 @@ will end up with this after processing:
647647
* `tags_on_failure` - (optional): a list of tags to add to event metadata when the event fails to process
648648

649649

650+
## WrapEntriesProcessor
651+
A processor that wraps each element of a primitive array into an object using a configured key name. This enables downstream processors like `add_entries` and `delete_entries` with `iterate_on`, which require `List<Map<String, Object>>` and cannot operate on primitive arrays.
652+
653+
### Basic Usage
654+
To get started, create the following `pipeline.yaml`.
655+
```yaml
656+
pipeline:
657+
source:
658+
file:
659+
path: "/full/path/to/logs_json.log"
660+
record_type: "event"
661+
format: "json"
662+
processor:
663+
- wrap_entries:
664+
source: "/names"
665+
key: "name"
666+
sink:
667+
- stdout:
668+
```
669+
670+
Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.
671+
```json
672+
{"names": ["alice", "bob", "charlie"]}
673+
```
674+
675+
When run, the processor will parse the message into the following output:
676+
677+
```json
678+
{"names": [{"name": "alice"}, {"name": "bob"}, {"name": "charlie"}]}
679+
```
680+
681+
### Writing to a Separate Target
682+
If you want to keep the original array and write the wrapped objects to a different key, use the `target` option:
683+
```yaml
684+
processor:
685+
- wrap_entries:
686+
source: "/items"
687+
target: "/inventory_items"
688+
key: "product"
689+
```
690+
691+
Input:
692+
```json
693+
{"items": ["laptop", "monitor", "keyboard"]}
694+
```
695+
696+
Output:
697+
```json
698+
{"items": ["laptop", "monitor", "keyboard"], "inventory_items": [{"product": "laptop"}, {"product": "monitor"}, {"product": "keyboard"}]}
699+
```
700+
701+
### Conditional Processing
702+
Use `wrap_entries_when` to only process events matching a condition:
703+
```yaml
704+
processor:
705+
- wrap_entries:
706+
source: "/tags"
707+
key: "value"
708+
wrap_entries_when: '/type == "tagged"'
709+
```
710+
711+
Only events where `type` equals `"tagged"` will be processed.
712+
713+
### Chaining with add_entries
714+
A common use case is wrapping a primitive array so that `add_entries` with `iterate_on` can operate on it:
715+
```yaml
716+
processor:
717+
- wrap_entries:
718+
source: "/names"
719+
key: "name"
720+
- add_entries:
721+
iterate_on: "/names"
722+
entries:
723+
- key: "greeting"
724+
format: "Hello, ${name}"
725+
```
726+
727+
Input:
728+
```json
729+
{"names": ["alice", "bob"]}
730+
```
731+
732+
After `wrap_entries`:
733+
```json
734+
{"names": [{"name": "alice"}, {"name": "bob"}]}
735+
```
736+
737+
After `add_entries`:
738+
```json
739+
{"names": [{"name": "alice", "greeting": "Hello, alice"}, {"name": "bob", "greeting": "Hello, bob"}]}
740+
```
741+
742+
### Configuration
743+
* `source` - (required) - The key of the primitive array to transform (JSON Pointer)
744+
* `target` - (optional) - The key to write the resulting object array to. Defaults to `source` (in-place). Must not be empty when specified.
745+
* `key` - (required) - The key name to use in each resulting object
746+
* `exclude_null_empty_values` - (optional) - When set to `true`, null and empty string elements are filtered out before wrapping. Default is `false`
747+
* `append_if_target_exists` - (optional) - When set to `true`, appends results to the existing target array instead of overwriting. Default is `false`
748+
* `wrap_entries_when` - (optional) - A [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) that determines whether the processor runs on the event. Evaluated at the root event level.
749+
* `tags_on_failure` - (optional) - A list of tags to add to the event metadata when the event fails to process
750+
751+
### Edge Case Behavior
752+
* If the `source` key does not exist in the event, the processor silently skips the event (no-op). A debug-level log is emitted.
753+
* If the `source` value is not a list (e.g., a string or number), the processor skips the event (no-op) and adds `tags_on_failure` if configured.
754+
* If the `source` list is empty, the processor does nothing — the empty list remains as-is.
755+
* If no elements remain after filtering (when `exclude_null_empty_values` is `true` and all elements are null or empty), the target is written as an empty list `[]`.
756+
* Null elements within the list are wrapped like any other value by default: `[null]` becomes `[{"key": null}]`. Use `exclude_null_empty_values: true` to filter them out.
757+
758+
___
759+
650760
## Developer Guide
651761
This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development:
652762
- [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
package org.opensearch.dataprepper.plugins.processor.mutateevent;
11+
12+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
13+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
14+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
15+
import org.opensearch.dataprepper.metrics.PluginMetrics;
16+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
17+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
18+
import org.opensearch.dataprepper.model.event.Event;
19+
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
20+
import org.opensearch.dataprepper.model.processor.Processor;
21+
import org.opensearch.dataprepper.model.record.Record;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.util.ArrayList;
26+
import java.util.Collection;
27+
import java.util.HashMap;
28+
import java.util.List;
29+
import java.util.Map;
30+
31+
@DataPrepperPlugin(name = "wrap_entries", pluginType = Processor.class, pluginConfigurationType = WrapEntriesProcessorConfig.class)
32+
public class WrapEntriesProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
33+
34+
private static final Logger LOG = LoggerFactory.getLogger(WrapEntriesProcessor.class);
35+
36+
private final WrapEntriesProcessorConfig config;
37+
private final ExpressionEvaluator expressionEvaluator;
38+
39+
@DataPrepperPluginConstructor
40+
public WrapEntriesProcessor(final PluginMetrics pluginMetrics,
41+
final WrapEntriesProcessorConfig config,
42+
final ExpressionEvaluator expressionEvaluator) {
43+
super(pluginMetrics);
44+
this.config = config;
45+
this.expressionEvaluator = expressionEvaluator;
46+
config.validateExpressions(expressionEvaluator);
47+
}
48+
49+
@Override
50+
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
51+
for (final Record<Event> record : records) {
52+
final Event event = record.getData();
53+
54+
try {
55+
if (config.getWrapEntriesWhen() != null
56+
&& !expressionEvaluator.evaluateConditional(config.getWrapEntriesWhen(), event)) {
57+
continue;
58+
}
59+
60+
processEvent(event);
61+
} catch (final Exception e) {
62+
LOG.atError()
63+
.addMarker(EVENT)
64+
.addMarker(NOISY)
65+
.setMessage("Error processing event [{}]")
66+
.addArgument(event)
67+
.setCause(e)
68+
.log();
69+
addTagsOnFailure(event);
70+
}
71+
}
72+
return records;
73+
}
74+
75+
private void processEvent(final Event event) {
76+
final String source = config.getSource();
77+
78+
if (!event.containsKey(source)) {
79+
LOG.debug(EVENT, "Source key [{}] does not exist in event [{}], skipping.", source, event);
80+
return;
81+
}
82+
83+
final Object sourceValue = event.get(source, Object.class);
84+
85+
if (!(sourceValue instanceof List)) {
86+
LOG.warn(EVENT, "Source key [{}] is not a list in event [{}], skipping.", source, event);
87+
addTagsOnFailure(event);
88+
return;
89+
}
90+
91+
final List<?> sourceList = (List<?>) sourceValue;
92+
93+
if (sourceList.isEmpty()) {
94+
return;
95+
}
96+
97+
final List<Map<String, Object>> result = new ArrayList<>(sourceList.size());
98+
final String key = config.getKey();
99+
100+
for (final Object element : sourceList) {
101+
if (config.getExcludeNullEmptyValues()) {
102+
if (element == null) {
103+
continue;
104+
}
105+
if (element instanceof String && ((String) element).isEmpty()) {
106+
continue;
107+
}
108+
}
109+
final Map<String, Object> entry = new HashMap<>();
110+
entry.put(key, element);
111+
result.add(entry);
112+
}
113+
114+
115+
final String effectiveTarget = config.getEffectiveTarget();
116+
117+
if (config.getAppendIfTargetExists() && event.containsKey(effectiveTarget)) {
118+
final Object existingValue = event.get(effectiveTarget, Object.class);
119+
if (!(existingValue instanceof List)) {
120+
LOG.warn(EVENT, "Target key [{}] exists but is not a list in event [{}], skipping.",
121+
effectiveTarget, event);
122+
addTagsOnFailure(event);
123+
return;
124+
}
125+
final List<Object> existingList = new ArrayList<>((List<?>) existingValue);
126+
existingList.addAll(result);
127+
event.put(effectiveTarget, existingList);
128+
} else {
129+
event.put(effectiveTarget, result);
130+
}
131+
}
132+
133+
private void addTagsOnFailure(final Event event) {
134+
final List<String> tags = config.getTagsOnFailure();
135+
if (tags != null && !tags.isEmpty()) {
136+
event.getMetadata().addTags(tags);
137+
}
138+
}
139+
140+
@Override
141+
public void prepareForShutdown() {
142+
}
143+
144+
@Override
145+
public boolean isReadyForShutdown() {
146+
return true;
147+
}
148+
149+
@Override
150+
public void shutdown() {
151+
}
152+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.mutateevent;
12+
13+
import com.fasterxml.jackson.annotation.JsonClassDescription;
14+
import com.fasterxml.jackson.annotation.JsonProperty;
15+
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
16+
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
17+
import jakarta.validation.constraints.NotEmpty;
18+
import jakarta.validation.constraints.NotNull;
19+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
20+
import org.opensearch.dataprepper.model.annotations.ExampleValues;
21+
import org.opensearch.dataprepper.model.annotations.ExampleValues.Example;
22+
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
23+
24+
import java.util.List;
25+
26+
@JsonPropertyOrder
27+
@JsonClassDescription("The <code>wrap_entries</code> processor wraps each element of a primitive array " +
28+
"into an object using a configured key name.")
29+
public class WrapEntriesProcessorConfig {
30+
31+
@NotNull
32+
@NotEmpty
33+
@JsonProperty("source")
34+
@JsonPropertyDescription("The key of the primitive array to transform.")
35+
@ExampleValues({
36+
@Example(value = "/names", description = "The source array field to wrap into objects.")
37+
})
38+
private String source;
39+
40+
@JsonProperty("target")
41+
@JsonPropertyDescription("The key to write the resulting object array to. Defaults to <code>source</code> (in-place).")
42+
@ExampleValues({
43+
@Example(value = "/agents", description = "Write the resulting object array to a separate field.")
44+
})
45+
private String target;
46+
47+
@NotNull
48+
@NotEmpty
49+
@JsonProperty("key")
50+
@JsonPropertyDescription("The key name to use in each resulting object.")
51+
@ExampleValues({
52+
@Example(value = "name", description = "Each primitive value is wrapped as {\"name\": value}.")
53+
})
54+
private String key;
55+
56+
@JsonProperty("exclude_null_empty_values")
57+
@JsonPropertyDescription("When <code>true</code>, null and empty string elements are filtered out " +
58+
"before wrapping. Default is <code>false</code>.")
59+
private boolean excludeNullEmptyValues = false;
60+
61+
@JsonProperty("append_if_target_exists")
62+
@JsonPropertyDescription("When <code>true</code>, appends results to the existing target array " +
63+
"instead of overwriting. Default is <code>false</code>.")
64+
private boolean appendIfTargetExists = false;
65+
66+
@JsonProperty("wrap_entries_when")
67+
@JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a> " +
68+
"that will be evaluated to determine whether the processor will be run on the event.")
69+
@ExampleValues({
70+
@Example(value = "/type == \"tagged\"", description = "Only process events where type is 'tagged'.")
71+
})
72+
private String wrapEntriesWhen;
73+
74+
@JsonProperty("tags_on_failure")
75+
@JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.")
76+
private List<String> tagsOnFailure;
77+
78+
public String getSource() {
79+
return source;
80+
}
81+
82+
public String getTarget() {
83+
return target;
84+
}
85+
86+
public String getKey() {
87+
return key;
88+
}
89+
90+
public boolean getExcludeNullEmptyValues() {
91+
return excludeNullEmptyValues;
92+
}
93+
94+
public boolean getAppendIfTargetExists() {
95+
return appendIfTargetExists;
96+
}
97+
98+
public String getWrapEntriesWhen() {
99+
return wrapEntriesWhen;
100+
}
101+
102+
public List<String> getTagsOnFailure() {
103+
return tagsOnFailure;
104+
}
105+
106+
void validateExpressions(final ExpressionEvaluator expressionEvaluator) {
107+
if (target != null && target.isEmpty()) {
108+
throw new InvalidPluginConfigurationException("target must not be empty when specified.");
109+
}
110+
111+
if (wrapEntriesWhen != null && !expressionEvaluator.isValidExpressionStatement(wrapEntriesWhen)) {
112+
throw new InvalidPluginConfigurationException(
113+
String.format("wrap_entries_when \"%s\" is not a valid expression statement. " +
114+
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
115+
wrapEntriesWhen));
116+
}
117+
}
118+
119+
/**
120+
* Returns the effective target key. If target is not set, defaults to source.
121+
*/
122+
public String getEffectiveTarget() {
123+
return target != null ? target : source;
124+
}
125+
}

0 commit comments

Comments
 (0)