Skip to content

Commit 8c5bba7

Browse files
Add map_entries processor to mutate-event-processors
Adds a new map_entries processor that wraps each element of a primitive array into an object using a configured key name. This enables downstream processors like add_entries and delete_entries with iterate_on, which require List of Map and cannot operate on primitive arrays. Example: ["alice", "bob"] -> [{"name": "alice"}, {"name": "bob"}] Configuration options: - source (required): key of the primitive array to transform - target (optional): key to write result to (defaults to source) - key (required): key name in each resulting object - exclude_null_empty_values: filter out null/empty elements - append_if_target_exists: append to existing target array - map_entries_when: conditional expression - tags_on_failure: tags on processing failure Includes 21 unit tests and 3 config tests. Signed-off-by: Nishant Kadivar <nimahesx@amazon.com>
1 parent d4b8363 commit 8c5bba7

5 files changed

Lines changed: 783 additions & 0 deletions

File tree

data-prepper-plugins/mutate-event-processors/README.md

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,116 @@ will end up with this after processing:
647647
* `tags_on_failure` - (optional): a list of tags to add to event metadata when the event fails to process
648648

649649

650+
## MapEntriesProcessor
651+
A processor that wraps each element of a primitive array into an object using a configured key name. This enables downstream processors like `add_entries` and `delete_entries` with `iterate_on`, which require `List<Map<String, Object>>` and cannot operate on primitive arrays.
652+
653+
### Basic Usage
654+
To get started, create the following `pipeline.yaml`.
655+
```yaml
656+
pipeline:
657+
source:
658+
file:
659+
path: "/full/path/to/logs_json.log"
660+
record_type: "event"
661+
format: "json"
662+
processor:
663+
- map_entries:
664+
source: "/names"
665+
key: "name"
666+
sink:
667+
- stdout:
668+
```
669+
670+
Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.
671+
```json
672+
{"names": ["alice", "bob", "charlie"]}
673+
```
674+
675+
When run, the processor will parse the message into the following output:
676+
677+
```json
678+
{"names": [{"name": "alice"}, {"name": "bob"}, {"name": "charlie"}]}
679+
```
680+
681+
### Writing to a Separate Target
682+
If you want to keep the original array and write the wrapped objects to a different key, use the `target` option:
683+
```yaml
684+
processor:
685+
- map_entries:
686+
source: "/items"
687+
target: "/inventory_items"
688+
key: "product"
689+
```
690+
691+
Input:
692+
```json
693+
{"items": ["laptop", "monitor", "keyboard"]}
694+
```
695+
696+
Output:
697+
```json
698+
{"items": ["laptop", "monitor", "keyboard"], "inventory_items": [{"product": "laptop"}, {"product": "monitor"}, {"product": "keyboard"}]}
699+
```
700+
701+
### Conditional Processing
702+
Use `map_entries_when` to only process events matching a condition:
703+
```yaml
704+
processor:
705+
- map_entries:
706+
source: "/tags"
707+
key: "value"
708+
map_entries_when: '/type == "tagged"'
709+
```
710+
711+
Only events where `type` equals `"tagged"` will be processed.
712+
713+
### Chaining with add_entries
714+
A common use case is wrapping a primitive array so that `add_entries` with `iterate_on` can operate on it:
715+
```yaml
716+
processor:
717+
- map_entries:
718+
source: "/names"
719+
key: "name"
720+
- add_entries:
721+
iterate_on: "/names"
722+
entries:
723+
- key: "greeting"
724+
format: "Hello, ${name}"
725+
```
726+
727+
Input:
728+
```json
729+
{"names": ["alice", "bob"]}
730+
```
731+
732+
After `map_entries`:
733+
```json
734+
{"names": [{"name": "alice"}, {"name": "bob"}]}
735+
```
736+
737+
After `add_entries`:
738+
```json
739+
{"names": [{"name": "alice", "greeting": "Hello, alice"}, {"name": "bob", "greeting": "Hello, bob"}]}
740+
```
741+
742+
### Configuration
743+
* `source` - (required) - The key of the primitive array to transform (JSON Pointer)
744+
* `target` - (optional) - The key to write the resulting object array to. Defaults to `source` (in-place). Must not be empty when specified.
745+
* `key` - (required) - The key name to use in each resulting object
746+
* `exclude_null_empty_values` - (optional) - When set to `true`, null and empty string elements are filtered out before wrapping. Default is `false`
747+
* `append_if_target_exists` - (optional) - When set to `true`, appends results to the existing target array instead of overwriting. Default is `false`
748+
* `map_entries_when` - (optional) - A [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) that determines whether the processor runs on the event. Evaluated at the root event level.
749+
* `tags_on_failure` - (optional) - A list of tags to add to the event metadata when the event fails to process
750+
751+
### Edge Case Behavior
752+
* If the `source` key does not exist in the event, the processor skips the event (no-op) and adds `tags_on_failure` if configured.
753+
* If the `source` value is not a list (e.g., a string or number), the processor skips the event (no-op) and adds `tags_on_failure` if configured.
754+
* If the `source` list is empty, the processor does nothing — the empty list remains as-is.
755+
* If no elements remain after filtering (when `exclude_null_empty_values` is `true` and all elements are null or empty), the original list is left unchanged.
756+
* Null elements within the list are wrapped like any other value by default: `[null]` becomes `[{"key": null}]`. Use `exclude_null_empty_values: true` to filter them out.
757+
758+
___
759+
650760
## Developer Guide
651761
This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development:
652762
- [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.processor.mutateevent;
7+
8+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
9+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
10+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
11+
import org.opensearch.dataprepper.metrics.PluginMetrics;
12+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
13+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
14+
import org.opensearch.dataprepper.model.event.Event;
15+
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
16+
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
17+
import org.opensearch.dataprepper.model.processor.Processor;
18+
import org.opensearch.dataprepper.model.record.Record;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
@DataPrepperPlugin(name = "map_entries", pluginType = Processor.class, pluginConfigurationType = MapEntriesProcessorConfig.class)
29+
public class MapEntriesProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
30+
31+
private static final Logger LOG = LoggerFactory.getLogger(MapEntriesProcessor.class);
32+
33+
private final MapEntriesProcessorConfig config;
34+
private final ExpressionEvaluator expressionEvaluator;
35+
36+
@DataPrepperPluginConstructor
37+
public MapEntriesProcessor(final PluginMetrics pluginMetrics,
38+
final MapEntriesProcessorConfig config,
39+
final ExpressionEvaluator expressionEvaluator) {
40+
super(pluginMetrics);
41+
this.config = config;
42+
this.expressionEvaluator = expressionEvaluator;
43+
44+
if (config.getTarget() != null && config.getTarget().isEmpty()) {
45+
throw new InvalidPluginConfigurationException("target must not be empty when specified.");
46+
}
47+
48+
if (config.getMapEntriesWhen() != null
49+
&& !expressionEvaluator.isValidExpressionStatement(config.getMapEntriesWhen())) {
50+
throw new InvalidPluginConfigurationException(
51+
String.format("map_entries_when \"%s\" is not a valid expression statement. " +
52+
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ " +
53+
"for valid expression syntax", config.getMapEntriesWhen()));
54+
}
55+
56+
if (config.getTagsOnFailure() != null) {
57+
for (final String tag : config.getTagsOnFailure()) {
58+
if (tag == null || tag.isEmpty()) {
59+
throw new InvalidPluginConfigurationException("tags_on_failure must not contain null or empty strings.");
60+
}
61+
}
62+
}
63+
}
64+
65+
@Override
66+
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
67+
for (final Record<Event> record : records) {
68+
final Event event = record.getData();
69+
70+
try {
71+
if (config.getMapEntriesWhen() != null
72+
&& !expressionEvaluator.evaluateConditional(config.getMapEntriesWhen(), event)) {
73+
continue;
74+
}
75+
76+
processEvent(event);
77+
} catch (final Exception e) {
78+
LOG.atError()
79+
.addMarker(EVENT)
80+
.addMarker(NOISY)
81+
.setMessage("Error processing event [{}]")
82+
.addArgument(event)
83+
.setCause(e)
84+
.log();
85+
addTagsOnFailure(event);
86+
}
87+
}
88+
return records;
89+
}
90+
91+
private void processEvent(final Event event) {
92+
final String source = config.getSource();
93+
94+
if (!event.containsKey(source)) {
95+
LOG.warn(EVENT, "Source key [{}] does not exist in event [{}], skipping.", source, event);
96+
addTagsOnFailure(event);
97+
return;
98+
}
99+
100+
final Object sourceValue = event.get(source, Object.class);
101+
102+
if (!(sourceValue instanceof List)) {
103+
LOG.warn(EVENT, "Source key [{}] is not a list in event [{}], skipping.", source, event);
104+
addTagsOnFailure(event);
105+
return;
106+
}
107+
108+
final List<?> sourceList = (List<?>) sourceValue;
109+
110+
if (sourceList.isEmpty()) {
111+
return;
112+
}
113+
114+
final List<Map<String, Object>> result = new ArrayList<>(sourceList.size());
115+
final String key = config.getKey();
116+
117+
for (final Object element : sourceList) {
118+
if (config.getExcludeNullEmptyValues()) {
119+
if (element == null) {
120+
continue;
121+
}
122+
if (element instanceof String && ((String) element).isEmpty()) {
123+
continue;
124+
}
125+
}
126+
final Map<String, Object> wrapped = new HashMap<>(1);
127+
wrapped.put(key, element);
128+
result.add(wrapped);
129+
}
130+
131+
if (result.isEmpty()) {
132+
return;
133+
}
134+
135+
final String effectiveTarget = config.getEffectiveTarget();
136+
137+
if (config.getAppendIfTargetExists() && event.containsKey(effectiveTarget)) {
138+
final Object existingValue = event.get(effectiveTarget, Object.class);
139+
if (!(existingValue instanceof List)) {
140+
LOG.warn(EVENT, "Target key [{}] exists but is not a list in event [{}], skipping.",
141+
effectiveTarget, event);
142+
addTagsOnFailure(event);
143+
return;
144+
}
145+
final List<Object> existingList = new ArrayList<>((List<?>) existingValue);
146+
existingList.addAll(result);
147+
event.put(effectiveTarget, existingList);
148+
} else {
149+
event.put(effectiveTarget, result);
150+
}
151+
}
152+
153+
private void addTagsOnFailure(final Event event) {
154+
final List<String> tags = config.getTagsOnFailure();
155+
if (tags != null && !tags.isEmpty()) {
156+
event.getMetadata().addTags(tags);
157+
}
158+
}
159+
160+
@Override
161+
public void prepareForShutdown() {
162+
}
163+
164+
@Override
165+
public boolean isReadyForShutdown() {
166+
return true;
167+
}
168+
169+
@Override
170+
public void shutdown() {
171+
}
172+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.processor.mutateevent;
7+
8+
import com.fasterxml.jackson.annotation.JsonClassDescription;
9+
import com.fasterxml.jackson.annotation.JsonProperty;
10+
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
11+
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
12+
import jakarta.validation.constraints.NotEmpty;
13+
import jakarta.validation.constraints.NotNull;
14+
import org.opensearch.dataprepper.model.annotations.ExampleValues;
15+
import org.opensearch.dataprepper.model.annotations.ExampleValues.Example;
16+
17+
import java.util.List;
18+
19+
@JsonPropertyOrder
20+
@JsonClassDescription("The <code>map_entries</code> processor wraps each element of a primitive array " +
21+
"into an object using a configured key name.")
22+
public class MapEntriesProcessorConfig {
23+
24+
@NotNull
25+
@NotEmpty
26+
@JsonProperty("source")
27+
@JsonPropertyDescription("The key of the primitive array to transform.")
28+
@ExampleValues({
29+
@Example(value = "/names", description = "The source array field to wrap into objects.")
30+
})
31+
private String source;
32+
33+
@JsonProperty("target")
34+
@JsonPropertyDescription("The key to write the resulting object array to. Defaults to <code>source</code> (in-place).")
35+
@ExampleValues({
36+
@Example(value = "/agents", description = "Write the resulting object array to a separate field.")
37+
})
38+
private String target;
39+
40+
@NotNull
41+
@NotEmpty
42+
@JsonProperty("key")
43+
@JsonPropertyDescription("The key name to use in each resulting object.")
44+
@ExampleValues({
45+
@Example(value = "name", description = "Each primitive value is wrapped as {\"name\": value}.")
46+
})
47+
private String key;
48+
49+
@JsonProperty("exclude_null_empty_values")
50+
@JsonPropertyDescription("When <code>true</code>, null and empty string elements are filtered out " +
51+
"before wrapping. Default is <code>false</code>.")
52+
private boolean excludeNullEmptyValues = false;
53+
54+
@JsonProperty("append_if_target_exists")
55+
@JsonPropertyDescription("When <code>true</code>, appends results to the existing target array " +
56+
"instead of overwriting. Default is <code>false</code>.")
57+
private boolean appendIfTargetExists = false;
58+
59+
@JsonProperty("map_entries_when")
60+
@JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a> " +
61+
"that will be evaluated to determine whether the processor will be run on the event.")
62+
@ExampleValues({
63+
@Example(value = "/type == \"tagged\"", description = "Only process events where type is 'tagged'.")
64+
})
65+
private String mapEntriesWhen;
66+
67+
@JsonProperty("tags_on_failure")
68+
@JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.")
69+
private List<String> tagsOnFailure;
70+
71+
public String getSource() {
72+
return source;
73+
}
74+
75+
public String getTarget() {
76+
return target;
77+
}
78+
79+
public String getKey() {
80+
return key;
81+
}
82+
83+
public boolean getExcludeNullEmptyValues() {
84+
return excludeNullEmptyValues;
85+
}
86+
87+
public boolean getAppendIfTargetExists() {
88+
return appendIfTargetExists;
89+
}
90+
91+
public String getMapEntriesWhen() {
92+
return mapEntriesWhen;
93+
}
94+
95+
public List<String> getTagsOnFailure() {
96+
return tagsOnFailure;
97+
}
98+
99+
/**
100+
* Returns the effective target key. If target is not set, defaults to source.
101+
*/
102+
public String getEffectiveTarget() {
103+
return target != null ? target : source;
104+
}
105+
}

0 commit comments

Comments
 (0)