Add filter_list processor to filter list elements#6659
Conversation
Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
d9bee48 to
6460e31
Compare
| final JacksonEvent.Builder contextBuilder = JacksonEvent.builder() | ||
| .withEventType("event"); | ||
|
|
||
| for (final Object element : sourceList) { |
There was a problem hiding this comment.
Each array element gets wrapped in a full JacksonEvent via builder.withData(contextMap).build(), which internally runs mapper.valueToTree(). For large arrays this creates
significant GC pressure and serialization overhead — the same concern @dlvenable raised in the issue discussion. This matches the pattern used by the translate processor, which has reportedly caused performance issues.
For now this is probably acceptable since the ExpressionEvaluator requires an Event, but worth noting as a known limitation.
There was a problem hiding this comment.
For now, we have acknowledged the limitation and mark it as TODO.
There was a problem hiding this comment.
There are two problems with the translate processor approach. One is the performance. But the other issue is how we handle relative paths. There is a good discussion in #6609 about this same issue.
Basically, we need a way to decide relative versus absolute paths. Some processors are all absolute and some have relative paths. I propose in #6609 that we should use / to state absolute. And lack of that can be relative. Your elementEvent below will be a relative path with an absolute marker. This is a larger problem because it will lock us into the current behavior until a breaking change.
There was a problem hiding this comment.
Thanks for flagging this. We understand this could conflict with a future convention where / explicitly means absolute (as discussed in #6609). We'll keep this in mind and align with whatever convention is adopted at the framework level. For now, this is the only viable approach given the current ExpressionEvaluator API.
There was a problem hiding this comment.
@yavmanis , We should avoid pushing this bug further into processors. Using / is clearly indicating a JSON pointer at a root object.
I think these are our options at this point:
- Resolve this correctly through JacksonEvent.
- Create a throw-away configuration like
use_relative_pointer_on_keep_when. The only allowable value would betrueand this is not a default value so users must provide it.
- filter_list:
source: "items"
keep_when: '/status == "active"'
filter_list_when: '/env == "production"'
use_relative_pointer_on_keep_when: true
We would eventually remove it.
This name helps clarify that filter_list_when isn't using the relative pointer.
There was a problem hiding this comment.
@dlvenable Since we implemented this processor from the beginning, we have carefully handled all the relevant conditions, particularly those related to relative JSON pointers. Specifically, the filter_list_when condition resolves against the root-level fields, while the keep_when condition resolves against the element-level fields for each object in the list.
Based on our testing, both conditions are working as expected within their respective contexts. Therefore, we believe that introducing the flag use_relative_pointer_on_keep_when: true may not be necessary for resolving JSON pointers, as this behavior is already managed within the processor implementation.
Additionally, the behavior of both conditions is consistent with the delete_entries processor.
However, if you have a specific use case in mind that is not adequately handled by the current implementation and would require this flag, please let us know. We would be happy to review and address it.
There was a problem hiding this comment.
@dlvenable if keep_when syntax stays the same regardless, I'm not sure if adding a flag like this use_relative_pointer_on_keep_when: true is correct yet. Right now, as long as we are consistently saying "keep_when uses local element scope", and "filter_list_when" uses scope from the root of the Event", I think that is pretty clear.
There was a problem hiding this comment.
We had a discussion last week about renaming the fields to promote consistency. I suggested in another comment that we use iterate_on and keep_element_when to signal that these have similar behavior to the delete entries processor.
The add entries processor is unfortunately not handling this in a great way currently. But this PR would match delete entries with those name changes.
|
|
||
| try { | ||
| final Event elementEvent = contextBuilder | ||
| .withData(contextMap) |
There was a problem hiding this comment.
When evaluateConditional throws for a single element, that element is silently dropped from the result. This is a data loss scenario with no signal to the user beyond WARN-level logs
There was a problem hiding this comment.
Thanks for calling this out.
To improve visibility for this case, I updated the processor to mark the event as failed (tags_on_failure) whenever evaluateConditional throws for one or more elements, and to attach diagnostic metadata:
-
filter_list_processor_failed_elements_count
-
filter_list_processor_failed_elements
This keeps current filtering behavior consistent with similar processors, while ensuring downstream processors and users can clearly detect and inspect element-level evaluation failures instead of relying only on WARN logs.
| } | ||
|
|
||
| final List<Object> filteredList = new ArrayList<>(); | ||
| final JacksonEvent.Builder contextBuilder = JacksonEvent.builder() |
There was a problem hiding this comment.
The JacksonEvent.Builder is created once outside the loop and reused via .withData() on each iteration. This works today because build() consumes the current data reference, but it's fragile — if the builder ever accumulates state across calls, this breaks silently. Safer to create the builder inside the loop
bagmarnikhil
left a comment
There was a problem hiding this comment.
Nice work — clean implementation with thorough test coverage. Two items to address:
-
when evaluateConditional throws for a single element in the loop (FilterListProcessor.java:103-110), that element is silently dropped from the output — this is a data loss risk, consider failing open or tagging the event;
-
JacksonEvent.Builder at line 93 is reused across loop iterations which is fragile — safer to create it inside the loop. Rest looks good.
|
Looks good too. But it looks like we are creating new processors each of the "list" operations. Maybe we should have a list processor that can filter and that manipulate list items and one of the manipulations could be converting simple list items to map. That way the other PR (#6665) can be functionally merged into one processor that operates on lists? |
…behaviour and addressed comments - Add processor-scoped metadata: - filter_list_processor_failed_elements_count - filter_list_processor_failed_elements - Keep per-element filtering behavior unchanged - Add TODO for future optimization of evaluation path overhead Signed-off-by: nishantKadivar <nimahesx@amazon.com>
| this.expressionEvaluator = expressionEvaluator; | ||
| this.target = config.getTarget() != null ? config.getTarget() : config.getSource(); | ||
|
|
||
| if (config.getFilterListWhen() != null |
There was a problem hiding this comment.
Put validations in the Config classes themselves.
You can use an @AssertTrue from the Jakarta package. Other configs follow this pattern.
There was a problem hiding this comment.
Target defaulting has been moved into FilterListProcessorConfig.getTarget(), which now returns source when target is not set.
Regarding @AssertTrue — the existing usages in this module are all for pure cross-field validations that don't require external dependencies. Our getTarget() never rejects anything — it always succeeds. It's a getter with a fallback default, not a validation. So @AssertTrue doesn't apply here.
Expression validation requires ExpressionEvaluator, which is a runtime dependency not available to config POJOs during Jakarta validation. No config class in the project uses @AssertTrue for expression validation.
| final JacksonEvent.Builder contextBuilder = JacksonEvent.builder() | ||
| .withEventType("event"); | ||
|
|
||
| for (final Object element : sourceList) { |
There was a problem hiding this comment.
There are two problems with the translate processor approach. One is the performance. But the other issue is how we handle relative paths. There is a good discussion in #6609 about this same issue.
Basically, we need a way to decide relative versus absolute paths. Some processors are all absolute and some have relative paths. I propose in #6609 that we should use / to state absolute. And lack of that can be relative. Your elementEvent below will be a relative path with an absolute marker. This is a larger problem because it will lock us into the current behavior until a breaking change.
…ests Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
✅ License Header Check PassedAll newly added files have proper license headers. Great work! 🎉 |
We do not have headers specifically defined in these YAML files, as we observed that they were not included in any of the existing processor files. This was done to maintain consistency across processors. Please let us know if it is mandatory to add headers in these files as well. |
|
@yavmanis , Be sure to also correct the headers. The last thing I want to do before merging this is to resolve the issue with the relative pointers. |
Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
I have added the license headers in the yaml files |
|
As discussed on the call, David will revisit the filter_list processor requirements and the current implementation, and will advise on how he would like to handle the processor’s behavior for condition evaluation at the root versus element level, as well as how this processor should align with the other processors. We will make the necessary changes in the PR once we receive David’s final guidance on this. |
| @SuppressWarnings("unchecked") | ||
| final Map<String, Object> contextMap = element instanceof Map | ||
| ? (Map<String, Object>) element | ||
| : Collections.singletonMap("value", element); |
There was a problem hiding this comment.
From this and the examples in the description it looks like we are using "/value" in the keep_when expression as a special name for primitive lists. Are we sure this is the right approach? At first I was worried this would conflict with actual fields named "value", but this checking for Map condition does handle that conflict. We just need to be very clear in the documentation for keep_when that "/value" is a special keyword for primitive lists.
| try { | ||
| sourceList = recordEvent.get(config.getSource(), List.class); | ||
| } catch (final Exception e) { | ||
| LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]", |
There was a problem hiding this comment.
Can you SENSITIVE marker to this log? This is needed if we are logging the entire recordEvent. Also, may be useful having a metric on this error case as well, but not required.
| recordEvent.put(config.getTarget(), filteredList); | ||
|
|
||
| } catch (final Exception e) { | ||
| LOG.atError() |
There was a problem hiding this comment.
Same here logging the full event requires SENSITIVE marker
There was a problem hiding this comment.
Added SENSITIVE marker to warning logs that output full event context:
- Line 66: LOG.warn(EVENT, SENSITIVE, ...) — when source path is invalid
- Line 101: LOG.warn(EVENT, SENSITIVE, ...) — when keep_element_when evaluation fails
| @JsonPropertyDescription("An expression evaluated per element. Elements where this expression evaluates to true are kept. " + | ||
| "The expression is evaluated against each element of the array as if it were a standalone event.") | ||
| @ExampleValues({ | ||
| @Example(value = "/status == \"active\"", description = "Keeps only elements where 'status' equals 'active'."), | ||
| @Example(value = "/score > 50", description = "Keeps only elements where 'score' is greater than 50.") | ||
| }) |
There was a problem hiding this comment.
Let's add examples for primitive lists where "/value" is used as keyword, and explain that in the description.
There was a problem hiding this comment.
Updated FilterListProcessorConfig @JsonPropertyDescription and @ExampleValues for keep_element_when:
- Added explanation: "For primitive elements (strings, numbers, booleans), the element value is accessible via the key /value"
- Added examples:
- /value > 50 (numeric primitives)
- /value != "" (string primitives)
|
To promote consistency with other processors that iterate over elements, we can rename |
…-processor # Conflicts: # data-prepper-plugins/mutate-event-processors/README.md
- Rename source → iterate_on, keep_when → keep_element_when - Update FilterListProcessorConfig fields/getters/validation - Update FilterListProcessor to use new getters - Add SENSITIVE marker to warning logs - Add TODO(opensearch-project#6609) reference in processor - Document /value for primitive elements in keep_element_when - Update README examples and configuration docs - Update all tests and YAML fixtures to new key names - Verified: :data-prepper-plugins:mutate-event-processors:test (PASS) Signed-off-by: nishantKadivar <nimahesx@amazon.com>
Updated as suggested — renamed source to iterate_on and keep_when to keep_element_when to align with existing Data Prepper conventions. Other configuration options remain unchanged. |
dlvenable
left a comment
There was a problem hiding this comment.
Thank you @yavmanis and @nishantKadivar !
bagmarnikhil
left a comment
There was a problem hiding this comment.
Thank you for the change!
filter_list Processor
The
filter_listprocessor filters elements within an array field by evaluating a condition against each element and retaining only those that satisfy the condition.It supports:
Description
This processor iterates over a specified array field and evaluates an expression for each element.
keep_element_whenfilter_list_whenThis behavior is aligned with existing processors like
delete_entriesfor consistency.Configuration Updates
To promote consistency with other processors that iterate over elements:
source→iterate_onBetter reflects that the processor iterates over a collection
keep_when→keep_element_whenClearly indicates evaluation happens at the element level
Other configuration options remain unchanged.
Configuration Options
outer/inner)Execution Semantics
keep_element_when→ evaluated in element scopefilter_list_when→ evaluated in root event scopefilter_list_whenis false → processor is skippedtargetis not provided → filtering is in-placetargetis provided → filtered result is written to target fieldFailure Handling
If evaluation fails for one or more elements:
Event is marked using
tags_on_failureMetadata fields are added:
filter_list_processor_failed_elements_countfilter_list_processor_failed_elementsThis avoids silent data loss and improves observability.
Example Pipeline Configuration
Input
{"items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}], "env": "production", "type": "inventory"} {"tags": ["important", "", "urgent", ""], "type": "tagged"} {"scores": [95, 30, 75, 10, 88], "subject": "math", "type": "grades"} {"items": [{"name": "bug1", "priority": "critical"}, {"name": "bug2", "priority": "low"}, {"name": "bug3", "priority": "critical"}], "type": "tickets"} {"items": [{"name": "item1", "status": "active"}], "env": "staging", "type": "inventory"}Output
{"items":[{"name":"item1","status":"active"},{"name":"item3","status":"active"}],"env":"production","type":"inventory"} {"tags":["important","urgent"],"type":"tagged"} {"scores":[95,30,75,10,88],"subject":"math","type":"grades","passing_scores":[95,75,88]} {"items":[{"name":"bug1","priority":"critical"},{"name":"bug2","priority":"low"},{"name":"bug3","priority":"critical"}],"type":"tickets","critical_items":[{"name":"bug1","priority":"critical"},{"name":"bug3","priority":"critical"}]} {"items":[{"name":"item1","status":"active"}],"env":"staging","type":"inventory"}End-to-End Scenarios
status == active, env=production)> 50)priority == critical)Summary
Introduces
filter_listprocessor for array element filteringAligns naming with existing processors:
iterate_onkeep_element_whenSupports:
Adds strong failure visibility with metadata