Commit 74aa3cf

Add filter_list processor to filter list elements (opensearch-project#6659)
- Add processor-scoped metadata:
  - filter_list_processor_failed_elements_count
  - filter_list_processor_failed_elements
- Keep per-element filtering behavior unchanged
- Add TODO for future optimization of evaluation path overhead
- Rename source → iterate_on, keep_when → keep_element_when
- Update FilterListProcessorConfig fields/getters/validation
- Update FilterListProcessor to use new getters
- Add SENSITIVE marker to warning logs
- Add TODO(opensearch-project#6609) reference in processor
- Document /value for primitive elements in keep_element_when
- Update README examples and configuration docs
- Update all tests and YAML fixtures to new key names
- Verified: :data-prepper-plugins:mutate-event-processors:test (PASS)

Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
Signed-off-by: nishantKadivar <nimahesx@amazon.com>
Co-authored-by: nishantKadivar <nimahesx@amazon.com>
1 parent 4865dce commit 74aa3cf

9 files changed

Lines changed: 1383 additions & 0 deletions

File tree

data-prepper-plugins/mutate-event-processors/README.md

Lines changed: 147 additions & 0 deletions
@@ -808,6 +808,153 @@ After `add_entries`:

___

## FilterListProcessor
A processor that filters elements within an array field by evaluating a condition against each element, keeping only those where the condition is true. It supports arrays of objects and arrays of primitives (strings, numbers, booleans).

### Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
  source:
    file:
      path: "/full/path/to/logs_json.log"
      record_type: "event"
      format: "json"
  processor:
    - filter_list:
        iterate_on: "items"
        keep_element_when: '/status == "active"'
  sink:
    - stdout:
```

Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}]}
```

When run, the processor will filter the array in-place and produce the following output:

```json
{"items": [{"name": "item1", "status": "active"}, {"name": "item3", "status": "active"}]}
```

### Filtering to a different target

You can write the filtered result to a different key, leaving the original array unchanged:

```yaml
processor:
  - filter_list:
      iterate_on: "items"
      target: "active_items"
      keep_element_when: '/status == "active"'
```

With the same input, the output will be:

```json
{
  "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}],
  "active_items": [{"name": "item1", "status": "active"}, {"name": "item3", "status": "active"}]
}
```
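
The relationship between `iterate_on` and `target` can be sketched in plain Java. This is only an illustration under stated assumptions: `TargetSketch`, `resolveTarget`, and `keepActive` are hypothetical names, the event is modeled as a `Map`, and the `status == "active"` check is hard-coded in place of a real `keep_element_when` expression. Unlike the processor, which writes into the event itself, the sketch returns a copy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in only; this is not the Data Prepper API.
final class TargetSketch {

    // Hypothetical helper: fall back to iterate_on when no target is configured.
    static String resolveTarget(final String iterateOn, final String target) {
        return target != null ? target : iterateOn;
    }

    // Keeps elements whose "status" is "active" and writes them to the resolved key.
    static Map<String, Object> keepActive(final Map<String, Object> event,
                                          final String iterateOn,
                                          final String target) {
        final List<Object> kept = new ArrayList<>();
        for (final Object item : (List<?>) event.get(iterateOn)) {
            if ("active".equals(((Map<?, ?>) item).get("status"))) {
                kept.add(item);
            }
        }
        final Map<String, Object> result = new LinkedHashMap<>(event);
        result.put(resolveTarget(iterateOn, target), kept);
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, Object> event = Map.of("items", List.of(
                Map.of("name", "item1", "status", "active"),
                Map.of("name", "item2", "status", "inactive"),
                Map.of("name", "item3", "status", "active")));

        // With a target, the original array is preserved next to the filtered one.
        System.out.println(keepActive(event, "items", "active_items").keySet()); // [items, active_items]
        // Without a target, the filtered array replaces the original.
        System.out.println(keepActive(event, "items", null).keySet()); // [items]
    }
}
```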

### Filtering primitive arrays

For arrays of primitives (strings, numbers, booleans), each element is accessible via the `/value` key in the expression:

```yaml
processor:
  - filter_list:
      iterate_on: "tags"
      keep_element_when: '/value != ""'
```

With the following input:

```json
{"tags": ["important", "", "urgent", ""]}
```

The output will be:

```json
{"tags": ["important", "urgent"]}
```

Another example filtering numbers:

```yaml
processor:
  - filter_list:
      iterate_on: "scores"
      keep_element_when: '/value > 50'
```

With the following input:

```json
{"scores": [90, 30, 75, 10]}
```

The output will be:

```json
{"scores": [90, 75]}
```
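
The `/value` key exists because the processor wraps each non-object element in a single-entry map before evaluating the expression (see `Collections.singletonMap("value", element)` in `FilterListProcessor`). A minimal plain-Java sketch of that wrapping follows. `ValueContextSketch` and its methods are hypothetical names, and a `Predicate` stands in for the evaluated `keep_element_when` expression; no Data Prepper classes are used.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative stand-in only; this is not the Data Prepper API.
final class ValueContextSketch {

    // Objects are evaluated as-is; primitives are wrapped under the "value" key,
    // which is why expressions address them as /value.
    @SuppressWarnings("unchecked")
    static Map<String, Object> contextFor(final Object element) {
        return element instanceof Map
                ? (Map<String, Object>) element
                : Collections.singletonMap("value", element);
    }

    // keepWhen is a hypothetical stand-in for an evaluated keep_element_when expression.
    static List<Object> filter(final List<?> source,
                               final Predicate<Map<String, Object>> keepWhen) {
        final List<Object> kept = new ArrayList<>();
        for (final Object element : source) {
            if (keepWhen.test(contextFor(element))) {
                kept.add(element); // the original element is kept, not the wrapper
            }
        }
        return kept;
    }

    public static void main(final String[] args) {
        // Equivalent in spirit to keep_element_when: '/value > 50'
        final List<Object> scores = filter(List.of(90, 30, 75, 10),
                ctx -> ((Integer) ctx.get("value")) > 50);
        System.out.println(scores); // [90, 75]
    }
}
```

Note that the filtered list holds the original elements, so the `value` wrapper never appears in the output.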

### Using both conditions

The `filter_list_when` condition controls whether the processor runs at all (evaluated against the root event), while `keep_element_when` controls which elements are kept (evaluated per element):

```yaml
processor:
  - filter_list:
      iterate_on: "items"
      keep_element_when: '/status == "active"'
      filter_list_when: '/env == "production"'
```

With the following input:

```json
{"env": "production", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]}
```

Since `env` is `"production"`, the processor runs and filters by `status`, producing:

```json
{"env": "production", "items": [{"name": "item1", "status": "active"}]}
```

With a different event where `filter_list_when` evaluates to `false`:

```json
{"env": "staging", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]}
```

The processor is skipped entirely and the event passes through unchanged:

```json
{"env": "staging", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]}
```
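
The two evaluation scopes can be sketched in plain Java as an event-level gate followed by a per-element check. This is a rough illustration under stated assumptions: `TwoConditionSketch` and `apply` are hypothetical names, events are modeled as `Map`s, the list key is fixed to `items`, and `Predicate`s stand in for the two evaluated expressions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative stand-in only; this is not the Data Prepper API.
final class TwoConditionSketch {

    static Map<String, Object> apply(final Map<String, Object> event,
                                     final Predicate<Map<String, Object>> filterListWhen,
                                     final Predicate<Map<String, Object>> keepElementWhen) {
        // Event-level gate: evaluated once against the root event.
        if (!filterListWhen.test(event)) {
            return event; // processor skipped, event unchanged
        }

        @SuppressWarnings("unchecked")
        final List<Map<String, Object>> items = (List<Map<String, Object>>) event.get("items");

        // Element-level condition: evaluated once per list element.
        final List<Map<String, Object>> kept = new ArrayList<>();
        for (final Map<String, Object> item : items) {
            if (keepElementWhen.test(item)) {
                kept.add(item);
            }
        }

        // The sketch returns a copy; the processor itself writes back to the event.
        final Map<String, Object> result = new HashMap<>(event);
        result.put("items", kept);
        return result;
    }

    public static void main(final String[] args) {
        final List<Map<String, Object>> items = List.of(
                Map.of("name", "item1", "status", "active"),
                Map.of("name", "item2", "status", "inactive"));
        final Predicate<Map<String, Object>> isProduction = e -> "production".equals(e.get("env"));
        final Predicate<Map<String, Object>> isActive = i -> "active".equals(i.get("status"));

        final Map<String, Object> production = Map.of("env", "production", "items", items);
        final Map<String, Object> staging = Map.of("env", "staging", "items", items);

        System.out.println(((List<?>) apply(production, isProduction, isActive).get("items")).size()); // 1
        System.out.println(((List<?>) apply(staging, isProduction, isActive).get("items")).size()); // 2
    }
}
```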

### Configuration
* `iterate_on` - (required) - The key of the array field to filter. Supports nested paths (e.g. `outer_key/inner_list`).
* `target` - (optional) - The key to write the filtered array to. Defaults to the `iterate_on` key (in-place filtering). Supports nested paths.
* `keep_element_when` - (required) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated per element. Elements where this expression evaluates to `true` are kept. For object elements, the expression is evaluated against the object's fields directly (e.g. `/status == "active"`). For primitive elements, the value is accessible via `/value` (e.g. `/value > 50`). When no elements match, the result is an empty list `[]`.
* `filter_list_when` - (optional) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated against the root event. When provided, the processor only runs if this condition is `true`. By default, all events are processed.
* `tags_on_failure` - (optional) - A list of tags to add to the event metadata when the processor fails to process the event.

**Edge case behavior:**
- If the `iterate_on` key does not exist or its value is `null`, the processor is a no-op and the event passes through unchanged.
- If the `iterate_on` value is not a list (e.g. a string or number), the processor logs a warning and adds `tags_on_failure` if configured.
- `null` elements within the list are evaluated normally. For example, with `keep_element_when: '/value != null'`, null elements are filtered out while non-null elements are kept.
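
The three edge cases above can be walked through with a small plain-Java sketch. It is an approximation under stated assumptions: `EdgeCaseSketch`, `SketchEvent`, and `filterNonNull` are hypothetical names, the event is a simple data map plus a tag list, and the `element != null` check stands in for `keep_element_when: '/value != null'`. The warning log emitted by the processor is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in only; this is not the Data Prepper API.
final class EdgeCaseSketch {

    // Hypothetical event model: a data map plus a mutable list of tags.
    static final class SketchEvent {
        final Map<String, Object> data = new HashMap<>();
        final List<String> tags = new ArrayList<>();
    }

    static void filterNonNull(final SketchEvent event, final String key,
                              final List<String> tagsOnFailure) {
        final Object raw = event.data.get(key);
        if (raw == null) {
            return; // missing key or null value: no-op
        }
        if (!(raw instanceof List)) {
            event.tags.addAll(tagsOnFailure); // not a list: tag the event and leave the data alone
            return;
        }
        final List<Object> kept = new ArrayList<>();
        for (final Object element : (List<?>) raw) {
            if (element != null) { // stand-in for keep_element_when: '/value != null'
                kept.add(element);
            }
        }
        event.data.put(key, kept);
    }

    public static void main(final String[] args) {
        final List<String> onFailure = List.of("filter_list_failed");

        final SketchEvent withNulls = new SketchEvent();
        withNulls.data.put("tags", Arrays.asList("a", null, "b"));
        filterNonNull(withNulls, "tags", onFailure);
        System.out.println(withNulls.data.get("tags")); // [a, b]

        final SketchEvent notAList = new SketchEvent();
        notAList.data.put("tags", "oops");
        filterNonNull(notAList, "tags", onFailure);
        System.out.println(notAList.tags); // [filter_list_failed]
    }
}
```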

___

## Developer Guide
This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development:
- [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md)
@@ -0,0 +1,159 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE;

@DataPrepperPlugin(name = "filter_list", pluginType = Processor.class, pluginConfigurationType = FilterListProcessorConfig.class)
public class FilterListProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

    private static final Logger LOG = LoggerFactory.getLogger(FilterListProcessor.class);
    private static final String FAILED_ELEMENTS_METADATA_KEY = "filter_list_processor_failed_elements";
    private static final String FAILED_ELEMENTS_COUNT_METADATA_KEY = "filter_list_processor_failed_elements_count";
    private final FilterListProcessorConfig config;
    private final ExpressionEvaluator expressionEvaluator;

    @DataPrepperPluginConstructor
    public FilterListProcessor(final PluginMetrics pluginMetrics, final FilterListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
        super(pluginMetrics);
        this.config = config;
        this.expressionEvaluator = expressionEvaluator;

        config.validateExpressions(expressionEvaluator);
    }

    @Override
    public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
        for (final Record<Event> record : records) {
            final Event recordEvent = record.getData();

            try {
                if (Objects.nonNull(config.getFilterListWhen()) && !expressionEvaluator.evaluateConditional(config.getFilterListWhen(), recordEvent)) {
                    continue;
                }

                final List<Object> sourceList;
                try {
                    sourceList = recordEvent.get(config.getIterateOn(), List.class);
                } catch (final Exception e) {
                    LOG.atWarn()
                            .addMarker(EVENT)
                            .addMarker(SENSITIVE)
                            .setMessage("Given source path [{}] is not valid on record [{}]")
                            .addArgument(config.getIterateOn())
                            .addArgument(recordEvent)
                            .setCause(e)
                            .log();
                    addTagsOnFailure(recordEvent);
                    continue;
                }

                if (sourceList == null) {
                    LOG.debug("Source list at path [{}] is null, skipping event", config.getIterateOn());
                    continue;
                }

                final List<Object> filteredList = new ArrayList<>();
                final List<Object> failedElements = new ArrayList<>();
                int failedElementCount = 0;

                for (final Object element : sourceList) {
                    @SuppressWarnings("unchecked")
                    final Map<String, Object> contextMap = element instanceof Map
                            ? (Map<String, Object>) element
                            : Collections.singletonMap("value", element);

                    try {
                        // TODO(#6609): Revisit this per-element Event construction when ExpressionEvaluator/JsonPointer
                        // internals support a lighter evaluation path that avoids full tree conversion.
                        final Event elementEvent = JacksonEvent.builder()
                                .withEventType("event")
                                .withData(contextMap)
                                .build();

                        if (expressionEvaluator.evaluateConditional(config.getKeepElementWhen(), elementEvent)) {
                            filteredList.add(element);
                        }
                    } catch (final Exception e) {
                        failedElementCount++;
                        failedElements.add(element);
                        LOG.atWarn()
                                .addMarker(EVENT)
                                .addMarker(SENSITIVE)
                                .setMessage("Error evaluating keep_element_when expression [{}] for element in source list at path [{}]")
                                .addArgument(config.getKeepElementWhen())
                                .addArgument(config.getIterateOn())
                                .setCause(e)
                                .log();
                    }
                }

                if (failedElementCount > 0) {
                    addTagsOnFailure(recordEvent);
                    recordEvent.getMetadata().setAttribute(FAILED_ELEMENTS_COUNT_METADATA_KEY, failedElementCount);
                    recordEvent.getMetadata().setAttribute(FAILED_ELEMENTS_METADATA_KEY, failedElements);
                }

                recordEvent.put(config.getTarget(), filteredList);

            } catch (final Exception e) {
                LOG.atError()
                        .addMarker(EVENT)
                        .addMarker(NOISY)
                        .setMessage("There was an exception while processing Event [{}]")
                        .addArgument(recordEvent)
                        .setCause(e)
                        .log();
                addTagsOnFailure(recordEvent);
            }
        }
        return records;
    }

    private void addTagsOnFailure(final Event event) {
        if (config.getTagsOnFailure() != null) {
            event.getMetadata().addTags(config.getTagsOnFailure());
        }
    }

    @Override
    public void prepareForShutdown() {
    }

    @Override
    public boolean isReadyForShutdown() {
        return true;
    }

    @Override
    public void shutdown() {
    }
}
