Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,153 @@ After `add_entries`:

___

## FilterListProcessor
A processor that filters elements within an array field by evaluating a condition against each element, keeping only those where the condition is true. It supports arrays of objects and arrays of primitives (strings, numbers, booleans).

### Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- filter_list:
iterate_on: "items"
keep_element_when: '/status == "active"'
sink:
- stdout:
```

Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}]}
```

When run, the processor will filter the array in-place and produce the following output:

```json
{"items": [{"name": "item1", "status": "active"}, {"name": "item3", "status": "active"}]}
```

### Filtering to a different target

You can write the filtered result to a different key, leaving the original array unchanged:

```yaml
processor:
- filter_list:
iterate_on: "items"
target: "active_items"
keep_element_when: '/status == "active"'
```

With the same input, the output will be:

```json
{
"items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}],
"active_items": [{"name": "item1", "status": "active"}, {"name": "item3", "status": "active"}]
}
```

### Filtering primitive arrays

For arrays of primitives (strings, numbers, booleans), each element is accessible via the `/value` key in the expression:

```yaml
processor:
- filter_list:
iterate_on: "tags"
keep_element_when: '/value != ""'
```

With the following input:

```json
{"tags": ["important", "", "urgent", ""]}
```

The output will be:

```json
{"tags": ["important", "urgent"]}
```

Another example filtering numbers:

```yaml
processor:
- filter_list:
iterate_on: "scores"
keep_element_when: '/value > 50'
```

With the following input:

```json
{"scores": [90, 30, 75, 10]}
```

The output will be:

```json
{"scores": [90, 75]}
```

### Using both conditions

The `filter_list_when` condition controls whether the processor runs at all (evaluated against the root event), while `keep_element_when` controls which elements are kept (evaluated per element):

```yaml
processor:
- filter_list:
iterate_on: "items"
keep_element_when: '/status == "active"'
filter_list_when: '/env == "production"'
```

With the following input:

```json
{"env": "production", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]}
```

Since `env` is `"production"`, the processor runs and filters by `status`, producing:

```json
{"env": "production", "items": [{"name": "item1", "status": "active"}]}
```

With a different event where `filter_list_when` evaluates to `false`:

```json
{"env": "staging", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]}
```

The processor is skipped entirely and the event passes through unchanged:

```json
{"env": "staging", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]}
```

### Configuration
* `iterate_on` - (required) - The key of the array field to filter. Supports nested paths (e.g. `outer_key/inner_list`).
* `target` - (optional) - The key to write the filtered array to. Defaults to the `iterate_on` key (in-place filtering). Supports nested paths.
* `keep_element_when` - (required) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated per element. Elements where this expression evaluates to `true` are kept. For object elements, the expression is evaluated against the object's fields directly (e.g. `/status == "active"`). For primitive elements, the value is accessible via `/value` (e.g. `/value > 50`). When no elements match, the result is an empty list `[]`.
* `filter_list_when` - (optional) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated against the root event. When provided, the processor only runs if this condition is `true`. By default, all events are processed.
* `tags_on_failure` - (optional) - A list of tags to add to the event metadata when the processor fails to process the event.

**Edge case behavior:**
- If the `iterate_on` key does not exist or its value is `null`, the processor is a no-op and the event passes through unchanged.
- If the `iterate_on` value is not a list (e.g. a string or number), the processor logs a warning and adds `tags_on_failure` if configured.
- `null` elements within the list are evaluated normally. For example, with `keep_element_when: '/value != null'`, null elements are filtered out while non-null elements are kept.

___

## Developer Guide
This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development:
- [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE;

@DataPrepperPlugin(name = "filter_list", pluginType = Processor.class, pluginConfigurationType = FilterListProcessorConfig.class)
public class FilterListProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(FilterListProcessor.class);
private static final String FAILED_ELEMENTS_METADATA_KEY = "filter_list_processor_failed_elements";
private static final String FAILED_ELEMENTS_COUNT_METADATA_KEY = "filter_list_processor_failed_elements_count";
private final FilterListProcessorConfig config;
private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public FilterListProcessor(final PluginMetrics pluginMetrics, final FilterListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.config = config;
this.expressionEvaluator = expressionEvaluator;

config.validateExpressions(expressionEvaluator);
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
final Event recordEvent = record.getData();

try {
if (Objects.nonNull(config.getFilterListWhen()) && !expressionEvaluator.evaluateConditional(config.getFilterListWhen(), recordEvent)) {
continue;
}

final List<Object> sourceList;
try {
sourceList = recordEvent.get(config.getIterateOn(), List.class);
} catch (final Exception e) {
LOG.atWarn()
.addMarker(EVENT)
.addMarker(SENSITIVE)
.setMessage("Given source path [{}] is not valid on record [{}]")
.addArgument(config.getIterateOn())
.addArgument(recordEvent)
.setCause(e)
.log();
addTagsOnFailure(recordEvent);
continue;
}

if (sourceList == null) {
LOG.debug("Source list at path [{}] is null, skipping event", config.getIterateOn());
continue;
}

final List<Object> filteredList = new ArrayList<>();
final List<Object> failedElements = new ArrayList<>();
int failedElementCount = 0;

for (final Object element : sourceList) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each array element gets wrapped in a full JacksonEvent via builder.withData(contextMap).build(), which internally runs mapper.valueToTree(). For large arrays this creates
significant GC pressure and serialization overhead — the same concern @dlvenable raised in the issue discussion. This matches the pattern used by the translate processor, which has reportedly caused performance issues.

For now this is probably acceptable since the ExpressionEvaluator requires an Event, but worth noting as a known limitation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we have acknowledged the limitation and mark it as TODO.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two problems with the translate processor approach. One is the performance. But the other issue is how we handle relative paths. There is a good discussion in #6609 about this same issue.

Basically, we need a way to decide relative versus absolute paths. Some processors are all absolute and some have relative paths. I propose in #6609 that we should use / to state absolute. And lack of that can be relative. Your elementEvent below will be a relative path with an absolute marker. This is a larger problem because it will lock us into the current behavior until a breaking change.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for flagging this. We understand this could conflict with a future convention where / explicitly means absolute (as discussed in #6609). We'll keep this in mind and align with whatever convention is adopted at the framework level. For now, this is the only viable approach given the current ExpressionEvaluator API.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yavmanis , We should avoid pushing this bug further into processors. Using / is clearly indicating a JSON pointer at a root object.

I think these are our options at this point:

  1. Resolve this correctly through JacksonEvent.
  2. Create a throw-away configuration like use_relative_pointer_on_keep_when. The only allowable value would be true and this is not a default value so users must provide it.
- filter_list:
        source: "items"
        keep_when: '/status == "active"'
        filter_list_when: '/env == "production"'
        use_relative_pointer_on_keep_when: true

We would eventually remove it.

This name helps clarify that filter_list_when isn't using the relative pointer.

@yavmanis yavmanis Apr 13, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable Since we implemented this processor from the beginning, we have carefully handled all the relevant conditions, particularly those related to relative JSON pointers. Specifically, the filter_list_when condition resolves against the root-level fields, while the keep_when condition resolves against the element-level fields for each object in the list.

Based on our testing, both conditions are working as expected within their respective contexts. Therefore, we believe that introducing the flag use_relative_pointer_on_keep_when: true may not be necessary for resolving JSON pointers, as this behavior is already managed within the processor implementation.

Additionally, the behavior of both conditions is consistent with the delete_entries processor.

However, if you have a specific use case in mind that is not adequately handled by the current implementation and would require this flag, please let us know. We would be happy to review and address it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable if keep_when syntax stays the same regardless, I'm not sure if adding a flag like this use_relative_pointer_on_keep_when: true is correct yet. Right now, as long as we are consistently saying "keep_when uses local element scope", and "filter_list_when" uses scope from the root of the Event", I think that is pretty clear.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had a discussion last week about renaming the fields to promote consistency. I suggested in another comment that we use iterate_on and keep_element_when to signal that these have similar behavior to the delete entries processor.

The add entries processor is unfortunately not handling this in a great way currently. But this PR would match delete entries with those name changes.

@SuppressWarnings("unchecked")
final Map<String, Object> contextMap = element instanceof Map
? (Map<String, Object>) element
: Collections.singletonMap("value", element);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From this and the examples in the description it looks like we are using "/value" in the keep_when expression as a special name for primitive lists. Are we sure this is the right approach? At first I was worried this would conflict with actual fields named "value", but this checking for Map condition does handle that conflict. We just need to be very clear in the documentation for keep_when that "/value" is a special keyword for primitive lists.


try {
// TODO(#6609): Revisit this per-element Event construction when ExpressionEvaluator/JsonPointer
// internals support a lighter evaluation path that avoids full tree conversion.
final Event elementEvent = JacksonEvent.builder()
.withEventType("event")
.withData(contextMap)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When evaluateConditional throws for a single element, that element is silently dropped from the result. This is a data loss scenario with no signal to the user beyond WARN-level logs

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for calling this out.
To improve visibility for this case, I updated the processor to mark the event as failed (tags_on_failure) whenever evaluateConditional throws for one or more elements, and to attach diagnostic metadata:

  • filter_list_processor_failed_elements_count

  • filter_list_processor_failed_elements

This keeps current filtering behavior consistent with similar processors, while ensuring downstream processors and users can clearly detect and inspect element-level evaluation failures instead of relying only on WARN logs.

.build();

if (expressionEvaluator.evaluateConditional(config.getKeepElementWhen(), elementEvent)) {
filteredList.add(element);
}
} catch (final Exception e) {
failedElementCount++;
failedElements.add(element);
LOG.atWarn()
.addMarker(EVENT)
.addMarker(SENSITIVE)
.setMessage("Error evaluating keep_element_when expression [{}] for element in source list at path [{}]")
.addArgument(config.getKeepElementWhen())
.addArgument(config.getIterateOn())
.setCause(e)
.log();
}
}

if (failedElementCount > 0) {
addTagsOnFailure(recordEvent);
recordEvent.getMetadata().setAttribute(FAILED_ELEMENTS_COUNT_METADATA_KEY, failedElementCount);
recordEvent.getMetadata().setAttribute(FAILED_ELEMENTS_METADATA_KEY, failedElements);
}

recordEvent.put(config.getTarget(), filteredList);

} catch (final Exception e) {
LOG.atError()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here logging the full event requires SENSITIVE marker

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added SENSITIVE marker to warning logs that output full event context:

  • Line 66: LOG.warn(EVENT, SENSITIVE, ...) — when source path is invalid
  • Line 101: LOG.warn(EVENT, SENSITIVE, ...) — when keep_element_when evaluation fails

.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
addTagsOnFailure(recordEvent);
}
}
return records;
}

private void addTagsOnFailure(final Event event) {
if (config.getTagsOnFailure() != null) {
event.getMetadata().addTags(config.getTagsOnFailure());
}
}

@Override
public void prepareForShutdown() {
}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
}
}
Loading
Loading