Add filter_list processor to filter list elements #6659
Changes from all commits
6460e31
9dccfce
76c0411
324a610
1487cc1
b1224a6
7bc5a03
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.processor.mutateevent; | ||
|
|
||
| import org.opensearch.dataprepper.expression.ExpressionEvaluator; | ||
| import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
| import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | ||
| import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; | ||
| import org.opensearch.dataprepper.model.event.Event; | ||
| import org.opensearch.dataprepper.model.event.JacksonEvent; | ||
| import org.opensearch.dataprepper.model.processor.AbstractProcessor; | ||
| import org.opensearch.dataprepper.model.processor.Processor; | ||
| import org.opensearch.dataprepper.model.record.Record; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
|
|
||
| import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; | ||
| import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; | ||
| import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE; | ||
|
|
||
| @DataPrepperPlugin(name = "filter_list", pluginType = Processor.class, pluginConfigurationType = FilterListProcessorConfig.class) | ||
| public class FilterListProcessor extends AbstractProcessor<Record<Event>, Record<Event>> { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(FilterListProcessor.class); | ||
| private static final String FAILED_ELEMENTS_METADATA_KEY = "filter_list_processor_failed_elements"; | ||
| private static final String FAILED_ELEMENTS_COUNT_METADATA_KEY = "filter_list_processor_failed_elements_count"; | ||
| private final FilterListProcessorConfig config; | ||
| private final ExpressionEvaluator expressionEvaluator; | ||
|
|
||
| @DataPrepperPluginConstructor | ||
| public FilterListProcessor(final PluginMetrics pluginMetrics, final FilterListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { | ||
| super(pluginMetrics); | ||
| this.config = config; | ||
| this.expressionEvaluator = expressionEvaluator; | ||
|
|
||
| config.validateExpressions(expressionEvaluator); | ||
| } | ||
|
|
||
| @Override | ||
| public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) { | ||
| for (final Record<Event> record : records) { | ||
| final Event recordEvent = record.getData(); | ||
|
|
||
| try { | ||
| if (Objects.nonNull(config.getFilterListWhen()) && !expressionEvaluator.evaluateConditional(config.getFilterListWhen(), recordEvent)) { | ||
| continue; | ||
| } | ||
|
|
||
| final List<Object> sourceList; | ||
| try { | ||
| sourceList = recordEvent.get(config.getIterateOn(), List.class); | ||
| } catch (final Exception e) { | ||
| LOG.atWarn() | ||
| .addMarker(EVENT) | ||
| .addMarker(SENSITIVE) | ||
| .setMessage("Given source path [{}] is not valid on record [{}]") | ||
| .addArgument(config.getIterateOn()) | ||
| .addArgument(recordEvent) | ||
| .setCause(e) | ||
| .log(); | ||
| addTagsOnFailure(recordEvent); | ||
| continue; | ||
| } | ||
|
|
||
| if (sourceList == null) { | ||
| LOG.debug("Source list at path [{}] is null, skipping event", config.getIterateOn()); | ||
| continue; | ||
| } | ||
|
|
||
| final List<Object> filteredList = new ArrayList<>(); | ||
| final List<Object> failedElements = new ArrayList<>(); | ||
| int failedElementCount = 0; | ||
|
|
||
| for (final Object element : sourceList) { | ||
| @SuppressWarnings("unchecked") | ||
| final Map<String, Object> contextMap = element instanceof Map | ||
| ? (Map<String, Object>) element | ||
| : Collections.singletonMap("value", element); | ||
|
Member
From this and the examples in the description it looks like we are using "/value" in the |
||
|
|
||
| try { | ||
| // TODO(#6609): Revisit this per-element Event construction when ExpressionEvaluator/JsonPointer | ||
| // internals support a lighter evaluation path that avoids full tree conversion. | ||
| final Event elementEvent = JacksonEvent.builder() | ||
| .withEventType("event") | ||
| .withData(contextMap) | ||
|
Contributor
When evaluateConditional throws for a single element, that element is silently dropped from the result. This is a data-loss scenario with no signal to the user beyond WARN-level logs.
Contributor
Thanks for calling this out.
This keeps current filtering behavior consistent with similar processors, while ensuring downstream processors and users can clearly detect and inspect element-level evaluation failures instead of relying only on WARN logs. |
||
| .build(); | ||
|
|
||
| if (expressionEvaluator.evaluateConditional(config.getKeepElementWhen(), elementEvent)) { | ||
| filteredList.add(element); | ||
| } | ||
| } catch (final Exception e) { | ||
| failedElementCount++; | ||
| failedElements.add(element); | ||
| LOG.atWarn() | ||
| .addMarker(EVENT) | ||
| .addMarker(SENSITIVE) | ||
| .setMessage("Error evaluating keep_element_when expression [{}] for element in source list at path [{}]") | ||
| .addArgument(config.getKeepElementWhen()) | ||
| .addArgument(config.getIterateOn()) | ||
| .setCause(e) | ||
| .log(); | ||
| } | ||
| } | ||
|
|
||
| if (failedElementCount > 0) { | ||
| addTagsOnFailure(recordEvent); | ||
| recordEvent.getMetadata().setAttribute(FAILED_ELEMENTS_COUNT_METADATA_KEY, failedElementCount); | ||
| recordEvent.getMetadata().setAttribute(FAILED_ELEMENTS_METADATA_KEY, failedElements); | ||
| } | ||
|
|
||
| recordEvent.put(config.getTarget(), filteredList); | ||
|
|
||
| } catch (final Exception e) { | ||
| LOG.atError() | ||
|
Member
Same here: logging the full event requires the SENSITIVE marker.
Contributor
Added the SENSITIVE marker to warning logs that output the full event context:
|
||
| .addMarker(EVENT) | ||
| .addMarker(NOISY) | ||
| .setMessage("There was an exception while processing Event [{}]") | ||
| .addArgument(recordEvent) | ||
| .setCause(e) | ||
| .log(); | ||
| addTagsOnFailure(recordEvent); | ||
| } | ||
| } | ||
| return records; | ||
| } | ||
|
|
||
| private void addTagsOnFailure(final Event event) { | ||
| if (config.getTagsOnFailure() != null) { | ||
| event.getMetadata().addTags(config.getTagsOnFailure()); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void prepareForShutdown() { | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isReadyForShutdown() { | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public void shutdown() { | ||
| } | ||
| } | ||
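The element handling in `doExecute` can be illustrated without any Data Prepper classes. The following is a minimal sketch, not the processor itself: `ElementFilterSketch`, `Result`, `toContext`, and `filter` are hypothetical names, and a plain `Predicate` stands in for `ExpressionEvaluator.evaluateConditional`. It assumes only what the diff shows: map elements are evaluated as-is, other elements are wrapped under the key `value`, and elements whose evaluation throws are collected rather than kept.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for the processor's inner loop; not the Data Prepper API.
final class ElementFilterSketch {

    // Holds the elements that passed and the elements whose predicate threw.
    static final class Result {
        final List<Object> kept = new ArrayList<>();
        final List<Object> failed = new ArrayList<>();
    }

    // Mirrors the diff: map elements are used as-is, anything else is wrapped under "value".
    @SuppressWarnings("unchecked")
    static Map<String, Object> toContext(final Object element) {
        return element instanceof Map
                ? (Map<String, Object>) element
                : Collections.singletonMap("value", element);
    }

    // The predicate is an assumption standing in for the keep_element_when expression.
    static Result filter(final List<Object> source, final Predicate<Map<String, Object>> keepWhen) {
        final Result result = new Result();
        for (final Object element : source) {
            try {
                if (keepWhen.test(toContext(element))) {
                    result.kept.add(element);
                }
            } catch (final RuntimeException e) {
                // Evaluation failed: the element is not kept, but it is recorded for inspection.
                result.failed.add(element);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // "oops" cannot be cast to Integer, so the predicate throws for that element.
        final List<Object> source = List.of(5, "oops", 42);
        final Result result = filter(source, ctx -> (Integer) ctx.get("value") > 10);
        System.out.println("kept=" + result.kept + " failed=" + result.failed);
    }
}
```

In this sketch the failed element ends up in `failed` instead of silently disappearing, which corresponds to the failed-elements metadata the processor attaches to the event.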
Each array element gets wrapped in a full JacksonEvent via builder.withData(contextMap).build(), which internally runs mapper.valueToTree(). For large arrays this creates significant GC pressure and serialization overhead, the same concern @dlvenable raised in the issue discussion. This matches the pattern used by the translate processor, which has reportedly caused performance issues.
For now this is probably acceptable since the ExpressionEvaluator requires an Event, but it is worth noting as a known limitation.
For now, we have acknowledged the limitation and marked it with a TODO.
There are two problems with the `translate` processor approach. One is the performance. But the other issue is how we handle relative paths. There is a good discussion in #6609 about this same issue.
Basically, we need a way to decide relative versus absolute paths. Some processors are all absolute and some have relative paths. I propose in #6609 that we should use `/` to state absolute. And lack of that can be relative. Your `elementEvent` below will be a relative path with an absolute marker. This is a larger problem because it will lock us into the current behavior until a breaking change.
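To make the proposed convention concrete, here is a minimal sketch that assumes the rule suggested in #6609 were adopted: a leading `/` resolves against the root of the Event, and a path without it resolves against the current list element. `PathScopeSketch` and `resolve` are hypothetical names, and the lookup handles single-segment keys only; it is not a JSON Pointer implementation and not an existing Data Prepper API.

```java
import java.util.Map;

// Hypothetical illustration of the proposed absolute/relative convention.
final class PathScopeSketch {

    // Leading "/" means absolute (root scope); no leading "/" means relative (element scope).
    // Only single-segment keys are supported in this sketch.
    static Object resolve(final Map<String, Object> root,
                          final Map<String, Object> element,
                          final String path) {
        if (path.startsWith("/")) {
            return root.get(path.substring(1));
        }
        return element.get(path);
    }

    public static void main(final String[] args) {
        final Map<String, Object> root = Map.of("status", "root-status");
        final Map<String, Object> element = Map.of("status", "element-status");
        System.out.println(resolve(root, element, "/status"));
        System.out.println(resolve(root, element, "status"));
    }
}
```

Under this assumed rule the two spellings are unambiguous, whereas the PR as written evaluates a `/`-prefixed path against the element.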
Thanks for flagging this. We understand this could conflict with a future convention where `/` explicitly means absolute (as discussed in #6609). We'll keep this in mind and align with whatever convention is adopted at the framework level. For now, this is the only viable approach given the current `ExpressionEvaluator` API.
@yavmanis, we should avoid pushing this bug further into processors. Using `/` clearly indicates a JSON pointer at a root object.
I think these are our options at this point:
`use_relative_pointer_on_keep_when`. The only allowable value would be `true`, and this is not a default value, so users must provide it. We would eventually remove it.
This name helps clarify that `filter_list_when` isn't using the relative pointer.
@dlvenable Since we implemented this processor from the beginning, we have carefully handled all the relevant conditions, particularly those related to relative JSON pointers. Specifically, the `filter_list_when` condition resolves against the root-level fields, while the `keep_when` condition resolves against the element-level fields for each object in the list.
Based on our testing, both conditions are working as expected within their respective contexts. Therefore, we believe that introducing the flag `use_relative_pointer_on_keep_when: true` may not be necessary for resolving JSON pointers, as this behavior is already managed within the processor implementation.
Additionally, the behavior of both conditions is consistent with the delete_entries processor.
However, if you have a specific use case in mind that is not adequately handled by the current implementation and would require this flag, please let us know. We would be happy to review and address it.
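The scoping described in this reply can be sketched as follows. This is only an illustration of the stated behavior, assuming that the same `/`-prefixed pointer is looked up in whichever context it is evaluated against: the root of the Event for `filter_list_when`, and the individual list element for `keep_when`. `ScopeSketch` and `lookup` are hypothetical names, and the lookup supports single-segment pointers only.

```java
import java.util.List;
import java.util.Map;

// Hypothetical illustration: one pointer, two evaluation contexts.
final class ScopeSketch {

    // Simplified single-segment lookup: "/name" reads key "name" from the given context.
    static Object lookup(final Map<String, Object> context, final String pointer) {
        return context.get(pointer.substring(1));
    }

    public static void main(final String[] args) {
        final Map<String, Object> item = Map.of("name", "element-level");
        final Map<String, Object> root = Map.of("name", "root-level", "items", List.of(item));

        // Evaluated the way filter_list_when is described: against the root of the Event.
        System.out.println(lookup(root, "/name"));
        // Evaluated the way keep_when is described: against one element of the list.
        System.out.println(lookup(item, "/name"));
    }
}
```

The two calls return different values for the same pointer text, which is the ambiguity the reviewers are debating.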
@dlvenable if `keep_when` syntax stays the same regardless, I'm not sure if adding a flag like `use_relative_pointer_on_keep_when: true` is correct yet. Right now, as long as we are consistently saying "`keep_when` uses local element scope" and "`filter_list_when` uses scope from the root of the Event", I think that is pretty clear.
We had a discussion last week about renaming the fields to promote consistency. I suggested in another comment that we use `iterate_on` and `keep_element_when` to signal that these have similar behavior to the delete entries processor.
The add entries processor is unfortunately not handling this in a great way currently. But this PR would match delete entries with those name changes.