Commit 6460e31

Add filter_list processor to filter list elements
Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
1 parent c11b644 commit 6460e31

5 files changed

Lines changed: 1111 additions & 0 deletions

data-prepper-plugins/mutate-event-processors/README.md

Lines changed: 147 additions & 0 deletions
@@ -647,6 +647,153 @@ will end up with this after processing:
* `tags_on_failure` - (optional): a list of tags to add to event metadata when the event fails to process


## FilterListProcessor
A processor that filters elements within an array field by evaluating a condition against each element, keeping only those where the condition is true. It supports arrays of objects and arrays of primitives (strings, numbers, booleans).

### Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
  source:
    file:
      path: "/full/path/to/logs_json.log"
      record_type: "event"
      format: "json"
  processor:
    - filter_list:
        source: "items"
        keep_when: '/status == "active"'
  sink:
    - stdout:
```

Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}]}
```

When run, the processor will filter the array in-place and produce the following output:

```json
{"items": [{"name": "item1", "status": "active"}, {"name": "item3", "status": "active"}]}
```
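
The per-element behavior in this example can be approximated in plain Java. This is a minimal sketch under stated assumptions, not the processor's implementation: a `Predicate` stands in for the `keep_when` expression, and the names `KeepWhenSketch`, `item`, and `filterList` are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for filter_list: a Predicate plays the role of keep_when.
final class KeepWhenSketch {

    // Builds one object element shaped like the entries of "items" in the example input.
    static Map<String, Object> item(final String name, final String status) {
        final Map<String, Object> element = new LinkedHashMap<>();
        element.put("name", name);
        element.put("status", status);
        return element;
    }

    // Returns a new list holding only the elements for which keepWhen is true.
    static List<Map<String, Object>> filterList(final List<Map<String, Object>> source,
                                                final Predicate<Map<String, Object>> keepWhen) {
        final List<Map<String, Object>> kept = new ArrayList<>();
        for (final Map<String, Object> element : source) {
            if (keepWhen.test(element)) {
                kept.add(element);
            }
        }
        return kept;
    }

    public static void main(final String[] args) {
        final List<Map<String, Object>> items = List.of(
                item("item1", "active"),
                item("item2", "inactive"),
                item("item3", "active"));

        // Similar in spirit to keep_when: '/status == "active"'
        final List<Map<String, Object>> active =
                filterList(items, element -> "active".equals(element.get("status")));

        for (final Map<String, Object> element : active) {
            System.out.println(element.get("name")); // prints item1, then item3
        }
    }
}
```

The relative order of the kept elements matches the input order, as in the JSON output above.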

### Filtering to a different target

You can write the filtered result to a different key, leaving the original array unchanged:

```yaml
processor:
  - filter_list:
      source: "items"
      target: "active_items"
      keep_when: '/status == "active"'
```

With the same input, the output will be:

```json
{
  "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}],
  "active_items": [{"name": "item1", "status": "active"}, {"name": "item3", "status": "active"}]
}
```
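
The relationship between `source` and `target` can be sketched as follows. This is an illustration only: the event is modeled as a flat `Map`, so nested paths are not handled, and `TargetSketch` and `filterInto` are hypothetical names. A `null` target falls back to the source key, which corresponds to in-place filtering.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: the event is a flat map and keys are top-level only.
final class TargetSketch {

    // Filters event[source] and writes the result to event[target];
    // a null target means the source key is overwritten (in-place filtering).
    static void filterInto(final Map<String, Object> event,
                           final String source,
                           final String target,
                           final Predicate<Object> keepWhen) {
        final String destination = target != null ? target : source;
        final Object value = event.get(source);
        if (!(value instanceof List)) {
            return; // nothing to filter in this simplified sketch
        }
        final List<Object> kept = new ArrayList<>();
        for (final Object element : (List<?>) value) {
            if (keepWhen.test(element)) {
                kept.add(element);
            }
        }
        event.put(destination, kept);
    }

    public static void main(final String[] args) {
        final Map<String, Object> event = new LinkedHashMap<>();
        event.put("items", List.of("a", "", "b"));

        // With a target, the original list is left untouched.
        filterInto(event, "items", "kept", element -> !"".equals(element));
        System.out.println(event); // prints {items=[a, , b], kept=[a, b]}
    }
}
```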

### Filtering primitive arrays

For arrays of primitives (strings, numbers, booleans), each element is accessible via the `/value` key in the expression:

```yaml
processor:
  - filter_list:
      source: "tags"
      keep_when: '/value != ""'
```

With the following input:

```json
{"tags": ["important", "", "urgent", ""]}
```

The output will be:

```json
{"tags": ["important", "urgent"]}
```

Another example filtering numbers:

```yaml
processor:
  - filter_list:
      source: "scores"
      keep_when: '/value > 50'
```

With the following input:

```json
{"scores": [90, 30, 75, 10]}
```

The output will be:

```json
{"scores": [90, 75]}
```
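
The `/value` convention corresponds to wrapping each primitive in a single-entry map before the condition is evaluated, while object elements are used as they are. The sketch below shows that wrapping in plain Java; `ValueContextSketch`, `contextFor`, and `filterList` are hypothetical names, and a `Predicate` again stands in for the expression.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of how a primitive element becomes addressable as /value.
final class ValueContextSketch {

    // Object elements are evaluated directly; anything else is wrapped as {"value": element}.
    @SuppressWarnings("unchecked")
    static Map<String, Object> contextFor(final Object element) {
        return element instanceof Map
                ? (Map<String, Object>) element
                : Collections.singletonMap("value", element);
    }

    // Keeps the original elements (not the wrappers) whose context satisfies keepWhen.
    static List<Object> filterList(final List<?> source,
                                   final Predicate<Map<String, Object>> keepWhen) {
        final List<Object> kept = new ArrayList<>();
        for (final Object element : source) {
            if (keepWhen.test(contextFor(element))) {
                kept.add(element);
            }
        }
        return kept;
    }

    public static void main(final String[] args) {
        // Similar in spirit to keep_when: '/value > 50'
        final List<Object> high = filterList(
                List.of(90, 30, 75, 10),
                context -> ((Number) context.get("value")).intValue() > 50);
        System.out.println(high); // prints [90, 75]
    }
}
```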

### Using both conditions

The `filter_list_when` condition controls whether the processor runs at all (evaluated against the root event), while `keep_when` controls which elements are kept (evaluated per element):

```yaml
processor:
  - filter_list:
      source: "items"
      keep_when: '/status == "active"'
      filter_list_when: '/env == "production"'
```

With the following input:

```json
{"env": "production", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]}
```

Since `env` is `"production"`, the processor runs and filters by `status`, producing:

```json
{"env": "production", "items": [{"name": "item1", "status": "active"}]}
```

With a different event where `filter_list_when` evaluates to `false`:

```json
{"env": "staging", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]}
```

The processor is skipped entirely and the event passes through unchanged:

```json
{"env": "staging", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]}
```
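
The two levels of evaluation can be sketched in plain Java: one predicate is tested once against the whole event, the other once per element. This is an illustrative sketch only. `TwoConditionSketch` and `process` are hypothetical names, the `items` key is hard-coded, and non-object elements are simply dropped to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: an event-level gate followed by a per-element filter.
final class TwoConditionSketch {

    @SuppressWarnings("unchecked")
    static void process(final Map<String, Object> event,
                        final Predicate<Map<String, Object>> filterListWhen,
                        final Predicate<Map<String, Object>> keepWhen) {
        // Event-level gate: when it is false, the event passes through unchanged.
        if (filterListWhen != null && !filterListWhen.test(event)) {
            return;
        }
        final Object value = event.get("items");
        if (!(value instanceof List)) {
            return;
        }
        final List<Object> kept = new ArrayList<>();
        for (final Object element : (List<?>) value) {
            // Per-element condition, evaluated against the element's own fields.
            if (element instanceof Map && keepWhen.test((Map<String, Object>) element)) {
                kept.add(element);
            }
        }
        event.put("items", kept);
    }

    public static void main(final String[] args) {
        final Map<String, Object> event = new HashMap<>();
        event.put("env", "staging");
        event.put("items", List.of(
                Map.of("name", "item1", "status", "active"),
                Map.of("name", "item2", "status", "inactive")));

        process(event,
                root -> "production".equals(root.get("env")),
                element -> "active".equals(element.get("status")));

        System.out.println(((List<?>) event.get("items")).size()); // prints 2: the gate was false
    }
}
```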

### Configuration
* `source` - (required) - The key of the array field to filter. Supports nested paths (e.g. `outer_key/inner_list`).
* `target` - (optional) - The key to write the filtered array to. Defaults to the `source` key (in-place filtering). Supports nested paths.
* `keep_when` - (required) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated per element. Elements where this expression evaluates to `true` are kept. For object elements, the expression is evaluated against the object's fields directly (e.g. `/status == "active"`). For primitive elements, the value is accessible via `/value` (e.g. `/value > 50`). When no elements match, the result is an empty list `[]`.
* `filter_list_when` - (optional) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated against the root event. When provided, the processor only runs if this condition is `true`. By default, all events are processed.
* `tags_on_failure` - (optional) - A list of tags to add to the event metadata when the processor fails to process the event.

**Edge case behavior:**
- If the `source` key does not exist or its value is `null`, the processor is a no-op and the event passes through unchanged.
- If the `source` value is not a list (e.g. a string or number), the processor logs a warning and adds `tags_on_failure` if configured.
- `null` elements within the list are evaluated normally. For example, with `keep_when: '/value != null'`, null elements are filtered out while non-null elements are kept.
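
The three edge cases above can be summarized as a small decision procedure. The sketch below models them in plain Java under stated assumptions: the event is a flat `Map`, the `Outcome` enum and the names `EdgeCaseSketch` and `apply` are hypothetical, and the `FAILED` branch only marks where the processor would log a warning and add `tags_on_failure`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

// Hypothetical sketch of the documented edge cases for the source value.
final class EdgeCaseSketch {

    enum Outcome { SKIPPED, FAILED, FILTERED }

    static Outcome apply(final Map<String, Object> event,
                         final String source,
                         final Predicate<Object> keepWhen) {
        final Object value = event.get(source);
        if (value == null) {
            return Outcome.SKIPPED; // missing key or null value: no-op
        }
        if (!(value instanceof List)) {
            return Outcome.FAILED; // not a list: warning and tags_on_failure in the processor
        }
        final List<Object> kept = new ArrayList<>();
        for (final Object element : (List<?>) value) {
            // null elements are passed to the condition like any other element
            if (keepWhen.test(element)) {
                kept.add(element);
            }
        }
        event.put(source, kept);
        return Outcome.FILTERED;
    }

    public static void main(final String[] args) {
        final Map<String, Object> event = new HashMap<>();
        event.put("tags", Arrays.asList("a", null, "b"));

        // Objects::nonNull plays the role of keep_when: '/value != null'
        System.out.println(apply(event, "tags", Objects::nonNull)); // prints FILTERED
        System.out.println(event.get("tags"));                      // prints [a, b]
    }
}
```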

___

## Developer Guide
This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development:
- [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md)
@@ -0,0 +1,149 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

@DataPrepperPlugin(name = "filter_list", pluginType = Processor.class, pluginConfigurationType = FilterListProcessorConfig.class)
public class FilterListProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

    private static final Logger LOG = LoggerFactory.getLogger(FilterListProcessor.class);
    private final FilterListProcessorConfig config;
    private final ExpressionEvaluator expressionEvaluator;
    private final String target;

    @DataPrepperPluginConstructor
    public FilterListProcessor(final PluginMetrics pluginMetrics, final FilterListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
        super(pluginMetrics);
        this.config = config;
        this.expressionEvaluator = expressionEvaluator;
        // Default to in-place filtering when no target is configured.
        this.target = config.getTarget() != null ? config.getTarget() : config.getSource();

        // Reject invalid expressions at construction time rather than per event.
        if (config.getFilterListWhen() != null
                && !expressionEvaluator.isValidExpressionStatement(config.getFilterListWhen())) {
            throw new InvalidPluginConfigurationException(
                    String.format("filter_list_when %s is not a valid expression statement. " +
                            "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
                            config.getFilterListWhen()));
        }

        if (!expressionEvaluator.isValidExpressionStatement(config.getKeepWhen())) {
            throw new InvalidPluginConfigurationException(
                    String.format("keep_when %s is not a valid expression statement. " +
                            "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
                            config.getKeepWhen()));
        }
    }

    @Override
    public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
        for (final Record<Event> record : records) {
            final Event recordEvent = record.getData();

            try {
                // Skip the event entirely when filter_list_when evaluates to false.
                if (Objects.nonNull(config.getFilterListWhen()) && !expressionEvaluator.evaluateConditional(config.getFilterListWhen(), recordEvent)) {
                    continue;
                }

                final List<Object> sourceList;
                try {
                    sourceList = recordEvent.get(config.getSource(), List.class);
                } catch (final Exception e) {
                    LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]",
                            config.getSource(), recordEvent, e);
                    addTagsOnFailure(recordEvent);
                    continue;
                }

                if (sourceList == null) {
                    LOG.debug("Source list at path [{}] is null, skipping event", config.getSource());
                    continue;
                }

                final List<Object> filteredList = new ArrayList<>();
                final JacksonEvent.Builder contextBuilder = JacksonEvent.builder()
                        .withEventType("event");

                for (final Object element : sourceList) {
                    // Objects are evaluated as-is; primitives are wrapped so keep_when can reference /value.
                    @SuppressWarnings("unchecked")
                    final Map<String, Object> contextMap = element instanceof Map
                            ? (Map<String, Object>) element
                            : Collections.singletonMap("value", element);

                    try {
                        final Event elementEvent = contextBuilder
                                .withData(contextMap)
                                .build();

                        if (expressionEvaluator.evaluateConditional(config.getKeepWhen(), elementEvent)) {
                            filteredList.add(element);
                        }
                    } catch (final Exception e) {
                        // An element whose evaluation throws is logged and left out of the result.
                        LOG.warn(EVENT, "Error evaluating keep_when expression [{}] for element in source list at path [{}]",
                                config.getKeepWhen(), config.getSource(), e);
                    }
                }

                recordEvent.put(target, filteredList);

            } catch (final Exception e) {
                LOG.atError()
                        .addMarker(EVENT)
                        .addMarker(NOISY)
                        .setMessage("There was an exception while processing Event [{}]")
                        .addArgument(recordEvent)
                        .setCause(e)
                        .log();
                addTagsOnFailure(recordEvent);
            }
        }
        return records;
    }

    private void addTagsOnFailure(final Event event) {
        if (config.getTagsOnFailure() != null) {
            event.getMetadata().addTags(config.getTagsOnFailure());
        }
    }

    @Override
    public void prepareForShutdown() {
    }

    @Override
    public boolean isReadyForShutdown() {
        return true;
    }

    @Override
    public void shutdown() {
    }
}
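
One consequence of the inner `try`/`catch` in `doExecute` is that a failure while evaluating `keep_when` for a single element does not fail the whole event: the element is logged and omitted from the filtered list. The sketch below reproduces that control flow in plain Java as an illustration only. `ElementErrorSketch` and `filterList` are hypothetical names, a `Predicate` replaces the expression evaluator, and `System.err` replaces the logger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: per-element failures drop the element, not the whole list.
final class ElementErrorSketch {

    static List<Object> filterList(final List<?> source, final Predicate<Object> keepWhen) {
        final List<Object> kept = new ArrayList<>();
        for (final Object element : source) {
            try {
                if (keepWhen.test(element)) {
                    kept.add(element);
                }
            } catch (final RuntimeException e) {
                // Stand-in for the warning the processor logs; the element is not kept.
                System.err.println("Could not evaluate element [" + element + "]: " + e);
            }
        }
        return kept;
    }

    public static void main(final String[] args) {
        // The cast fails for "n/a" with a ClassCastException, so only the numbers are tested.
        final List<Object> kept = filterList(
                Arrays.asList(90, "n/a", 75),
                element -> ((Integer) element) > 50);
        System.out.println(kept); // prints [90, 75]
    }
}
```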
@@ -0,0 +1,89 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.annotations.ExampleValues;
import org.opensearch.dataprepper.model.annotations.ExampleValues.Example;

import java.util.List;

@JsonPropertyOrder
@JsonClassDescription("The <code>filter_list</code> processor evaluates a condition against each element of an array " +
        "and keeps only those elements where the condition is true.")
public class FilterListProcessorConfig {

    @NotNull
    @NotEmpty
    @JsonProperty("source")
    @JsonPropertyDescription("The key of the array field to filter. Supports nested paths.")
    @ExampleValues({
            @Example(value = "my-list", description = "Filters the 'my-list' array at the root of the event."),
            @Example(value = "outer-key/my-list", description = "Filters the 'my-list' array nested under 'outer-key'.")
    })
    private String source;

    @JsonProperty("target")
    @JsonPropertyDescription("The key to write the filtered array to. Defaults to the source key (in-place). " +
            "Supports nested paths. Intermediate objects are created automatically if they do not exist.")
    @ExampleValues({
            @Example(value = "filtered-list", description = "Writes the filtered array to 'filtered-list'.")
    })
    private String target;

    @NotNull
    @NotEmpty
    @JsonProperty("keep_when")
    @JsonPropertyDescription("An expression evaluated per element. Elements where this expression evaluates to true are kept. " +
            "The expression is evaluated against each element of the array as if it were a standalone event.")
    @ExampleValues({
            @Example(value = "/status == \"active\"", description = "Keeps only elements where 'status' equals 'active'."),
            @Example(value = "/score > 50", description = "Keeps only elements where 'score' is greater than 50.")
    })
    private String keepWhen;

    @JsonProperty("filter_list_when")
    @JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a>, " +
            "such as <code>/some-key == \"test\"</code>, that will be evaluated against the root event to determine whether " +
            "the processor will be run on the event. By default, all events will be processed unless otherwise stated.")
    @ExampleValues({
            @Example(value = "/some-key == \"test\"", description = "The processor only runs when the value of 'some-key' is 'test'.")
    })
    private String filterListWhen;

    @JsonProperty("tags_on_failure")
    @JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.")
    private List<String> tagsOnFailure;

    public String getSource() {
        return source;
    }

    public String getTarget() {
        return target;
    }

    public String getKeepWhen() {
        return keepWhen;
    }

    public String getFilterListWhen() {
        return filterListWhen;
    }

    public List<String> getTagsOnFailure() {
        return tagsOnFailure;
    }
}
