
Add filter_list processor to filter list elements #6659

Merged
dlvenable merged 7 commits into opensearch-project:main from yavmanis:feature/filter-list-processor
Apr 20, 2026

Conversation

@yavmanis

@yavmanis yavmanis commented Mar 20, 2026

Contributor

filter_list Processor

The filter_list processor filters elements within an array field by evaluating a condition against each element and retaining only those that satisfy the condition.

It supports:

  • Arrays of objects
  • Arrays of primitives (strings, numbers, booleans)

Description

This processor iterates over a specified array field and evaluates an expression for each element.

  • Element-level filtering is controlled by keep_element_when
  • Processor execution can be conditionally controlled using filter_list_when

This behavior is aligned with existing processors like delete_entries for consistency.


Configuration Updates

To promote consistency with other processors that iterate over elements:

  • source → iterate_on
    Better reflects that the processor iterates over a collection

  • keep_when → keep_element_when
    Clearly indicates evaluation happens at the element level

Other configuration options remain unchanged.


Configuration Options

| Option | Required | Type | Default | Description |
|---|---|---|---|---|
| iterate_on | Yes | String | | Key of the array field to iterate and filter (supports nested paths like outer/inner) |
| target | No | String | iterate_on | Key to write filtered output (defaults to in-place filtering) |
| keep_element_when | Yes | String | | Expression evaluated per element; elements where true are retained |
| filter_list_when | No | String | null | Expression evaluated at root event level; processor runs only if true |
| tags_on_failure | No | List | null | Tags added to event metadata on failure |

Execution Semantics

  • keep_element_when → evaluated in element scope
  • filter_list_when → evaluated in root event scope
  • If filter_list_when is false → processor is skipped
  • If target is not provided → filtering is in-place
  • If target is provided → filtered result is written to target field
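
The semantics above can be sketched in plain Java. This is only an illustration, not the actual FilterListProcessor: the event is modeled as a `Map`, the two expressions are modeled as `Predicate`s, and the class and method names are hypothetical. Nested paths such as outer/inner are not handled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Illustrative stand-in for the processor's flow; not the real implementation. */
final class FilterListSemanticsSketch {

    /**
     * @param event           root event, modeled as a plain map (assumption)
     * @param iterateOn       key of the list to filter
     * @param target          key to write to, or null for in-place filtering
     * @param filterListWhen  root-scope condition, or null to always run
     * @param keepElementWhen element-scope condition
     */
    static Map<String, Object> apply(final Map<String, Object> event,
                                     final String iterateOn,
                                     final String target,
                                     final Predicate<Map<String, Object>> filterListWhen,
                                     final Predicate<Object> keepElementWhen) {
        final Map<String, Object> result = new HashMap<>(event);

        // filter_list_when is evaluated against the root event; false means skip.
        if (filterListWhen != null && !filterListWhen.test(event)) {
            return result;
        }

        final Object source = event.get(iterateOn);
        if (!(source instanceof List)) {
            return result; // nothing to filter
        }

        // keep_element_when is evaluated against each element in turn.
        final List<Object> kept = new ArrayList<>();
        for (final Object element : (List<?>) source) {
            if (keepElementWhen.test(element)) {
                kept.add(element);
            }
        }

        // Without a target, the source key is overwritten (in-place filtering).
        result.put(target != null ? target : iterateOn, kept);
        return result;
    }
}
```

Passing a non-null `target` leaves the source list untouched and writes the kept elements under the new key, which corresponds to scenario 3 in the end-to-end table further down.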

Failure Handling

If evaluation fails for one or more elements:

  • Event is marked using tags_on_failure

  • Metadata fields are added:

    • filter_list_processor_failed_elements_count
    • filter_list_processor_failed_elements

This avoids silent data loss and improves observability.
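
A minimal sketch of this failure handling, again in plain Java with hypothetical names. The thread does not state what filter_list_processor_failed_elements contains; this sketch assumes it holds the zero-based indexes of the failing elements, which may differ from the real processor. The `Outcome` holder stands in for event tags and metadata.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

final class FilterListFailureSketch {

    /** Kept elements plus hypothetical stand-ins for event tags and metadata. */
    static final class Outcome {
        final List<Object> kept = new ArrayList<>();
        final Set<String> tags = new HashSet<>();
        final Map<String, Object> metadata = new HashMap<>();
    }

    static Outcome filter(final List<?> source,
                          final Predicate<Object> keepElementWhen,
                          final List<String> tagsOnFailure) {
        final Outcome outcome = new Outcome();
        final List<Integer> failedIndexes = new ArrayList<>();

        for (int i = 0; i < source.size(); i++) {
            final Object element = source.get(i);
            try {
                if (keepElementWhen.test(element)) {
                    outcome.kept.add(element);
                }
            } catch (final RuntimeException e) {
                // The element is still dropped, but the failure is recorded.
                failedIndexes.add(i);
            }
        }

        if (!failedIndexes.isEmpty()) {
            if (tagsOnFailure != null) {
                outcome.tags.addAll(tagsOnFailure);
            }
            outcome.metadata.put("filter_list_processor_failed_elements_count", failedIndexes.size());
            // Assumption: indexes are recorded; the real value shape may differ.
            outcome.metadata.put("filter_list_processor_failed_elements", failedIndexes);
        }
        return outcome;
    }
}
```

The per-element filtering result is the same as without failure tracking; only the tags and metadata are added, so downstream processors can route or inspect affected events.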


Example Pipeline Configuration

filter-list-pipeline:
  source:
    file:
      path: "pipelines/input.json"
      format: "json"
      record_type: "event"

  processor:
    - filter_list:
        iterate_on: "items"
        keep_element_when: '/status == "active"'
        filter_list_when: '/env == "production"'

    - filter_list:
        iterate_on: "tags"
        keep_element_when: '/value != ""'
        filter_list_when: '/type == "tagged"'

    - filter_list:
        iterate_on: "scores"
        target: "passing_scores"
        keep_element_when: '/value > 50'
        filter_list_when: '/type == "grades"'

    - filter_list:
        iterate_on: "items"
        target: "critical_items"
        keep_element_when: '/priority == "critical"'
        filter_list_when: '/type == "tickets"'

  sink:
    - stdout:

Input

{"items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}], "env": "production", "type": "inventory"}
{"tags": ["important", "", "urgent", ""], "type": "tagged"}
{"scores": [95, 30, 75, 10, 88], "subject": "math", "type": "grades"}
{"items": [{"name": "bug1", "priority": "critical"}, {"name": "bug2", "priority": "low"}, {"name": "bug3", "priority": "critical"}], "type": "tickets"}
{"items": [{"name": "item1", "status": "active"}], "env": "staging", "type": "inventory"}

Output

{"items":[{"name":"item1","status":"active"},{"name":"item3","status":"active"}],"env":"production","type":"inventory"}
{"tags":["important","urgent"],"type":"tagged"}
{"scores":[95,30,75,10,88],"subject":"math","type":"grades","passing_scores":[95,75,88]}
{"items":[{"name":"bug1","priority":"critical"},{"name":"bug2","priority":"low"},{"name":"bug3","priority":"critical"}],"type":"tickets","critical_items":[{"name":"bug1","priority":"critical"},{"name":"bug3","priority":"critical"}]}
{"items":[{"name":"item1","status":"active"}],"env":"staging","type":"inventory"}

End-to-End Scenarios

| # | Scenario | Result |
|---|---|---|
| 1 | Object array filtered in-place (status == active, env=production) | PASS |
| 2 | Primitive array removes empty values | PASS |
| 3 | Number array filtered into new target (> 50) | PASS |
| 4 | Object array filtered into new target (priority == critical) | PASS |
| 5 | Condition not met → no mutation | PASS |

Summary

  • Introduces filter_list processor for array element filtering

  • Aligns naming with existing processors:

    • iterate_on
    • keep_element_when
  • Supports:

    • Object and primitive arrays
    • In-place and target-based filtering
  • Adds strong failure visibility with metadata

@yavmanis yavmanis marked this pull request as draft March 20, 2026 09:45
Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
@yavmanis yavmanis force-pushed the feature/filter-list-processor branch from d9bee48 to 6460e31 Compare March 23, 2026 15:23
@yavmanis yavmanis marked this pull request as ready for review March 25, 2026 19:26
final JacksonEvent.Builder contextBuilder = JacksonEvent.builder()
        .withEventType("event");

for (final Object element : sourceList) {

Contributor

Each array element gets wrapped in a full JacksonEvent via builder.withData(contextMap).build(), which internally runs mapper.valueToTree(). For large arrays this creates significant GC pressure and serialization overhead, the same concern @dlvenable raised in the issue discussion. This matches the pattern used by the translate processor, which has reportedly caused performance issues.

For now this is probably acceptable since the ExpressionEvaluator requires an Event, but worth noting as a known limitation.

Contributor

For now, we have acknowledged the limitation and marked it as a TODO.

Member

There are two problems with the translate processor approach. One is the performance. But the other issue is how we handle relative paths. There is a good discussion in #6609 about this same issue.

Basically, we need a way to decide relative versus absolute paths. Some processors are all absolute and some have relative paths. I propose in #6609 that we should use / to state absolute. And lack of that can be relative. Your elementEvent below will be a relative path with an absolute marker. This is a larger problem because it will lock us into the current behavior until a breaking change.
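
The convention proposed here (a leading / means absolute, no leading / means relative) could look roughly like the following. This is a hypothetical sketch for illustration only: it handles single-segment keys, models events as maps, and does not reflect any existing Data Prepper API or the final outcome of #6609.

```java
import java.util.Map;

final class PointerScopeSketch {

    /** Resolves a single-segment key: "/name" reads the root event, "name" reads the element. */
    static Object resolve(final String key,
                          final Map<String, Object> rootEvent,
                          final Map<String, Object> element) {
        if (key.startsWith("/")) {
            return rootEvent.get(key.substring(1)); // absolute: root scope
        }
        return element.get(key); // relative: element scope
    }
}
```

Under this convention, an element-scoped expression written as /status would read the root event rather than the element, which is the mismatch being described.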

Contributor Author

Thanks for flagging this. We understand this could conflict with a future convention where / explicitly means absolute (as discussed in #6609). We'll keep this in mind and align with whatever convention is adopted at the framework level. For now, this is the only viable approach given the current ExpressionEvaluator API.

Member

@yavmanis, we should avoid pushing this bug further into processors. Using / clearly indicates a JSON pointer at a root object.

I think these are our options at this point:

  1. Resolve this correctly through JacksonEvent.
  2. Create a throw-away configuration like use_relative_pointer_on_keep_when. The only allowable value would be true, and because it has no default, users must provide it.

    - filter_list:
        source: "items"
        keep_when: '/status == "active"'
        filter_list_when: '/env == "production"'
        use_relative_pointer_on_keep_when: true

We would eventually remove it.

This name helps clarify that filter_list_when isn't using the relative pointer.

@yavmanis yavmanis Apr 13, 2026

Contributor Author

@dlvenable Since we implemented this processor from the beginning, we have carefully handled all the relevant conditions, particularly those related to relative JSON pointers. Specifically, the filter_list_when condition resolves against the root-level fields, while the keep_when condition resolves against the element-level fields for each object in the list.

Based on our testing, both conditions are working as expected within their respective contexts. Therefore, we believe that introducing the flag use_relative_pointer_on_keep_when: true may not be necessary for resolving JSON pointers, as this behavior is already managed within the processor implementation.

Additionally, the behavior of both conditions is consistent with the delete_entries processor.

However, if you have a specific use case in mind that is not adequately handled by the current implementation and would require this flag, please let us know. We would be happy to review and address it.

Member

@dlvenable if the keep_when syntax stays the same regardless, I'm not sure that adding a flag like use_relative_pointer_on_keep_when: true is correct yet. Right now, as long as we consistently say "keep_when uses local element scope" and "filter_list_when uses scope from the root of the Event", I think that is pretty clear.

Member

We had a discussion last week about renaming the fields to promote consistency. I suggested in another comment that we use iterate_on and keep_element_when to signal that these have similar behavior to the delete entries processor.

The add entries processor is unfortunately not handling this in a great way currently. But this PR would match delete entries with those name changes.


try {
    final Event elementEvent = contextBuilder
            .withData(contextMap)

Contributor

When evaluateConditional throws for a single element, that element is silently dropped from the result. This is a data loss scenario with no signal to the user beyond WARN-level logs.

Contributor

Thanks for calling this out.
To improve visibility for this case, I updated the processor to mark the event as failed (tags_on_failure) whenever evaluateConditional throws for one or more elements, and to attach diagnostic metadata:

  • filter_list_processor_failed_elements_count

  • filter_list_processor_failed_elements

This keeps current filtering behavior consistent with similar processors, while ensuring downstream processors and users can clearly detect and inspect element-level evaluation failures instead of relying only on WARN logs.

}

final List<Object> filteredList = new ArrayList<>();
final JacksonEvent.Builder contextBuilder = JacksonEvent.builder()

Contributor

The JacksonEvent.Builder is created once outside the loop and reused via .withData() on each iteration. This works today because build() consumes the current data reference, but it's fragile: if the builder ever accumulates state across calls, this breaks silently. It is safer to create the builder inside the loop.
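
To make the hazard concrete, here is a small sketch with a hypothetical `AccumulatingBuilder` that merges data across `withData` calls. This is deliberately not how JacksonEvent.Builder is known to behave; it only shows what would go wrong if a reused builder ever carried state between iterations.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BuilderReuseSketch {

    /** Hypothetical builder that keeps state between calls (assumption for illustration). */
    static final class AccumulatingBuilder {
        private final Map<String, Object> data = new HashMap<>();

        AccumulatingBuilder withData(final Map<String, Object> newData) {
            data.putAll(newData); // state leaks from earlier calls
            return this;
        }

        Map<String, Object> build() {
            return new HashMap<>(data);
        }
    }

    /** One builder shared across iterations: later results contain earlier keys. */
    static List<Map<String, Object>> buildReused(final List<Map<String, Object>> elements) {
        final AccumulatingBuilder builder = new AccumulatingBuilder();
        final List<Map<String, Object>> built = new ArrayList<>();
        for (final Map<String, Object> element : elements) {
            built.add(builder.withData(element).build());
        }
        return built;
    }

    /** A fresh builder per iteration: each result contains only its own element. */
    static List<Map<String, Object>> buildFresh(final List<Map<String, Object>> elements) {
        final List<Map<String, Object>> built = new ArrayList<>();
        for (final Map<String, Object> element : elements) {
            built.add(new AccumulatingBuilder().withData(element).build());
        }
        return built;
    }
}
```

With a stateful builder, the reused variant would evaluate the second element against a context that still contains the first element's fields, so a condition could match on stale data.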

@bagmarnikhil bagmarnikhil left a comment

Contributor

Nice work — clean implementation with thorough test coverage. Two items to address:

  • when evaluateConditional throws for a single element in the loop (FilterListProcessor.java:103-110), that element is silently dropped from the output — this is a data loss risk, consider failing open or tagging the event;

  • JacksonEvent.Builder at line 93 is reused across loop iterations which is fragile — safer to create it inside the loop. Rest looks good.

@kkondaka

Collaborator

Looks good too. But it looks like we are creating a new processor for each of the "list" operations. Maybe we should have a single list processor that can filter and manipulate list items, where one of the manipulations could be converting simple list items to a map. That way the other PR (#6665) could be functionally merged into one processor that operates on lists?

…behaviour and addressed comments

- Add processor-scoped metadata:
  - filter_list_processor_failed_elements_count
  - filter_list_processor_failed_elements
- Keep per-element filtering behavior unchanged
- Add TODO for future optimization of evaluation path overhead

Signed-off-by: nishantKadivar <nimahesx@amazon.com>

@dlvenable dlvenable left a comment

Member

Thank you @yavmanis for this contribution!

this.expressionEvaluator = expressionEvaluator;
this.target = config.getTarget() != null ? config.getTarget() : config.getSource();

if (config.getFilterListWhen() != null

Member

Put validations in the Config classes themselves.

You can use an @AssertTrue from the Jakarta package. Other configs follow this pattern.

@yavmanis yavmanis Mar 31, 2026

Contributor Author

Target defaulting has been moved into FilterListProcessorConfig.getTarget(), which now returns source when target is not set.
Regarding @AssertTrue — the existing usages in this module are all for pure cross-field validations that don't require external dependencies. Our getTarget() never rejects anything — it always succeeds. It's a getter with a fallback default, not a validation. So @AssertTrue doesn't apply here.
Expression validation requires ExpressionEvaluator, which is a runtime dependency not available to config POJOs during Jakarta validation. No config class in the project uses @AssertTrue for expression validation.


…ests

Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
@yavmanis yavmanis requested a review from dlvenable April 7, 2026 09:11
@github-actions

github-actions Bot commented Apr 8, 2026


✅ License Header Check Passed

All newly added files have proper license headers. Great work! 🎉

@yavmanis

yavmanis commented Apr 9, 2026

Contributor Author

⚠️ License Header Violations Found

The following newly added files are missing required license headers:

  • data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml
  • data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml
  • data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml

Please add the appropriate license header to each file and push your changes.

See the license header requirements: https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers

We do not have headers specifically defined in these YAML files, as we observed that they were not included in any of the existing processor files. This was done to maintain consistency across processors.

Please let us know if it is mandatory to add headers in these files as well.

@dlvenable

Member

@yavmanis, be sure to also correct the headers.

The last thing I want to do before merging this is to resolve the issue with the relative pointers.

Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
@yavmanis

Contributor Author

> @yavmanis, be sure to also correct the headers.
>
> The last thing I want to do before merging this is to resolve the issue with the relative pointers.

I have added the license headers to the YAML files.

@yavmanis

Contributor Author

As discussed on the call, David will revisit the filter_list processor requirements and the current implementation, and will advise on how he would like to handle the processor’s behavior for condition evaluation at the root versus element level, as well as how this processor should align with the other processors.

We will make the necessary changes in the PR once we receive David’s final guidance on this.

cc: @nishantKadivar @bagmarnikhil

@SuppressWarnings("unchecked")
final Map<String, Object> contextMap = element instanceof Map
        ? (Map<String, Object>) element
        : Collections.singletonMap("value", element);

Member

From this and the examples in the description, it looks like we are using "/value" in the keep_when expression as a special name for primitive lists. Are we sure this is the right approach? At first I was worried this would conflict with actual fields named "value", but this check for the Map condition does handle that conflict. We just need to be very clear in the documentation for keep_when that "/value" is a special keyword for primitive lists.
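
The wrapping rule in the snippet above can be isolated into a small, self-contained sketch. The class and method names are hypothetical; the logic mirrors the quoted lines: map elements are used as-is, and anything else is exposed under the key "value".

```java
import java.util.Collections;
import java.util.Map;

final class ElementContextSketch {

    /** Builds the per-element evaluation context for keep_element_when. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> toContext(final Object element) {
        return element instanceof Map
                ? (Map<String, Object>) element               // object element: fields resolve directly
                : Collections.singletonMap("value", element); // primitive element: reachable as /value
    }
}
```

Because map elements are passed through unchanged, an object that has its own field named value keeps that field's meaning; only non-map elements get the synthetic value key.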

try {
    sourceList = recordEvent.get(config.getSource(), List.class);
} catch (final Exception e) {
    LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]",

Member

Can you add the SENSITIVE marker to this log? This is needed if we are logging the entire recordEvent. Also, it may be useful to have a metric for this error case as well, but that is not required.

recordEvent.put(config.getTarget(), filteredList);

} catch (final Exception e) {
LOG.atError()

Member

Same here: logging the full event requires the SENSITIVE marker.

Contributor

Added SENSITIVE marker to warning logs that output full event context:

  • Line 66: LOG.warn(EVENT, SENSITIVE, ...) — when source path is invalid
  • Line 101: LOG.warn(EVENT, SENSITIVE, ...) — when keep_element_when evaluation fails

Comment on lines +52 to +57
@JsonPropertyDescription("An expression evaluated per element. Elements where this expression evaluates to true are kept. " +
        "The expression is evaluated against each element of the array as if it were a standalone event.")
@ExampleValues({
        @Example(value = "/status == \"active\"", description = "Keeps only elements where 'status' equals 'active'."),
        @Example(value = "/score > 50", description = "Keeps only elements where 'score' is greater than 50.")
})

Member

Let's add examples for primitive lists where "/value" is used as keyword, and explain that in the description.

Contributor

Updated FilterListProcessorConfig @JsonPropertyDescription and @ExampleValues for keep_element_when:

  • Added explanation: "For primitive elements (strings, numbers, booleans), the element value is accessible via the key /value"
  • Added examples:
    • /value > 50 (numeric primitives)
    • /value != "" (string primitives)

@dlvenable

Member

To promote consistency with other processors that iterate over elements, we can rename source to iterate_on. Data Prepper already has some precedent for using element_when to evaluate against the element and not the whole event. So we could also rename keep_when to keep_element_when. The other configuration options can stay the same.

yavmanis and others added 3 commits April 16, 2026 10:27
…-processor

# Conflicts:
#	data-prepper-plugins/mutate-event-processors/README.md
- Rename source → iterate_on, keep_when → keep_element_when
- Update FilterListProcessorConfig fields/getters/validation
- Update FilterListProcessor to use new getters
- Add SENSITIVE marker to warning logs
- Add TODO(opensearch-project#6609) reference in processor
- Document /value for primitive elements in keep_element_when
- Update README examples and configuration docs
- Update all tests and YAML fixtures to new key names
- Verified: :data-prepper-plugins:mutate-event-processors:test (PASS)

Signed-off-by: nishantKadivar <nimahesx@amazon.com>
@nishantKadivar

Contributor

> To promote consistency with other processors that iterate over elements, we can rename source to iterate_on. Data Prepper already has some precedent for using element_when to evaluate against the element and not the whole event. So we could also rename keep_when to keep_element_when. The other configuration options can stay the same.

Updated as suggested: renamed source to iterate_on and keep_when to keep_element_when to align with existing Data Prepper conventions. Other configuration options remain unchanged.

@dlvenable dlvenable left a comment

Member

Thank you @yavmanis and @nishantKadivar !

@bagmarnikhil bagmarnikhil left a comment

Contributor

Thank you for the change!

@dlvenable dlvenable merged commit 74aa3cf into opensearch-project:main Apr 20, 2026
71 of 72 checks passed

6 participants