From 6460e31908837b000a9b2d879cef0f0befbdc61c Mon Sep 17 00:00:00 2001 From: Manisha Yadav Date: Fri, 20 Mar 2026 08:09:12 +0000 Subject: [PATCH 1/5] Add filter_list processor to filter list elements Signed-off-by: Manisha Yadav --- .../mutate-event-processors/README.md | 147 ++++ .../mutateevent/FilterListProcessor.java | 149 ++++ .../FilterListProcessorConfig.java | 89 +++ .../FilterListProcessorConfigTest.java | 65 ++ .../mutateevent/FilterListProcessorTest.java | 661 ++++++++++++++++++ 5 files changed, 1111 insertions(+) create mode 100644 data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java create mode 100644 data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java create mode 100644 data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java create mode 100644 data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java diff --git a/data-prepper-plugins/mutate-event-processors/README.md b/data-prepper-plugins/mutate-event-processors/README.md index 75f7e049a0..fca3f812d4 100644 --- a/data-prepper-plugins/mutate-event-processors/README.md +++ b/data-prepper-plugins/mutate-event-processors/README.md @@ -647,6 +647,153 @@ will end up with this after processing: * `tags_on_failure` - (optional): a list of tags to add to event metadata when the event fails to process +## FilterListProcessor +A processor that filters elements within an array field by evaluating a condition against each element, keeping only those where the condition is true. It supports arrays of objects and arrays of primitives (strings, numbers, booleans). + +### Basic Usage +To get started, create the following `pipeline.yaml`. 
+```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - filter_list: + source: "items" + keep_when: '/status == "active"' + sink: + - stdout: +``` + +Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. + +```json +{"items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}]} +``` + +When run, the processor will filter the array in-place and produce the following output: + +```json +{"items": [{"name": "item1", "status": "active"}, {"name": "item3", "status": "active"}]} +``` + +### Filtering to a different target + +You can write the filtered result to a different key, leaving the original array unchanged: + +```yaml + processor: + - filter_list: + source: "items" + target: "active_items" + keep_when: '/status == "active"' +``` + +With the same input, the output will be: + +```json +{ + "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}], + "active_items": [{"name": "item1", "status": "active"}, {"name": "item3", "status": "active"}] +} +``` + +### Filtering primitive arrays + +For arrays of primitives (strings, numbers, booleans), each element is accessible via the `/value` key in the expression: + +```yaml + processor: + - filter_list: + source: "tags" + keep_when: '/value != ""' +``` + +With the following input: + +```json +{"tags": ["important", "", "urgent", ""]} +``` + +The output will be: + +```json +{"tags": ["important", "urgent"]} +``` + +Another example filtering numbers: + +```yaml + processor: + - filter_list: + source: "scores" + keep_when: '/value > 50' +``` + +With the following input: + +```json +{"scores": [90, 30, 75, 10]} +``` + +The output will be: + +```json +{"scores": [90, 75]} +``` + +### Using both conditions + +The 
`filter_list_when` condition controls whether the processor runs at all (evaluated against the root event), while `keep_when` controls which elements are kept (evaluated per element): + +```yaml + processor: + - filter_list: + source: "items" + keep_when: '/status == "active"' + filter_list_when: '/env == "production"' +``` + +With the following input: + +```json +{"env": "production", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]} +``` + +Since `env` is `"production"`, the processor runs and filters by `status`, producing: + +```json +{"env": "production", "items": [{"name": "item1", "status": "active"}]} +``` + +With a different event where `filter_list_when` evaluates to `false`: + +```json +{"env": "staging", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]} +``` + +The processor is skipped entirely and the event passes through unchanged: + +```json +{"env": "staging", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]} +``` + +### Configuration +* `source` - (required) - The key of the array field to filter. Supports nested paths (e.g. `outer_key/inner_list`). +* `target` - (optional) - The key to write the filtered array to. Defaults to the `source` key (in-place filtering). Supports nested paths. +* `keep_when` - (required) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated per element. Elements where this expression evaluates to `true` are kept. For object elements, the expression is evaluated against the object's fields directly (e.g. `/status == "active"`). For primitive elements, the value is accessible via `/value` (e.g. `/value > 50`). When no elements match, the result is an empty list `[]`. Elements for which the expression cannot be evaluated are logged as a warning and left out of the result. 
+* `filter_list_when` - (optional) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated against the root event. When provided, the processor only runs if this condition is `true`. By default, all events are processed. +* `tags_on_failure` - (optional) - A list of tags to add to the event metadata when the processor fails to process the event. + +**Edge case behavior:** +- If the `source` key does not exist or its value is `null`, the processor is a no-op and the event passes through unchanged. +- If the `source` value is not a list (e.g. a string or number), the processor logs a warning and adds `tags_on_failure` if configured. +- `null` elements within the list are evaluated normally. For example, with `keep_when: '/value != null'`, null elements are filtered out while non-null elements are kept. + +___ + ## Developer Guide This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development: - [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md) diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java new file mode 100644 index 0000000000..5269ab4da5 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java @@ -0,0 +1,149 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.dataprepper.plugins.processor.mutateevent;
+
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
+import org.opensearch.dataprepper.model.processor.AbstractProcessor;
+import org.opensearch.dataprepper.model.processor.Processor;
+import org.opensearch.dataprepper.model.record.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
+import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
+
+/**
+ * Filters the list found at the configured source key, keeping only the elements for which the
+ * keep_when expression evaluates to true, and writes the result to the target key (the source key
+ * when no target is configured).
+ */
+@DataPrepperPlugin(name = "filter_list", pluginType = Processor.class, pluginConfigurationType = FilterListProcessorConfig.class)
+public class FilterListProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
+    private static final Logger LOG = LoggerFactory.getLogger(FilterListProcessor.class);
+    private final FilterListProcessorConfig config;
+    private final ExpressionEvaluator expressionEvaluator;
+    private final String target;
+
+    /** @throws InvalidPluginConfigurationException if filter_list_when (when set) or keep_when is not a valid expression */
+    @DataPrepperPluginConstructor
+    public FilterListProcessor(final PluginMetrics pluginMetrics, final FilterListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
+        super(pluginMetrics);
+        this.config = config;
+        this.expressionEvaluator = expressionEvaluator;
+        this.target = config.getTarget() != null ? config.getTarget() : config.getSource();
+
+        if (config.getFilterListWhen() != null) {
+            requireValidExpression("filter_list_when", config.getFilterListWhen());
+        }
+        requireValidExpression("keep_when", config.getKeepWhen());
+    }
+
+    /** Throws InvalidPluginConfigurationException when the evaluator rejects the expression of the named option. */
+    private void requireValidExpression(final String optionName, final String expression) {
+        if (!expressionEvaluator.isValidExpressionStatement(expression)) {
+            throw new InvalidPluginConfigurationException(
+                    String.format("%s %s is not a valid expression statement. " +
+                            "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
+                            optionName, expression));
+        }
+    }
+
+    /**
+     * Filters the source list of each event and returns the same records.
+     * An event is left untouched when filter_list_when is false or the source value is null. It is tagged
+     * with tags_on_failure when reading the source as a list throws or an unexpected exception occurs.
+     * An element whose keep_when evaluation throws is logged and left out of the result.
+     */
+    @Override
+    public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
+        for (final Record<Event> record : records) {
+            final Event recordEvent = record.getData();
+
+            try {
+                if (Objects.nonNull(config.getFilterListWhen()) && !expressionEvaluator.evaluateConditional(config.getFilterListWhen(), recordEvent)) {
+                    continue;
+                }
+
+                final List<?> sourceList;
+                try {
+                    sourceList = recordEvent.get(config.getSource(), List.class);
+                } catch (final Exception e) {
+                    LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]", config.getSource(), recordEvent, e);
+                    addTagsOnFailure(recordEvent);
+                    continue;
+                }
+
+                if (sourceList == null) {
+                    LOG.debug("Source list at path [{}] is null, skipping event", config.getSource());
+                    continue;
+                }
+
+                final List<Object> filteredList = new ArrayList<>();
+                final JacksonEvent.Builder contextBuilder = JacksonEvent.builder().withEventType("event");
+                for (final Object element : sourceList) {
+                    // Non-map elements are wrapped so the expression can reach them through the "value" key.
+                    @SuppressWarnings("unchecked")
+                    final Map<String, Object> contextMap = element instanceof Map
+                            ? (Map<String, Object>) element
+                            : Collections.singletonMap("value", element);
+                    try {
+                        final Event elementEvent = contextBuilder.withData(contextMap).build();
+                        if (expressionEvaluator.evaluateConditional(config.getKeepWhen(), elementEvent)) {
+                            filteredList.add(element);
+                        }
+                    } catch (final Exception e) {
+                        LOG.warn(EVENT, "Error evaluating keep_when expression [{}] for element in source list at path [{}]", config.getKeepWhen(), config.getSource(), e);
+                    }
+                }
+
+                recordEvent.put(target, filteredList);
+            } catch (final Exception e) {
+                LOG.atError().addMarker(EVENT).addMarker(NOISY)
+                        .setMessage("There was an exception while processing Event [{}]")
+                        .addArgument(recordEvent).setCause(e).log();
+                addTagsOnFailure(recordEvent);
+            }
+        }
+        return records;
+    }
+
+    /** Adds the configured tags_on_failure, if any, to the event's metadata. */
+    private void addTagsOnFailure(final Event event) {
+        if (config.getTagsOnFailure() != null) {
+            event.getMetadata().addTags(config.getTagsOnFailure());
+        }
+    }
+
+    @Override
+    public void prepareForShutdown() {
+    }
+
+    @Override
+    public boolean isReadyForShutdown() {
+        return true;
+    }
+
+    @Override
+    public void shutdown() {
+    }
+}
diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java
new file mode 100644
index 0000000000..487528b2e4
--- /dev/null
+++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.processor.mutateevent;
+
+import com.fasterxml.jackson.annotation.JsonClassDescription;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import jakarta.validation.constraints.NotEmpty;
+import jakarta.validation.constraints.NotNull;
+import org.opensearch.dataprepper.model.annotations.ExampleValues;
+import org.opensearch.dataprepper.model.annotations.ExampleValues.Example;
+
+import java.util.List;
+
+/** Configuration options for the filter_list processor; values are exposed through the getters below. */
+@JsonPropertyOrder
+@JsonClassDescription("The filter_list processor evaluates a condition against each element of an array " +
+        "and keeps only those elements where the condition is true.")
+public class FilterListProcessorConfig {
+    @NotNull
+    @NotEmpty
+    @JsonProperty("source")
+    @JsonPropertyDescription("The key of the array field to filter. Supports nested paths.")
+    @ExampleValues({
+        @Example(value = "my-list", description = "Filters the 'my-list' array at the root of the event."),
+        @Example(value = "outer-key/my-list", description = "Filters the 'my-list' array nested under 'outer-key'.")
+    })
+    private String source;
+
+    @JsonProperty("target")
+    @JsonPropertyDescription("The key to write the filtered array to. Defaults to the source key (in-place). " +
+            "Supports nested paths — intermediate objects are created automatically if they do not exist.")
+    @ExampleValues({
+        @Example(value = "filtered-list", description = "Writes the filtered array to 'filtered-list'.")
+    })
+    private String target;
+
+    @NotNull
+    @NotEmpty
+    @JsonProperty("keep_when")
+    @JsonPropertyDescription("An expression evaluated per element. Elements where this expression evaluates to true are kept. " +
+            "Object elements are evaluated as if each were a standalone event; primitive elements are exposed under the /value key.")
+    @ExampleValues({
+        @Example(value = "/status == \"active\"", description = "Keeps only elements where 'status' equals 'active'."),
+        @Example(value = "/score > 50", description = "Keeps only elements where 'score' is greater than 50.")
+    })
+    private String keepWhen;
+
+    @JsonProperty("filter_list_when")
+    @JsonPropertyDescription("A conditional expression, " +
+            "such as /some-key == \"test\", that will be evaluated against the root event to determine whether " +
+            "the processor will be run on the event. By default, all events will be processed unless otherwise stated.")
+    @ExampleValues({
+        @Example(value = "/some-key == \"test\"", description = "The processor only runs when the value of 'some-key' is 'test'.")
+    })
+    private String filterListWhen;
+
+    @JsonProperty("tags_on_failure")
+    @JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.")
+    private List<String> tagsOnFailure;
+
+    public String getSource() {
+        return source;
+    }
+
+    public String getTarget() {
+        return target;
+    }
+
+    public String getKeepWhen() {
+        return keepWhen;
+    }
+
+    public String getFilterListWhen() {
+        return filterListWhen;
+    }
+
+    public List<String> getTagsOnFailure() {
+        return tagsOnFailure;
+    }
+}
diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java
new file mode 100644
index 0000000000..fd5cbf59d4
--- /dev/null
+++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The
OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.processor.mutateevent;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class FilterListProcessorConfigTest {
+
+    @Test
+    void test_default_target_is_null() {
+        // A freshly constructed config reports no target.
+        assertThat(new FilterListProcessorConfig().getTarget(), is(nullValue()));
+    }
+
+    @Test
+    void test_default_filter_list_when_is_null() {
+        // A freshly constructed config reports no filter_list_when condition.
+        assertThat(new FilterListProcessorConfig().getFilterListWhen(), is(nullValue()));
+    }
+
+    @Test
+    void test_default_tags_on_failure_is_null() {
+        // A freshly constructed config reports no failure tags.
+        assertThat(new FilterListProcessorConfig().getTagsOnFailure(), is(nullValue()));
+    }
+
+    @Test
+    void test_getters_return_set_values() throws NoSuchFieldException, IllegalAccessException {
+        final FilterListProcessorConfig populated = new FilterListProcessorConfig();
+        // The fields are private, so they are populated reflectively before the getters are read back.
+        writeField(populated, "tagsOnFailure", List.of("tag1"));
+        writeField(populated, "filterListWhen", "/enabled == true");
+        writeField(populated, "keepWhen", "/type == \"cve\"");
+        writeField(populated, "target", "my_target");
+        writeField(populated, "source", "my_source");
+
+        assertThat(populated.getSource(), is(equalTo("my_source")));
+        assertThat(populated.getTarget(), is(equalTo("my_target")));
+        assertThat(populated.getKeepWhen(), is(equalTo("/type == \"cve\"")));
+        assertThat(populated.getFilterListWhen(), is(equalTo("/enabled == true")));
+        assertThat(populated.getTagsOnFailure(), is(equalTo(List.of("tag1"))));
+    }
+
+    /** Sets the declared field with the given name on the owner object, bypassing its access modifier. */
+    private void writeField(final Object owner, final String fieldName, final Object newValue) throws NoSuchFieldException, IllegalAccessException {
+        final Field declared = owner.getClass().getDeclaredField(fieldName);
+        declared.setAccessible(true);
+        declared.set(owner, newValue);
+    }
+}
diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java
new file mode 100644
index 0000000000..5ce4afde2d
--- /dev/null
+++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java
@@ -0,0 +1,661 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.processor.mutateevent;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
+import org.opensearch.dataprepper.model.record.Record;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static
org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class FilterListProcessorTest { + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private FilterListProcessorConfig mockConfig; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + private static final String KEEP_WHEN_EXPRESSION = "/type == \"cve\""; + private static final String SOURCE_KEY = "identifiers"; + + @BeforeEach + void setUp() { + lenient().when(mockConfig.getSource()).thenReturn(SOURCE_KEY); + lenient().when(mockConfig.getTarget()).thenReturn(null); + lenient().when(mockConfig.getKeepWhen()).thenReturn(KEEP_WHEN_EXPRESSION); + lenient().when(mockConfig.getFilterListWhen()).thenReturn(null); + lenient().when(mockConfig.getTagsOnFailure()).thenReturn(null); + lenient().when(expressionEvaluator.isValidExpressionStatement(KEEP_WHEN_EXPRESSION)).thenReturn(true); + } + + @Test + void invalid_keep_when_throws_InvalidPluginConfigurationException() { + final String invalidExpression = UUID.randomUUID().toString(); + when(mockConfig.getKeepWhen()).thenReturn(invalidExpression); + when(expressionEvaluator.isValidExpressionStatement(invalidExpression)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + + @Test + void invalid_filter_list_when_throws_InvalidPluginConfigurationException() { + final String filterListWhen = UUID.randomUUID().toString(); + when(mockConfig.getFilterListWhen()).thenReturn(filterListWhen); + when(expressionEvaluator.isValidExpressionStatement(filterListWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, 
this::createObjectUnderTest); + } + + @Test + void test_filter_keeps_matching_elements_in_place() { + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + assertThat(filteredList.get(0).get("id"), is("CVE-1")); + assertThat(filteredList.get(1).get("id"), is("CVE-2")); + } + + @Test + void test_filter_writes_to_different_target() { + final String targetKey = "cve_identifiers"; + when(mockConfig.getTarget()).thenReturn(targetKey); + + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + + // Original source should remain unchanged + final List> originalList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(originalList.size(), is(3)); + + // Target should have filtered list + final List> filteredList = resultEvent.get(targetKey, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + 
assertThat(filteredList.get(0).get("id"), is("CVE-1")); + assertThat(filteredList.get(1).get("id"), is("CVE-2")); + } + + @Test + void test_filter_writes_to_nested_target() { + final String nestedTarget = "vulnerability/cve_ids"; + when(mockConfig.getTarget()).thenReturn(nestedTarget); + + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(nestedTarget, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + } + + @Test + void test_filter_removes_all_elements_when_none_match() { + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(false); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(0)); + } + + @Test + void test_filter_keeps_all_elements_when_all_match() { + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(true); + + final List> resultRecords = 
(List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(3)); + } + + @Test + void test_filter_with_empty_source_list() { + final FilterListProcessor processor = createObjectUnderTest(); + + final Map data = Map.of(SOURCE_KEY, List.of()); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(0)); + } + + @Test + void test_filter_with_null_source_list_is_noop() { + final FilterListProcessor processor = createObjectUnderTest(); + + final Map data = new HashMap<>(); + data.put(SOURCE_KEY, null); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + } + + @Test + void test_filter_with_missing_source_key_is_noop() { + final FilterListProcessor processor = createObjectUnderTest(); + + final Map data = Map.of("other_key", "value"); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent 
= resultRecords.get(0).getData(); + // Event should remain unchanged since source key doesn't exist + assertThat(resultEvent.toMap(), equalTo(testRecord.getData().toMap())); + } + + @Test + void test_filter_skipped_when_filter_list_when_is_false() { + final String filterListWhen = UUID.randomUUID().toString(); + when(mockConfig.getFilterListWhen()).thenReturn(filterListWhen); + when(expressionEvaluator.isValidExpressionStatement(filterListWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(filterListWhen, testRecord.getData())).thenReturn(false); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + // Event should remain unchanged + assertThat(resultEvent.toMap(), equalTo(testRecord.getData().toMap())); + } + + @Test + void test_filter_runs_when_filter_list_when_is_true() { + final String filterListWhen = UUID.randomUUID().toString(); + when(mockConfig.getFilterListWhen()).thenReturn(filterListWhen); + when(expressionEvaluator.isValidExpressionStatement(filterListWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(filterListWhen, testRecord.getData())).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(false); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(0)); + } + 
+ @Test + void test_filter_with_nested_source_path() { + final String nestedSource = "vulnerability/identifiers"; + when(mockConfig.getSource()).thenReturn(nestedSource); + + final FilterListProcessor processor = createObjectUnderTest(); + + final List> identifiers = new ArrayList<>(); + identifiers.add(Map.of("id", "CVE-1", "type", "cve")); + identifiers.add(Map.of("id", "CWE-1", "type", "cwe")); + + final Map data = Map.of("vulnerability", Map.of("identifiers", identifiers)); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event e = invocation.getArgument(1); + return "cve".equals(e.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(nestedSource, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(1)); + assertThat(filteredList.get(0).get("id"), is("CVE-1")); + } + + @Test + void test_filter_with_mixed_map_and_primitive_elements() { + final FilterListProcessor processor = createObjectUnderTest(); + + final List mixedList = new ArrayList<>(); + mixedList.add(Map.of("id", "CVE-1", "type", "cve")); + mixedList.add("a-string"); + mixedList.add(Map.of("id", "CVE-2", "type", "cve")); + + final Map data = new HashMap<>(); + data.put(SOURCE_KEY, mixedList); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(true); + + final List> resultRecords = (List>) 
processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(3)); + } + + @Test + void test_filter_primitive_string_list() { + final String keepWhen = "/value != \"\""; + when(mockConfig.getKeepWhen()).thenReturn(keepWhen); + when(expressionEvaluator.isValidExpressionStatement(keepWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + + final List stringList = new ArrayList<>(); + stringList.add("hello"); + stringList.add(""); + stringList.add("world"); + + final Map data = new HashMap<>(); + data.put(SOURCE_KEY, stringList); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(keepWhen), any(Event.class))) + .thenAnswer(invocation -> { + Event e = invocation.getArgument(1); + Object val = e.get("value", Object.class); + return val != null && !"".equals(val); + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final List filteredList = resultRecords.get(0).getData().get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + assertThat(filteredList.get(0), is("hello")); + assertThat(filteredList.get(1), is("world")); + } + + @Test + void test_filter_primitive_number_list() { + final String keepWhen = "/value > 0"; + when(mockConfig.getKeepWhen()).thenReturn(keepWhen); + when(expressionEvaluator.isValidExpressionStatement(keepWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + + final List numberList = new ArrayList<>(); + 
numberList.add(10); + numberList.add(-5); + numberList.add(0); + numberList.add(42); + + final Map data = new HashMap<>(); + data.put(SOURCE_KEY, numberList); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(keepWhen), any(Event.class))) + .thenAnswer(invocation -> { + Event e = invocation.getArgument(1); + Object val = e.get("value", Object.class); + return val instanceof Number && ((Number) val).intValue() > 0; + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final List filteredList = resultRecords.get(0).getData().get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + assertThat(filteredList.get(0), is(10)); + assertThat(filteredList.get(1), is(42)); + } + + @Test + void test_filter_primitive_list_with_nulls() { + final String keepWhen = "/value != null"; + when(mockConfig.getKeepWhen()).thenReturn(keepWhen); + when(expressionEvaluator.isValidExpressionStatement(keepWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + + final List listWithNulls = new ArrayList<>(); + listWithNulls.add("keep"); + listWithNulls.add(null); + listWithNulls.add(42); + + final Map data = new HashMap<>(); + data.put(SOURCE_KEY, listWithNulls); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(keepWhen), any(Event.class))) + .thenAnswer(invocation -> { + Event e = invocation.getArgument(1); + return e.get("value", Object.class) != null; + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final List 
filteredList = resultRecords.get(0).getData().get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + assertThat(filteredList.get(0), is("keep")); + assertThat(filteredList.get(1), is(42)); + } + + @Test + void test_filter_continues_processing_when_expression_throws_for_one_element() { + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(true) + .thenThrow(new RuntimeException("expression error")) + .thenReturn(true); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + } + + @Test + void test_filter_processes_multiple_records() { + final FilterListProcessor processor = createObjectUnderTest(); + + final Record record1 = createTestRecord(); + final Record record2 = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(List.of(record1, record2)); + + assertThat(resultRecords.size(), is(2)); + for (final Record resultRecord : resultRecords) { + final List> filteredList = resultRecord.getData().get(SOURCE_KEY, List.class); + assertThat(filteredList.size(), is(2)); + } + } + + @Test + void test_filter_list_when_true_and_keep_when_selectively_filters() { + final String filterListWhen = "/env == \"production\""; + when(mockConfig.getFilterListWhen()).thenReturn(filterListWhen); + 
when(expressionEvaluator.isValidExpressionStatement(filterListWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(filterListWhen, testRecord.getData())).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + assertThat(filteredList.get(0).get("id"), is("CVE-1")); + assertThat(filteredList.get(1).get("id"), is("CVE-2")); + } + + @Test + void test_multiple_records_with_different_filter_list_when_outcomes() { + final String filterListWhen = "/process == true"; + when(mockConfig.getFilterListWhen()).thenReturn(filterListWhen); + when(expressionEvaluator.isValidExpressionStatement(filterListWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + + // Record 1: filter_list_when passes + final Record record1 = createTestRecord(); + // Record 2: filter_list_when fails + final Map data2 = new HashMap<>(); + final List> list2 = new ArrayList<>(); + list2.add(Map.of("id", "KEEP-1", "type", "cve")); + list2.add(Map.of("id", "KEEP-2", "type", "cwe")); + data2.put(SOURCE_KEY, list2); + final Event event2 = JacksonEvent.builder().withData(data2).withEventType("event").build(); + final Record record2 = new Record<>(event2); + + when(expressionEvaluator.evaluateConditional(filterListWhen, record1.getData())).thenReturn(true); + 
when(expressionEvaluator.evaluateConditional(filterListWhen, record2.getData())).thenReturn(false); + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(List.of(record1, record2)); + + assertThat(resultRecords.size(), is(2)); + + // Record 1: filtered + final List> filteredList1 = resultRecords.get(0).getData().get(SOURCE_KEY, List.class); + assertThat(filteredList1.size(), is(2)); + assertThat(filteredList1.get(0).get("id"), is("CVE-1")); + + // Record 2: untouched + final List> untouchedList = resultRecords.get(1).getData().get(SOURCE_KEY, List.class); + assertThat(untouchedList.size(), is(2)); + assertThat(untouchedList.get(0).get("id"), is("KEEP-1")); + assertThat(untouchedList.get(1).get("id"), is("KEEP-2")); + } + + @Test + void test_filter_with_source_not_a_list_adds_tags_on_failure() { + final List testTags = List.of("tag1", "tag2"); + when(mockConfig.getTagsOnFailure()).thenReturn(testTags); + + final FilterListProcessor processor = createObjectUnderTest(); + + final Map data = Map.of(SOURCE_KEY, "not-a-list"); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + assertThat(resultEvent.getMetadata().getTags(), is(new HashSet<>(testTags))); + } + + @Test + void test_filter_with_single_element_list() { + final FilterListProcessor processor = createObjectUnderTest(); + + final List> singleElementList = List.of(Map.of("id", "CVE-1", "type", "cve")); + final Map data = Map.of(SOURCE_KEY, singleElementList); + final Event event = 
JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(true); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList.size(), is(1)); + assertThat(filteredList.get(0).get("id"), is("CVE-1")); + } + + @Test + void test_isReadyForShutdown_returns_true() { + final FilterListProcessor processor = createObjectUnderTest(); + assertThat(processor.isReadyForShutdown(), is(true)); + } + + @Test + void test_outer_catch_block_adds_tags_on_failure_when_unexpected_exception_occurs() { + final List testTags = List.of("filter_list_failure"); + when(mockConfig.getTagsOnFailure()).thenReturn(testTags); + + final FilterListProcessor processor = createObjectUnderTest(); + + final List> identifiers = new ArrayList<>(); + identifiers.add(Map.of("id", "CVE-1", "type", "cve")); + + final Map data = new HashMap<>(); + data.put(SOURCE_KEY, identifiers); + final Event event = spy(JacksonEvent.builder().withData(data).withEventType("event").build()); + doThrow(new RuntimeException("unexpected error")).when(event).put(any(String.class), any()); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(true); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + assertThat(resultEvent.getMetadata().getTags(), is(new HashSet<>(testTags))); + } + + @Test + void test_tags_not_added_when_tags_on_failure_is_null_and_source_is_not_a_list() { + 
final FilterListProcessor processor = createObjectUnderTest(); + + final Map data = Map.of(SOURCE_KEY, "not-a-list"); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + assertThat(resultEvent.getMetadata().getTags().isEmpty(), is(true)); + } + + private FilterListProcessor createObjectUnderTest() { + return new FilterListProcessor(pluginMetrics, mockConfig, expressionEvaluator); + } + + private Record createTestRecord() { + final List> identifiers = new ArrayList<>(); + identifiers.add(Map.of("id", "CVE-1", "type", "cve")); + identifiers.add(Map.of("id", "CVE-2", "type", "cve")); + identifiers.add(Map.of("id", "CWE-1", "type", "cwe")); + + final Map data = Map.of(SOURCE_KEY, identifiers); + final Event event = JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build(); + return new Record<>(event); + } +} From 9dccfcec10303f24b04707c3de15329e400e0978 Mon Sep 17 00:00:00 2001 From: nishantKadivar Date: Mon, 30 Mar 2026 11:14:49 +0530 Subject: [PATCH 2/5] filter_list: add failure visibility metadata, acknowledged filtering behaviour and addressed comments - Add processor-scoped metadata: - filter_list_processor_failed_elements_count - filter_list_processor_failed_elements - Keep per-element filtering behavior unchanged - Add TODO for future optimization of evaluation path overhead Signed-off-by: nishantKadivar --- .../mutateevent/FilterListProcessor.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java 
b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java index 5269ab4da5..5872d9a085 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java @@ -36,6 +36,8 @@ public class FilterListProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(FilterListProcessor.class); + private static final String FAILED_ELEMENTS_METADATA_KEY = "filter_list_processor_failed_elements"; + private static final String FAILED_ELEMENTS_COUNT_METADATA_KEY = "filter_list_processor_failed_elements_count"; private final FilterListProcessorConfig config; private final ExpressionEvaluator expressionEvaluator; private final String target; @@ -89,8 +91,8 @@ public Collection> doExecute(final Collection> recor } final List filteredList = new ArrayList<>(); - final JacksonEvent.Builder contextBuilder = JacksonEvent.builder() - .withEventType("event"); + final List failedElements = new ArrayList<>(); + int failedElementCount = 0; for (final Object element : sourceList) { @SuppressWarnings("unchecked") @@ -99,7 +101,10 @@ public Collection> doExecute(final Collection> recor : Collections.singletonMap("value", element); try { - final Event elementEvent = contextBuilder + // TODO: Revisit this per-element Event construction when ExpressionEvaluator/JsonPointer + // internals support a lighter evaluation path that avoids full tree conversion. 
+ final Event elementEvent = JacksonEvent.builder() + .withEventType("event") .withData(contextMap) .build(); @@ -107,11 +112,19 @@ public Collection> doExecute(final Collection> recor filteredList.add(element); } } catch (final Exception e) { + failedElementCount++; + failedElements.add(element); LOG.warn(EVENT, "Error evaluating keep_when expression [{}] for element in source list at path [{}]", config.getKeepWhen(), config.getSource(), e); } } + if (failedElementCount > 0) { + addTagsOnFailure(recordEvent); + recordEvent.getMetadata().setAttribute(FAILED_ELEMENTS_COUNT_METADATA_KEY, failedElementCount); + recordEvent.getMetadata().setAttribute(FAILED_ELEMENTS_METADATA_KEY, failedElements); + } + recordEvent.put(target, filteredList); } catch (final Exception e) { From 76c041157bcfdd631c3b2e4596f258cf0cd7298d Mon Sep 17 00:00:00 2001 From: Manisha Yadav Date: Tue, 31 Mar 2026 12:50:11 +0000 Subject: [PATCH 3/5] Address review comments: move validation to config, add integration tests Signed-off-by: Manisha Yadav --- .../mutate-event-processors/build.gradle | 1 + .../mutateevent/FilterListProcessor.java | 20 +-- .../FilterListProcessorConfig.java | 21 ++- .../FilterListProcessorConfigTest.java | 56 +++++++- .../mutateevent/FilterListProcessorIT.java | 136 ++++++++++++++++++ .../mutateevent/FilterListProcessorTest.java | 4 +- .../mutateevent/filter_objects_by_status.yaml | 9 ++ .../mutateevent/filter_objects_to_target.yaml | 10 ++ .../filter_primitives_with_condition.yaml | 10 ++ 9 files changed, 245 insertions(+), 22 deletions(-) create mode 100644 data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorIT.java create mode 100644 data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml create mode 100644 
data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml create mode 100644 data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml diff --git a/data-prepper-plugins/mutate-event-processors/build.gradle b/data-prepper-plugins/mutate-event-processors/build.gradle index fc419ce02b..c121097ebc 100644 --- a/data-prepper-plugins/mutate-event-processors/build.gradle +++ b/data-prepper-plugins/mutate-event-processors/build.gradle @@ -23,6 +23,7 @@ dependencies { implementation project(':data-prepper-plugins:common') implementation 'com.fasterxml.jackson.core:jackson-databind' testImplementation project(':data-prepper-test:test-event') + testImplementation project(':data-prepper-test:plugin-test-framework') testImplementation testLibs.slf4j.simple testImplementation testLibs.spring.test testImplementation project(':data-prepper-test:test-common') diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java index 5872d9a085..34a9506a57 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java @@ -15,7 +15,6 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import 
org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -40,29 +39,14 @@ public class FilterListProcessor extends AbstractProcessor, Record private static final String FAILED_ELEMENTS_COUNT_METADATA_KEY = "filter_list_processor_failed_elements_count"; private final FilterListProcessorConfig config; private final ExpressionEvaluator expressionEvaluator; - private final String target; @DataPrepperPluginConstructor public FilterListProcessor(final PluginMetrics pluginMetrics, final FilterListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); this.config = config; this.expressionEvaluator = expressionEvaluator; - this.target = config.getTarget() != null ? config.getTarget() : config.getSource(); - - if (config.getFilterListWhen() != null - && !expressionEvaluator.isValidExpressionStatement(config.getFilterListWhen())) { - throw new InvalidPluginConfigurationException( - String.format("filter_list_when %s is not a valid expression statement. " + - "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", - config.getFilterListWhen())); - } - if (!expressionEvaluator.isValidExpressionStatement(config.getKeepWhen())) { - throw new InvalidPluginConfigurationException( - String.format("keep_when %s is not a valid expression statement. 
" + - "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", - config.getKeepWhen())); - } + config.validateExpressions(expressionEvaluator); } @Override @@ -125,7 +109,7 @@ public Collection> doExecute(final Collection> recor recordEvent.getMetadata().setAttribute(FAILED_ELEMENTS_METADATA_KEY, failedElements); } - recordEvent.put(target, filteredList); + recordEvent.put(config.getTarget(), filteredList); } catch (final Exception e) { LOG.atError() diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java index 487528b2e4..b025b5c57f 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java @@ -18,6 +18,9 @@ import org.opensearch.dataprepper.model.annotations.ExampleValues; import org.opensearch.dataprepper.model.annotations.ExampleValues.Example; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; + import java.util.List; @JsonPropertyOrder @@ -72,7 +75,7 @@ public String getSource() { } public String getTarget() { - return target; + return target != null ? 
target : source; } public String getKeepWhen() { @@ -86,4 +89,20 @@ public String getFilterListWhen() { public List getTagsOnFailure() { return tagsOnFailure; } + + void validateExpressions(final ExpressionEvaluator expressionEvaluator) { + if (getFilterListWhen() != null && !expressionEvaluator.isValidExpressionStatement(getFilterListWhen())) { + throw new InvalidPluginConfigurationException( + String.format("filter_list_when %s is not a valid expression statement. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", + getFilterListWhen())); + } + + if (!expressionEvaluator.isValidExpressionStatement(getKeepWhen())) { + throw new InvalidPluginConfigurationException( + String.format("keep_when %s is not a valid expression statement. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", + getKeepWhen())); + } + } } diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java index fd5cbf59d4..69b0bfa30a 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java @@ -10,6 +10,11 @@ package org.opensearch.dataprepper.plugins.processor.mutateevent; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import 
org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import java.lang.reflect.Field; import java.util.List; @@ -18,13 +23,30 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) class FilterListProcessorConfigTest { + @Mock + private ExpressionEvaluator expressionEvaluator; + + @Test + void test_default_target_returns_source_when_target_not_set() throws NoSuchFieldException, IllegalAccessException { + final FilterListProcessorConfig config = new FilterListProcessorConfig(); + setField(config, "source", "my_source"); + assertThat(config.getTarget(), is(equalTo("my_source"))); + } + @Test - void test_default_target_is_null() { + void test_target_returns_explicit_target_when_set() throws NoSuchFieldException, IllegalAccessException { final FilterListProcessorConfig config = new FilterListProcessorConfig(); - assertThat(config.getTarget(), is(nullValue())); + setField(config, "source", "my_source"); + setField(config, "target", "my_target"); + assertThat(config.getTarget(), is(equalTo("my_target"))); } @Test @@ -56,6 +78,36 @@ void test_getters_return_set_values() throws NoSuchFieldException, IllegalAccess assertThat(config.getTagsOnFailure(), is(equalTo(List.of("tag1")))); } + @Test + void test_validateExpressions_throws_when_keep_when_is_invalid() throws NoSuchFieldException, IllegalAccessException { + final FilterListProcessorConfig config = new FilterListProcessorConfig(); + setField(config, "keepWhen", "invalid"); + when(expressionEvaluator.isValidExpressionStatement("invalid")).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, () -> config.validateExpressions(expressionEvaluator)); + } + + @Test + void 
test_validateExpressions_throws_when_filter_list_when_is_invalid() throws NoSuchFieldException, IllegalAccessException { + final FilterListProcessorConfig config = new FilterListProcessorConfig(); + setField(config, "keepWhen", "/type == \"cve\""); + setField(config, "filterListWhen", "invalid"); + when(expressionEvaluator.isValidExpressionStatement("invalid")).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, () -> config.validateExpressions(expressionEvaluator)); + } + + @Test + void test_validateExpressions_does_not_throw_when_expressions_are_valid() throws NoSuchFieldException, IllegalAccessException { + final FilterListProcessorConfig config = new FilterListProcessorConfig(); + setField(config, "keepWhen", "/type == \"cve\""); + setField(config, "filterListWhen", "/enabled == true"); + when(expressionEvaluator.isValidExpressionStatement("/type == \"cve\"")).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement("/enabled == true")).thenReturn(true); + + assertDoesNotThrow(() -> config.validateExpressions(expressionEvaluator)); + } + private void setField(final Object object, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException { final Field field = object.getClass().getDeclaredField(fieldName); diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorIT.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorIT.java new file mode 100644 index 0000000000..050acd6533 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorIT.java @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this 
file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.event.LogEventBuilder; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.test.plugins.DataPrepperPluginTest; +import org.opensearch.dataprepper.test.plugins.PluginConfigurationFile; +import org.opensearch.dataprepper.test.plugins.junit.BaseDataPrepperPluginStandardTestSuite; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +@DataPrepperPluginTest(pluginName = "filter_list", pluginType = Processor.class) +class FilterListProcessorIT extends BaseDataPrepperPluginStandardTestSuite { + + @Test + void filters_objects_by_field_value( + @PluginConfigurationFile("filter_objects_by_status.yaml") final Processor, Record> objectUnderTest, + final EventFactory eventFactory) { + + final Event event = eventFactory.eventBuilder(LogEventBuilder.class) + .withData(Map.of("items", List.of( + Map.of("name", "item1", "status", "active"), + Map.of("name", "item2", "status", "inactive"), + Map.of("name", "item3", "status", "active") + ))) + .build(); + + final Collection> result = objectUnderTest.execute(Collections.singletonList(new Record<>(event))); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + + final Event resultEvent = result.iterator().next().getData(); + final List> filteredList = resultEvent.get("items", List.class); + assertThat(filteredList, notNullValue()); + 
assertThat(filteredList.size(), equalTo(2)); + assertThat(filteredList.get(0).get("name"), equalTo("item1")); + assertThat(filteredList.get(1).get("name"), equalTo("item3")); + } + + @Test + void filters_objects_to_separate_target( + @PluginConfigurationFile("filter_objects_to_target.yaml") final Processor, Record> objectUnderTest, + final EventFactory eventFactory) { + + final Event event = eventFactory.eventBuilder(LogEventBuilder.class) + .withData(Map.of("items", List.of( + Map.of("name", "item1", "status", "active"), + Map.of("name", "item2", "status", "inactive") + ))) + .build(); + + final Collection> result = objectUnderTest.execute(Collections.singletonList(new Record<>(event))); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + + final Event resultEvent = result.iterator().next().getData(); + + final List> originalList = resultEvent.get("items", List.class); + assertThat(originalList.size(), equalTo(2)); + + final List> filteredList = resultEvent.get("active_items", List.class); + assertThat(filteredList, notNullValue()); + assertThat(filteredList.size(), equalTo(1)); + assertThat(filteredList.get(0).get("name"), equalTo("item1")); + } + + @Test + void filters_primitives_with_filter_list_when_condition( + @PluginConfigurationFile("filter_primitives_with_condition.yaml") final Processor, Record> objectUnderTest, + final EventFactory eventFactory) { + + final Event event = eventFactory.eventBuilder(LogEventBuilder.class) + .withData(Map.of( + "scores", List.of(95, 30, 75, 10, 88), + "type", "grades" + )) + .build(); + + final Collection> result = objectUnderTest.execute(Collections.singletonList(new Record<>(event))); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + + final Event resultEvent = result.iterator().next().getData(); + final List filteredList = resultEvent.get("scores", List.class); + assertThat(filteredList, notNullValue()); + assertThat(filteredList.size(), equalTo(3)); + 
assertThat(filteredList.get(0), equalTo(95)); + assertThat(filteredList.get(1), equalTo(75)); + assertThat(filteredList.get(2), equalTo(88)); + } + + @Test + void skips_filtering_when_filter_list_when_is_false( + @PluginConfigurationFile("filter_primitives_with_condition.yaml") final Processor, Record> objectUnderTest, + final EventFactory eventFactory) { + + final Event event = eventFactory.eventBuilder(LogEventBuilder.class) + .withData(Map.of( + "scores", List.of(95, 30, 75, 10, 88), + "type", "not_grades" + )) + .build(); + + final Collection> result = objectUnderTest.execute(Collections.singletonList(new Record<>(event))); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + + final Event resultEvent = result.iterator().next().getData(); + final List scores = resultEvent.get("scores", List.class); + assertThat(scores, notNullValue()); + assertThat(scores.size(), equalTo(5)); + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java index 5ce4afde2d..d55d9a7a36 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java @@ -59,11 +59,12 @@ class FilterListProcessorTest { @BeforeEach void setUp() { lenient().when(mockConfig.getSource()).thenReturn(SOURCE_KEY); - lenient().when(mockConfig.getTarget()).thenReturn(null); + lenient().when(mockConfig.getTarget()).thenReturn(SOURCE_KEY); lenient().when(mockConfig.getKeepWhen()).thenReturn(KEEP_WHEN_EXPRESSION); lenient().when(mockConfig.getFilterListWhen()).thenReturn(null); 
lenient().when(mockConfig.getTagsOnFailure()).thenReturn(null); lenient().when(expressionEvaluator.isValidExpressionStatement(KEEP_WHEN_EXPRESSION)).thenReturn(true); + lenient().doCallRealMethod().when(mockConfig).validateExpressions(expressionEvaluator); } @Test @@ -286,6 +287,7 @@ void test_filter_runs_when_filter_list_when_is_true() { void test_filter_with_nested_source_path() { final String nestedSource = "vulnerability/identifiers"; when(mockConfig.getSource()).thenReturn(nestedSource); + when(mockConfig.getTarget()).thenReturn(nestedSource); final FilterListProcessor processor = createObjectUnderTest(); diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml new file mode 100644 index 0000000000..2bda179c92 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml @@ -0,0 +1,9 @@ +test-pipeline: + source: + unused: + processor: + - filter_list: + source: "items" + keep_when: '/status == "active"' + sink: + - unused: diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml new file mode 100644 index 0000000000..8f23b7da39 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml @@ -0,0 +1,10 @@ +test-pipeline: + source: + unused: + processor: + - filter_list: + source: "items" + target: "active_items" + keep_when: '/status == 
"active"' + sink: + - unused: diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml new file mode 100644 index 0000000000..137ae4f008 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml @@ -0,0 +1,10 @@ +test-pipeline: + source: + unused: + processor: + - filter_list: + source: "scores" + keep_when: '/value > 50' + filter_list_when: '/type == "grades"' + sink: + - unused: From 324a610d26a006cd92d3073d55b623a3d5e08b45 Mon Sep 17 00:00:00 2001 From: Manisha Yadav Date: Fri, 10 Apr 2026 09:39:41 +0000 Subject: [PATCH 4/5] Add license headers in the yaml files Signed-off-by: Manisha Yadav --- .../processor/mutateevent/filter_objects_by_status.yaml | 7 +++++++ .../processor/mutateevent/filter_objects_to_target.yaml | 7 +++++++ .../mutateevent/filter_primitives_with_condition.yaml | 7 +++++++ 3 files changed, 21 insertions(+) diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml index 2bda179c92..4d08806230 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml @@ -1,3 +1,10 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 
+# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + test-pipeline: source: unused: diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml index 8f23b7da39..8a7dab9517 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml @@ -1,3 +1,10 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+ test-pipeline: source: unused: diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml index 137ae4f008..597e789032 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml @@ -1,3 +1,10 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + test-pipeline: source: unused: From b1224a687727c184a2741c5157ab9102d575bc72 Mon Sep 17 00:00:00 2001 From: nishantKadivar Date: Thu, 16 Apr 2026 18:52:28 +0530 Subject: [PATCH 5/5] Rename filter_list keys and update docs/tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename source → iterate_on, keep_when → keep_element_when - Update FilterListProcessorConfig fields/getters/validation - Update FilterListProcessor to use new getters - Add SENSITIVE marker to warning logs - Add TODO(#6609) reference in processor - Document /value for primitive elements in keep_element_when - Update README examples and configuration docs - Update all tests and YAML fixtures to new key names - Verified: :data-prepper-plugins:mutate-event-processors:test (PASS) Signed-off-by: nishantKadivar --- .../mutate-event-processors/README.md | 34 +++++++------- .../mutateevent/FilterListProcessor.java | 29 ++++++++---- .../FilterListProcessorConfig.java | 31 +++++++------ 
.../FilterListProcessorConfigTest.java | 18 ++++---- .../mutateevent/FilterListProcessorTest.java | 46 +++++++++---------- .../mutateevent/filter_objects_by_status.yaml | 4 +- .../mutateevent/filter_objects_to_target.yaml | 4 +- .../filter_primitives_with_condition.yaml | 4 +- 8 files changed, 93 insertions(+), 77 deletions(-) diff --git a/data-prepper-plugins/mutate-event-processors/README.md b/data-prepper-plugins/mutate-event-processors/README.md index 8fc8f5fb3c..f25ff04f5f 100644 --- a/data-prepper-plugins/mutate-event-processors/README.md +++ b/data-prepper-plugins/mutate-event-processors/README.md @@ -822,8 +822,8 @@ pipeline: format: "json" processor: - filter_list: - source: "items" - keep_when: '/status == "active"' + iterate_on: "items" + keep_element_when: '/status == "active"' sink: - stdout: ``` @@ -847,9 +847,9 @@ You can write the filtered result to a different key, leaving the original array ```yaml processor: - filter_list: - source: "items" + iterate_on: "items" target: "active_items" - keep_when: '/status == "active"' + keep_element_when: '/status == "active"' ``` With the same input, the output will be: @@ -868,8 +868,8 @@ For arrays of primitives (strings, numbers, booleans), each element is accessibl ```yaml processor: - filter_list: - source: "tags" - keep_when: '/value != ""' + iterate_on: "tags" + keep_element_when: '/value != ""' ``` With the following input: @@ -889,8 +889,8 @@ Another example filtering numbers: ```yaml processor: - filter_list: - source: "scores" - keep_when: '/value > 50' + iterate_on: "scores" + keep_element_when: '/value > 50' ``` With the following input: @@ -907,13 +907,13 @@ The output will be: ### Using both conditions -The `filter_list_when` condition controls whether the processor runs at all (evaluated against the root event), while `keep_when` controls which elements are kept (evaluated per element): +The `filter_list_when` condition controls whether the processor runs at all (evaluated against the root 
event), while `keep_element_when` controls which elements are kept (evaluated per element): ```yaml processor: - filter_list: - source: "items" - keep_when: '/status == "active"' + iterate_on: "items" + keep_element_when: '/status == "active"' filter_list_when: '/env == "production"' ``` @@ -942,16 +942,16 @@ The processor is skipped entirely and the event passes through unchanged: ``` ### Configuration -* `source` - (required) - The key of the array field to filter. Supports nested paths (e.g. `outer_key/inner_list`). -* `target` - (optional) - The key to write the filtered array to. Defaults to the `source` key (in-place filtering). Supports nested paths. -* `keep_when` - (required) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated per element. Elements where this expression evaluates to `true` are kept. For object elements, the expression is evaluated against the object's fields directly (e.g. `/status == "active"`). For primitive elements, the value is accessible via `/value` (e.g. `/value > 50`). When no elements match, the result is an empty list `[]`. +* `iterate_on` - (required) - The key of the array field to filter. Supports nested paths (e.g. `outer_key/inner_list`). +* `target` - (optional) - The key to write the filtered array to. Defaults to the `iterate_on` key (in-place filtering). Supports nested paths. +* `keep_element_when` - (required) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated per element. Elements where this expression evaluates to `true` are kept. For object elements, the expression is evaluated against the object's fields directly (e.g. `/status == "active"`). For primitive elements, the value is accessible via `/value` (e.g. `/value > 50`). When no elements match, the result is an empty list `[]`. 
* `filter_list_when` - (optional) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated against the root event. When provided, the processor only runs if this condition is `true`. By default, all events are processed. * `tags_on_failure` - (optional) - A list of tags to add to the event metadata when the processor fails to process the event. **Edge case behavior:** -- If the `source` key does not exist or its value is `null`, the processor is a no-op and the event passes through unchanged. -- If the `source` value is not a list (e.g. a string or number), the processor logs a warning and adds `tags_on_failure` if configured. -- `null` elements within the list are evaluated normally. For example, with `keep_when: '/value != null'`, null elements are filtered out while non-null elements are kept. +- If the `iterate_on` key does not exist or its value is `null`, the processor is a no-op and the event passes through unchanged. +- If the `iterate_on` value is not a list (e.g. a string or number), the processor logs a warning and adds `tags_on_failure` if configured. +- `null` elements within the list are evaluated normally. For example, with `keep_element_when: '/value != null'`, null elements are filtered out while non-null elements are kept. 
___ diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java index 34a9506a57..b4cae031da 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java @@ -30,6 +30,7 @@ import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE; @DataPrepperPlugin(name = "filter_list", pluginType = Processor.class, pluginConfigurationType = FilterListProcessorConfig.class) public class FilterListProcessor extends AbstractProcessor, Record> { @@ -61,16 +62,22 @@ public Collection> doExecute(final Collection> recor final List sourceList; try { - sourceList = recordEvent.get(config.getSource(), List.class); + sourceList = recordEvent.get(config.getIterateOn(), List.class); } catch (final Exception e) { - LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]", - config.getSource(), recordEvent, e); + LOG.atWarn() + .addMarker(EVENT) + .addMarker(SENSITIVE) + .setMessage("Given source path [{}] is not valid on record [{}]") + .addArgument(config.getIterateOn()) + .addArgument(recordEvent) + .setCause(e) + .log(); addTagsOnFailure(recordEvent); continue; } if (sourceList == null) { - LOG.debug("Source list at path [{}] is null, skipping event", config.getSource()); + LOG.debug("Source list at path [{}] is null, skipping event", config.getIterateOn()); continue; } @@ -85,21 +92,27 @@ public Collection> doExecute(final Collection> recor : 
Collections.singletonMap("value", element); try { - // TODO: Revisit this per-element Event construction when ExpressionEvaluator/JsonPointer + // TODO(#6609): Revisit this per-element Event construction when ExpressionEvaluator/JsonPointer // internals support a lighter evaluation path that avoids full tree conversion. final Event elementEvent = JacksonEvent.builder() .withEventType("event") .withData(contextMap) .build(); - if (expressionEvaluator.evaluateConditional(config.getKeepWhen(), elementEvent)) { + if (expressionEvaluator.evaluateConditional(config.getKeepElementWhen(), elementEvent)) { filteredList.add(element); } } catch (final Exception e) { failedElementCount++; failedElements.add(element); - LOG.warn(EVENT, "Error evaluating keep_when expression [{}] for element in source list at path [{}]", - config.getKeepWhen(), config.getSource(), e); + LOG.atWarn() + .addMarker(EVENT) + .addMarker(SENSITIVE) + .setMessage("Error evaluating keep_element_when expression [{}] for element in source list at path [{}]") + .addArgument(config.getKeepElementWhen()) + .addArgument(config.getIterateOn()) + .setCause(e) + .log(); } } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java index b025b5c57f..2d020b3080 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java @@ -30,13 +30,13 @@ public class FilterListProcessorConfig { @NotNull @NotEmpty - @JsonProperty("source") + @JsonProperty("iterate_on") @JsonPropertyDescription("The key of the array field to filter. 
Supports nested paths.") @ExampleValues({ @Example(value = "my-list", description = "Filters the 'my-list' array at the root of the event."), @Example(value = "outer-key/my-list", description = "Filters the 'my-list' array nested under 'outer-key'.") }) - private String source; + private String iterateOn; @JsonProperty("target") @JsonPropertyDescription("The key to write the filtered array to. Defaults to the source key (in-place). " + @@ -48,14 +48,17 @@ public class FilterListProcessorConfig { @NotNull @NotEmpty - @JsonProperty("keep_when") + @JsonProperty("keep_element_when") @JsonPropertyDescription("An expression evaluated per element. Elements where this expression evaluates to true are kept. " + - "The expression is evaluated against each element of the array as if it were a standalone event.") + "The expression is evaluated against each element of the array as if it were a standalone event. " + + "For primitive elements (strings, numbers, booleans), the element value is accessible via the key /value.") @ExampleValues({ @Example(value = "/status == \"active\"", description = "Keeps only elements where 'status' equals 'active'."), - @Example(value = "/score > 50", description = "Keeps only elements where 'score' is greater than 50.") + @Example(value = "/score > 50", description = "Keeps only elements where 'score' is greater than 50."), + @Example(value = "/value > 50", description = "Keeps only primitive elements where the value is greater than 50."), + @Example(value = "/value != \"\"", description = "Keeps only non-empty string primitive elements.") }) - private String keepWhen; + private String keepElementWhen; @JsonProperty("filter_list_when") @JsonPropertyDescription("A conditional expression, " + @@ -70,16 +73,16 @@ public class FilterListProcessorConfig { @JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.") private List tagsOnFailure; - public String getSource() { - return source; + public String 
getIterateOn() { + return iterateOn; } public String getTarget() { - return target != null ? target : source; + return target != null ? target : iterateOn; } - public String getKeepWhen() { - return keepWhen; + public String getKeepElementWhen() { + return keepElementWhen; } public String getFilterListWhen() { @@ -98,11 +101,11 @@ void validateExpressions(final ExpressionEvaluator expressionEvaluator) { getFilterListWhen())); } - if (!expressionEvaluator.isValidExpressionStatement(getKeepWhen())) { + if (!expressionEvaluator.isValidExpressionStatement(getKeepElementWhen())) { throw new InvalidPluginConfigurationException( - String.format("keep_when %s is not a valid expression statement. " + + String.format("keep_element_when %s is not a valid expression statement. " + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", - getKeepWhen())); + getKeepElementWhen())); } } } diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java index 69b0bfa30a..60ab1b314e 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java @@ -37,14 +37,14 @@ class FilterListProcessorConfigTest { @Test void test_default_target_returns_source_when_target_not_set() throws NoSuchFieldException, IllegalAccessException { final FilterListProcessorConfig config = new FilterListProcessorConfig(); - setField(config, "source", "my_source"); + setField(config, "iterateOn", "my_source"); assertThat(config.getTarget(), 
is(equalTo("my_source"))); } @Test void test_target_returns_explicit_target_when_set() throws NoSuchFieldException, IllegalAccessException { final FilterListProcessorConfig config = new FilterListProcessorConfig(); - setField(config, "source", "my_source"); + setField(config, "iterateOn", "my_source"); setField(config, "target", "my_target"); assertThat(config.getTarget(), is(equalTo("my_target"))); } @@ -65,15 +65,15 @@ void test_default_tags_on_failure_is_null() { void test_getters_return_set_values() throws NoSuchFieldException, IllegalAccessException { final FilterListProcessorConfig config = new FilterListProcessorConfig(); - setField(config, "source", "my_source"); + setField(config, "iterateOn", "my_source"); setField(config, "target", "my_target"); - setField(config, "keepWhen", "/type == \"cve\""); + setField(config, "keepElementWhen", "/type == \"cve\""); setField(config, "filterListWhen", "/enabled == true"); setField(config, "tagsOnFailure", List.of("tag1")); - assertThat(config.getSource(), is(equalTo("my_source"))); + assertThat(config.getIterateOn(), is(equalTo("my_source"))); assertThat(config.getTarget(), is(equalTo("my_target"))); - assertThat(config.getKeepWhen(), is(equalTo("/type == \"cve\""))); + assertThat(config.getKeepElementWhen(), is(equalTo("/type == \"cve\""))); assertThat(config.getFilterListWhen(), is(equalTo("/enabled == true"))); assertThat(config.getTagsOnFailure(), is(equalTo(List.of("tag1")))); } @@ -81,7 +81,7 @@ void test_getters_return_set_values() throws NoSuchFieldException, IllegalAccess @Test void test_validateExpressions_throws_when_keep_when_is_invalid() throws NoSuchFieldException, IllegalAccessException { final FilterListProcessorConfig config = new FilterListProcessorConfig(); - setField(config, "keepWhen", "invalid"); + setField(config, "keepElementWhen", "invalid"); when(expressionEvaluator.isValidExpressionStatement("invalid")).thenReturn(false); assertThrows(InvalidPluginConfigurationException.class, () -> 
config.validateExpressions(expressionEvaluator)); @@ -90,7 +90,7 @@ void test_validateExpressions_throws_when_keep_when_is_invalid() throws NoSuchFi @Test void test_validateExpressions_throws_when_filter_list_when_is_invalid() throws NoSuchFieldException, IllegalAccessException { final FilterListProcessorConfig config = new FilterListProcessorConfig(); - setField(config, "keepWhen", "/type == \"cve\""); + setField(config, "keepElementWhen", "/type == \"cve\""); setField(config, "filterListWhen", "invalid"); when(expressionEvaluator.isValidExpressionStatement("invalid")).thenReturn(false); @@ -100,7 +100,7 @@ void test_validateExpressions_throws_when_filter_list_when_is_invalid() throws N @Test void test_validateExpressions_does_not_throw_when_expressions_are_valid() throws NoSuchFieldException, IllegalAccessException { final FilterListProcessorConfig config = new FilterListProcessorConfig(); - setField(config, "keepWhen", "/type == \"cve\""); + setField(config, "keepElementWhen", "/type == \"cve\""); setField(config, "filterListWhen", "/enabled == true"); when(expressionEvaluator.isValidExpressionStatement("/type == \"cve\"")).thenReturn(true); when(expressionEvaluator.isValidExpressionStatement("/enabled == true")).thenReturn(true); diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java index d55d9a7a36..c2d4668c32 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java @@ -53,24 +53,24 @@ class FilterListProcessorTest { @Mock private ExpressionEvaluator 
expressionEvaluator; - private static final String KEEP_WHEN_EXPRESSION = "/type == \"cve\""; + private static final String KEEP_ELEMENT_WHEN_EXPRESSION = "/type == \"cve\""; private static final String SOURCE_KEY = "identifiers"; @BeforeEach void setUp() { - lenient().when(mockConfig.getSource()).thenReturn(SOURCE_KEY); + lenient().when(mockConfig.getIterateOn()).thenReturn(SOURCE_KEY); lenient().when(mockConfig.getTarget()).thenReturn(SOURCE_KEY); - lenient().when(mockConfig.getKeepWhen()).thenReturn(KEEP_WHEN_EXPRESSION); + lenient().when(mockConfig.getKeepElementWhen()).thenReturn(KEEP_ELEMENT_WHEN_EXPRESSION); lenient().when(mockConfig.getFilterListWhen()).thenReturn(null); lenient().when(mockConfig.getTagsOnFailure()).thenReturn(null); - lenient().when(expressionEvaluator.isValidExpressionStatement(KEEP_WHEN_EXPRESSION)).thenReturn(true); + lenient().when(expressionEvaluator.isValidExpressionStatement(KEEP_ELEMENT_WHEN_EXPRESSION)).thenReturn(true); lenient().doCallRealMethod().when(mockConfig).validateExpressions(expressionEvaluator); } @Test void invalid_keep_when_throws_InvalidPluginConfigurationException() { final String invalidExpression = UUID.randomUUID().toString(); - when(mockConfig.getKeepWhen()).thenReturn(invalidExpression); + when(mockConfig.getKeepElementWhen()).thenReturn(invalidExpression); when(expressionEvaluator.isValidExpressionStatement(invalidExpression)).thenReturn(false); assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); @@ -90,7 +90,7 @@ void test_filter_keeps_matching_elements_in_place() { final FilterListProcessor processor = createObjectUnderTest(); final Record testRecord = createTestRecord(); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenAnswer(invocation -> { Event event = invocation.getArgument(1); return "cve".equals(event.get("type", 
String.class)); @@ -115,7 +115,7 @@ void test_filter_writes_to_different_target() { final FilterListProcessor processor = createObjectUnderTest(); final Record testRecord = createTestRecord(); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenAnswer(invocation -> { Event event = invocation.getArgument(1); return "cve".equals(event.get("type", String.class)); @@ -146,7 +146,7 @@ void test_filter_writes_to_nested_target() { final FilterListProcessor processor = createObjectUnderTest(); final Record testRecord = createTestRecord(); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenAnswer(invocation -> { Event event = invocation.getArgument(1); return "cve".equals(event.get("type", String.class)); @@ -166,7 +166,7 @@ void test_filter_removes_all_elements_when_none_match() { final FilterListProcessor processor = createObjectUnderTest(); final Record testRecord = createTestRecord(); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenReturn(false); final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); @@ -183,7 +183,7 @@ void test_filter_keeps_all_elements_when_all_match() { final FilterListProcessor processor = createObjectUnderTest(); final Record testRecord = createTestRecord(); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenReturn(true); final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); @@ -271,7 +271,7 
@@ void test_filter_runs_when_filter_list_when_is_true() { final Record testRecord = createTestRecord(); when(expressionEvaluator.evaluateConditional(filterListWhen, testRecord.getData())).thenReturn(true); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenReturn(false); final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); @@ -286,7 +286,7 @@ void test_filter_runs_when_filter_list_when_is_true() { @Test void test_filter_with_nested_source_path() { final String nestedSource = "vulnerability/identifiers"; - when(mockConfig.getSource()).thenReturn(nestedSource); + when(mockConfig.getIterateOn()).thenReturn(nestedSource); when(mockConfig.getTarget()).thenReturn(nestedSource); final FilterListProcessor processor = createObjectUnderTest(); @@ -299,7 +299,7 @@ void test_filter_with_nested_source_path() { final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); final Record testRecord = new Record<>(event); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenAnswer(invocation -> { Event e = invocation.getArgument(1); return "cve".equals(e.get("type", String.class)); @@ -329,7 +329,7 @@ void test_filter_with_mixed_map_and_primitive_elements() { final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); final Record testRecord = new Record<>(event); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenReturn(true); final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); @@ -344,7 +344,7 @@ void 
test_filter_with_mixed_map_and_primitive_elements() { @Test void test_filter_primitive_string_list() { final String keepWhen = "/value != \"\""; - when(mockConfig.getKeepWhen()).thenReturn(keepWhen); + when(mockConfig.getKeepElementWhen()).thenReturn(keepWhen); when(expressionEvaluator.isValidExpressionStatement(keepWhen)).thenReturn(true); final FilterListProcessor processor = createObjectUnderTest(); @@ -379,7 +379,7 @@ void test_filter_primitive_string_list() { @Test void test_filter_primitive_number_list() { final String keepWhen = "/value > 0"; - when(mockConfig.getKeepWhen()).thenReturn(keepWhen); + when(mockConfig.getKeepElementWhen()).thenReturn(keepWhen); when(expressionEvaluator.isValidExpressionStatement(keepWhen)).thenReturn(true); final FilterListProcessor processor = createObjectUnderTest(); @@ -415,7 +415,7 @@ void test_filter_primitive_number_list() { @Test void test_filter_primitive_list_with_nulls() { final String keepWhen = "/value != null"; - when(mockConfig.getKeepWhen()).thenReturn(keepWhen); + when(mockConfig.getKeepElementWhen()).thenReturn(keepWhen); when(expressionEvaluator.isValidExpressionStatement(keepWhen)).thenReturn(true); final FilterListProcessor processor = createObjectUnderTest(); @@ -451,7 +451,7 @@ void test_filter_continues_processing_when_expression_throws_for_one_element() { final FilterListProcessor processor = createObjectUnderTest(); final Record testRecord = createTestRecord(); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenReturn(true) .thenThrow(new RuntimeException("expression error")) .thenReturn(true); @@ -472,7 +472,7 @@ void test_filter_processes_multiple_records() { final Record record1 = createTestRecord(); final Record record2 = createTestRecord(); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + 
when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenAnswer(invocation -> { Event event = invocation.getArgument(1); return "cve".equals(event.get("type", String.class)); @@ -497,7 +497,7 @@ void test_filter_list_when_true_and_keep_when_selectively_filters() { final Record testRecord = createTestRecord(); when(expressionEvaluator.evaluateConditional(filterListWhen, testRecord.getData())).thenReturn(true); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenAnswer(invocation -> { Event event = invocation.getArgument(1); return "cve".equals(event.get("type", String.class)); @@ -535,7 +535,7 @@ void test_multiple_records_with_different_filter_list_when_outcomes() { when(expressionEvaluator.evaluateConditional(filterListWhen, record1.getData())).thenReturn(true); when(expressionEvaluator.evaluateConditional(filterListWhen, record2.getData())).thenReturn(false); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenAnswer(invocation -> { Event event = invocation.getArgument(1); return "cve".equals(event.get("type", String.class)); @@ -584,7 +584,7 @@ void test_filter_with_single_element_list() { final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); final Record testRecord = new Record<>(event); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenReturn(true); final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); @@ -618,7 +618,7 @@ void test_outer_catch_block_adds_tags_on_failure_when_unexpected_exception_occur 
doThrow(new RuntimeException("unexpected error")).when(event).put(any(String.class), any()); final Record testRecord = new Record<>(event); - when(expressionEvaluator.evaluateConditional(eq(KEEP_WHEN_EXPRESSION), any(Event.class))) + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) .thenReturn(true); final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml index 4d08806230..d342c0b316 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml @@ -10,7 +10,7 @@ test-pipeline: unused: processor: - filter_list: - source: "items" - keep_when: '/status == "active"' + iterate_on: "items" + keep_element_when: '/status == "active"' sink: - unused: diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml index 8a7dab9517..ad27ef9119 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml @@ -10,8 +10,8 @@ 
test-pipeline: unused: processor: - filter_list: - source: "items" + iterate_on: "items" target: "active_items" - keep_when: '/status == "active"' + keep_element_when: '/status == "active"' sink: - unused: diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml index 597e789032..bc369dda12 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml @@ -10,8 +10,8 @@ test-pipeline: unused: processor: - filter_list: - source: "scores" - keep_when: '/value > 50' + iterate_on: "scores" + keep_element_when: '/value > 50' filter_list_when: '/type == "grades"' sink: - unused: