diff --git a/data-prepper-plugins/mutate-event-processors/README.md b/data-prepper-plugins/mutate-event-processors/README.md index 9f8a5af4a1..f25ff04f5f 100644 --- a/data-prepper-plugins/mutate-event-processors/README.md +++ b/data-prepper-plugins/mutate-event-processors/README.md @@ -808,6 +808,153 @@ After `add_entries`: ___ +## FilterListProcessor +A processor that filters elements within an array field by evaluating a condition against each element, keeping only those where the condition is true. It supports arrays of objects and arrays of primitives (strings, numbers, booleans). + +### Basic Usage +To get started, create the following `pipeline.yaml`. +```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - filter_list: + iterate_on: "items" + keep_element_when: '/status == "active"' + sink: + - stdout: +``` + +Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. 
+ +```json +{"items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}]} +``` + +When run, the processor will filter the array in-place and produce the following output: + +```json +{"items": [{"name": "item1", "status": "active"}, {"name": "item3", "status": "active"}]} +``` + +### Filtering to a different target + +You can write the filtered result to a different key, leaving the original array unchanged: + +```yaml + processor: + - filter_list: + iterate_on: "items" + target: "active_items" + keep_element_when: '/status == "active"' +``` + +With the same input, the output will be: + +```json +{ + "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}, {"name": "item3", "status": "active"}], + "active_items": [{"name": "item1", "status": "active"}, {"name": "item3", "status": "active"}] +} +``` + +### Filtering primitive arrays + +For arrays of primitives (strings, numbers, booleans), each element is accessible via the `/value` key in the expression: + +```yaml + processor: + - filter_list: + iterate_on: "tags" + keep_element_when: '/value != ""' +``` + +With the following input: + +```json +{"tags": ["important", "", "urgent", ""]} +``` + +The output will be: + +```json +{"tags": ["important", "urgent"]} +``` + +Another example filtering numbers: + +```yaml + processor: + - filter_list: + iterate_on: "scores" + keep_element_when: '/value > 50' +``` + +With the following input: + +```json +{"scores": [90, 30, 75, 10]} +``` + +The output will be: + +```json +{"scores": [90, 75]} +``` + +### Using both conditions + +The `filter_list_when` condition controls whether the processor runs at all (evaluated against the root event), while `keep_element_when` controls which elements are kept (evaluated per element): + +```yaml + processor: + - filter_list: + iterate_on: "items" + keep_element_when: '/status == "active"' + filter_list_when: '/env == "production"' +``` + 
+With the following input: + +```json +{"env": "production", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]} +``` + +Since `env` is `"production"`, the processor runs and filters by `status`, producing: + +```json +{"env": "production", "items": [{"name": "item1", "status": "active"}]} +``` + +With a different event where `filter_list_when` evaluates to `false`: + +```json +{"env": "staging", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]} +``` + +The processor is skipped entirely and the event passes through unchanged: + +```json +{"env": "staging", "items": [{"name": "item1", "status": "active"}, {"name": "item2", "status": "inactive"}]} +``` + +### Configuration +* `iterate_on` - (required) - The key of the array field to filter. Supports nested paths (e.g. `outer_key/inner_list`). +* `target` - (optional) - The key to write the filtered array to. Defaults to the `iterate_on` key (in-place filtering). Supports nested paths. +* `keep_element_when` - (required) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated per element. Elements where this expression evaluates to `true` are kept. For object elements, the expression is evaluated against the object's fields directly (e.g. `/status == "active"`). For primitive elements, the value is accessible via `/value` (e.g. `/value > 50`). When no elements match, the result is an empty list `[]`. +* `filter_list_when` - (optional) - A [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) evaluated against the root event. When provided, the processor only runs if this condition is `true`. By default, all events are processed. +* `tags_on_failure` - (optional) - A list of tags to add to the event metadata when the processor fails to process the event. 
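The per-element behaviour of `keep_element_when` described in the configuration list above can be sketched in plain Java. This is a minimal illustration, not the processor's implementation: `FilterListSketch`, its `filter` method, and the `Predicate` parameter are hypothetical names, and the predicate only stands in for a Data Prepper expression. What it mirrors from the source is that object elements are evaluated against their own fields, while primitive elements are wrapped under the key `value`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative sketch only: FilterListSketch and the Predicate parameter are hypothetical
// stand-ins for the processor and for a keep_element_when expression.
final class FilterListSketch {

    // Returns a new list holding the elements for which keepElementWhen is true.
    static List<Object> filter(final List<?> source, final Predicate<Map<String, Object>> keepElementWhen) {
        final List<Object> kept = new ArrayList<>();
        for (final Object element : source) {
            // Objects are evaluated against their own fields; primitives are wrapped under "value".
            @SuppressWarnings("unchecked")
            final Map<String, Object> context = element instanceof Map
                    ? (Map<String, Object>) element
                    : Collections.singletonMap("value", element);
            if (keepElementWhen.test(context)) {
                kept.add(element);
            }
        }
        return kept;
    }

    public static void main(final String[] args) {
        // Stands in for keep_element_when: '/value > 50'
        System.out.println(filter(List.of(90, 30, 75, 10), ctx -> ((Integer) ctx.get("value")) > 50));
        // Stands in for keep_element_when: '/status == "active"'
        System.out.println(filter(
                List.of(Map.of("name", "item1", "status", "active"),
                        Map.of("name", "item2", "status", "inactive")),
                ctx -> "active".equals(ctx.get("status"))));
    }
}
```

The sketch always returns a new list, so a non-matching input yields an empty list rather than `null`, which matches the documented `[]` result when no elements match.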
+ +**Edge case behavior:** +- If the `iterate_on` key does not exist or its value is `null`, the processor is a no-op and the event passes through unchanged. +- If the `iterate_on` value is not a list (e.g. a string or number), the processor logs a warning and adds `tags_on_failure` if configured. +- `null` elements within the list are evaluated normally. For example, with `keep_element_when: '/value != null'`, null elements are filtered out while non-null elements are kept. + +___ + ## Developer Guide This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development: - [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md) diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java new file mode 100644 index 0000000000..b4cae031da --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessor.java @@ -0,0 +1,159 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE; + +@DataPrepperPlugin(name = "filter_list", pluginType = Processor.class, pluginConfigurationType = FilterListProcessorConfig.class) +public class FilterListProcessor extends AbstractProcessor<Record<Event>, Record<Event>> { + + private static final Logger LOG = LoggerFactory.getLogger(FilterListProcessor.class); + private static final String FAILED_ELEMENTS_METADATA_KEY = "filter_list_processor_failed_elements"; + private static final String FAILED_ELEMENTS_COUNT_METADATA_KEY = "filter_list_processor_failed_elements_count"; + private final FilterListProcessorConfig config; + private final ExpressionEvaluator expressionEvaluator; + + @DataPrepperPluginConstructor + public FilterListProcessor(final PluginMetrics pluginMetrics, final FilterListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics); + this.config = config; + this.expressionEvaluator = 
expressionEvaluator; + + config.validateExpressions(expressionEvaluator); + } + + @Override + public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) { + for (final Record<Event> record : records) { + final Event recordEvent = record.getData(); + + try { + if (Objects.nonNull(config.getFilterListWhen()) && !expressionEvaluator.evaluateConditional(config.getFilterListWhen(), recordEvent)) { + continue; + } + + final List<Object> sourceList; + try { + sourceList = recordEvent.get(config.getIterateOn(), List.class); + } catch (final Exception e) { + LOG.atWarn() + .addMarker(EVENT) + .addMarker(SENSITIVE) + .setMessage("Given source path [{}] is not valid on record [{}]") + .addArgument(config.getIterateOn()) + .addArgument(recordEvent) + .setCause(e) + .log(); + addTagsOnFailure(recordEvent); + continue; + } + + if (sourceList == null) { + LOG.debug("Source list at path [{}] is null, skipping event", config.getIterateOn()); + continue; + } + + final List<Object> filteredList = new ArrayList<>(); + final List<Object> failedElements = new ArrayList<>(); + int failedElementCount = 0; + + for (final Object element : sourceList) { + @SuppressWarnings("unchecked") + final Map<String, Object> contextMap = element instanceof Map + ? (Map<String, Object>) element + : Collections.singletonMap("value", element); + + try { + // TODO(#6609): Revisit this per-element Event construction when ExpressionEvaluator/JsonPointer + // internals support a lighter evaluation path that avoids full tree conversion. 
+ final Event elementEvent = JacksonEvent.builder() + .withEventType("event") + .withData(contextMap) + .build(); + + if (expressionEvaluator.evaluateConditional(config.getKeepElementWhen(), elementEvent)) { + filteredList.add(element); + } + } catch (final Exception e) { + failedElementCount++; + failedElements.add(element); + LOG.atWarn() + .addMarker(EVENT) + .addMarker(SENSITIVE) + .setMessage("Error evaluating keep_element_when expression [{}] for element in source list at path [{}]") + .addArgument(config.getKeepElementWhen()) + .addArgument(config.getIterateOn()) + .setCause(e) + .log(); + } + } + + if (failedElementCount > 0) { + addTagsOnFailure(recordEvent); + recordEvent.getMetadata().setAttribute(FAILED_ELEMENTS_COUNT_METADATA_KEY, failedElementCount); + recordEvent.getMetadata().setAttribute(FAILED_ELEMENTS_METADATA_KEY, failedElements); + } + + recordEvent.put(config.getTarget(), filteredList); + + } catch (final Exception e) { + LOG.atError() + .addMarker(EVENT) + .addMarker(NOISY) + .setMessage("There was an exception while processing Event [{}]") + .addArgument(recordEvent) + .setCause(e) + .log(); + addTagsOnFailure(recordEvent); + } + } + return records; + } + + private void addTagsOnFailure(final Event event) { + if (config.getTagsOnFailure() != null) { + event.getMetadata().addTags(config.getTagsOnFailure()); + } + } + + @Override + public void prepareForShutdown() { + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java new file mode 100644 index 0000000000..2d020b3080 --- /dev/null +++ 
b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfig.java @@ -0,0 +1,111 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import com.fasterxml.jackson.annotation.JsonClassDescription; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.model.annotations.ExampleValues; +import org.opensearch.dataprepper.model.annotations.ExampleValues.Example; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; + +import java.util.List; + +@JsonPropertyOrder +@JsonClassDescription("The filter_list processor evaluates a condition against each element of an array " + + "and keeps only those elements where the condition is true.") +public class FilterListProcessorConfig { + + @NotNull + @NotEmpty + @JsonProperty("iterate_on") + @JsonPropertyDescription("The key of the array field to filter. Supports nested paths.") + @ExampleValues({ + @Example(value = "my-list", description = "Filters the 'my-list' array at the root of the event."), + @Example(value = "outer-key/my-list", description = "Filters the 'my-list' array nested under 'outer-key'.") + }) + private String iterateOn; + + @JsonProperty("target") + @JsonPropertyDescription("The key to write the filtered array to. Defaults to the source key (in-place). 
" + + "Supports nested paths — intermediate objects are created automatically if they do not exist.") + @ExampleValues({ + @Example(value = "filtered-list", description = "Writes the filtered array to 'filtered-list'.") + }) + private String target; + + @NotNull + @NotEmpty + @JsonProperty("keep_element_when") + @JsonPropertyDescription("An expression evaluated per element. Elements where this expression evaluates to true are kept. " + + "The expression is evaluated against each element of the array as if it were a standalone event. " + + "For primitive elements (strings, numbers, booleans), the element value is accessible via the key /value.") + @ExampleValues({ + @Example(value = "/status == \"active\"", description = "Keeps only elements where 'status' equals 'active'."), + @Example(value = "/score > 50", description = "Keeps only elements where 'score' is greater than 50."), + @Example(value = "/value > 50", description = "Keeps only primitive elements where the value is greater than 50."), + @Example(value = "/value != \"\"", description = "Keeps only non-empty string primitive elements.") + }) + private String keepElementWhen; + + @JsonProperty("filter_list_when") + @JsonPropertyDescription("A conditional expression, " + + "such as /some-key == \"test\", that will be evaluated against the root event to determine whether " + + "the processor will be run on the event. By default, all events will be processed unless otherwise stated.") + @ExampleValues({ + @Example(value = "/some-key == \"test\"", description = "The processor only runs when the value of 'some-key' is 'test'.") + }) + private String filterListWhen; + + @JsonProperty("tags_on_failure") + @JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.") + private List<String> tagsOnFailure; + + public String getIterateOn() { + return iterateOn; + } + + public String getTarget() { + return target != null ? 
target : iterateOn; + } + + public String getKeepElementWhen() { + return keepElementWhen; + } + + public String getFilterListWhen() { + return filterListWhen; + } + + public List<String> getTagsOnFailure() { + return tagsOnFailure; + } + + void validateExpressions(final ExpressionEvaluator expressionEvaluator) { + if (getFilterListWhen() != null && !expressionEvaluator.isValidExpressionStatement(getFilterListWhen())) { + throw new InvalidPluginConfigurationException( + String.format("filter_list_when %s is not a valid expression statement. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", + getFilterListWhen())); + } + + if (!expressionEvaluator.isValidExpressionStatement(getKeepElementWhen())) { + throw new InvalidPluginConfigurationException( + String.format("keep_element_when %s is not a valid expression statement. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", + getKeepElementWhen())); + } + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java new file mode 100644 index 0000000000..60ab1b314e --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorConfigTest.java @@ -0,0 +1,117 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; + +import java.lang.reflect.Field; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) + +class FilterListProcessorConfigTest { + + @Mock + private ExpressionEvaluator expressionEvaluator; + + @Test + void test_default_target_returns_source_when_target_not_set() throws NoSuchFieldException, IllegalAccessException { + final FilterListProcessorConfig config = new FilterListProcessorConfig(); + setField(config, "iterateOn", "my_source"); + assertThat(config.getTarget(), is(equalTo("my_source"))); + } + + @Test + void test_target_returns_explicit_target_when_set() throws NoSuchFieldException, IllegalAccessException { + final FilterListProcessorConfig config = new FilterListProcessorConfig(); + setField(config, "iterateOn", "my_source"); + setField(config, "target", "my_target"); + assertThat(config.getTarget(), is(equalTo("my_target"))); + } + + @Test + void test_default_filter_list_when_is_null() { + final FilterListProcessorConfig config = new FilterListProcessorConfig(); + assertThat(config.getFilterListWhen(), is(nullValue())); + } + + @Test + void test_default_tags_on_failure_is_null() { + final FilterListProcessorConfig config = new FilterListProcessorConfig(); + 
assertThat(config.getTagsOnFailure(), is(nullValue())); + } + + @Test + void test_getters_return_set_values() throws NoSuchFieldException, IllegalAccessException { + final FilterListProcessorConfig config = new FilterListProcessorConfig(); + + setField(config, "iterateOn", "my_source"); + setField(config, "target", "my_target"); + setField(config, "keepElementWhen", "/type == \"cve\""); + setField(config, "filterListWhen", "/enabled == true"); + setField(config, "tagsOnFailure", List.of("tag1")); + + assertThat(config.getIterateOn(), is(equalTo("my_source"))); + assertThat(config.getTarget(), is(equalTo("my_target"))); + assertThat(config.getKeepElementWhen(), is(equalTo("/type == \"cve\""))); + assertThat(config.getFilterListWhen(), is(equalTo("/enabled == true"))); + assertThat(config.getTagsOnFailure(), is(equalTo(List.of("tag1")))); + } + + @Test + void test_validateExpressions_throws_when_keep_when_is_invalid() throws NoSuchFieldException, IllegalAccessException { + final FilterListProcessorConfig config = new FilterListProcessorConfig(); + setField(config, "keepElementWhen", "invalid"); + when(expressionEvaluator.isValidExpressionStatement("invalid")).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, () -> config.validateExpressions(expressionEvaluator)); + } + + @Test + void test_validateExpressions_throws_when_filter_list_when_is_invalid() throws NoSuchFieldException, IllegalAccessException { + final FilterListProcessorConfig config = new FilterListProcessorConfig(); + setField(config, "keepElementWhen", "/type == \"cve\""); + setField(config, "filterListWhen", "invalid"); + when(expressionEvaluator.isValidExpressionStatement("invalid")).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, () -> config.validateExpressions(expressionEvaluator)); + } + + @Test + void test_validateExpressions_does_not_throw_when_expressions_are_valid() throws NoSuchFieldException, IllegalAccessException { + final 
FilterListProcessorConfig config = new FilterListProcessorConfig(); + setField(config, "keepElementWhen", "/type == \"cve\""); + setField(config, "filterListWhen", "/enabled == true"); + when(expressionEvaluator.isValidExpressionStatement("/type == \"cve\"")).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement("/enabled == true")).thenReturn(true); + + assertDoesNotThrow(() -> config.validateExpressions(expressionEvaluator)); + } + + private void setField(final Object object, final String fieldName, final Object value) + throws NoSuchFieldException, IllegalAccessException { + final Field field = object.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(object, value); + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorIT.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorIT.java new file mode 100644 index 0000000000..050acd6533 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorIT.java @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.event.LogEventBuilder; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.test.plugins.DataPrepperPluginTest; +import org.opensearch.dataprepper.test.plugins.PluginConfigurationFile; +import org.opensearch.dataprepper.test.plugins.junit.BaseDataPrepperPluginStandardTestSuite; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +@DataPrepperPluginTest(pluginName = "filter_list", pluginType = Processor.class) +class FilterListProcessorIT extends BaseDataPrepperPluginStandardTestSuite { + + @Test + void filters_objects_by_field_value( + @PluginConfigurationFile("filter_objects_by_status.yaml") final Processor<Record<Event>, Record<Event>> objectUnderTest, + final EventFactory eventFactory) { + + final Event event = eventFactory.eventBuilder(LogEventBuilder.class) + .withData(Map.of("items", List.of( + Map.of("name", "item1", "status", "active"), + Map.of("name", "item2", "status", "inactive"), + Map.of("name", "item3", "status", "active") + ))) + .build(); + + final Collection<Record<Event>> result = objectUnderTest.execute(Collections.singletonList(new Record<>(event))); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + + final Event resultEvent = result.iterator().next().getData(); + final List<Map<String, Object>> filteredList = resultEvent.get("items", List.class); + assertThat(filteredList, notNullValue()); + assertThat(filteredList.size(), equalTo(2)); + assertThat(filteredList.get(0).get("name"), 
equalTo("item1")); + assertThat(filteredList.get(1).get("name"), equalTo("item3")); + } + + @Test + void filters_objects_to_separate_target( + @PluginConfigurationFile("filter_objects_to_target.yaml") final Processor<Record<Event>, Record<Event>> objectUnderTest, + final EventFactory eventFactory) { + + final Event event = eventFactory.eventBuilder(LogEventBuilder.class) + .withData(Map.of("items", List.of( + Map.of("name", "item1", "status", "active"), + Map.of("name", "item2", "status", "inactive") + ))) + .build(); + + final Collection<Record<Event>> result = objectUnderTest.execute(Collections.singletonList(new Record<>(event))); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + + final Event resultEvent = result.iterator().next().getData(); + + final List<Map<String, Object>> originalList = resultEvent.get("items", List.class); + assertThat(originalList.size(), equalTo(2)); + + final List<Map<String, Object>> filteredList = resultEvent.get("active_items", List.class); + assertThat(filteredList, notNullValue()); + assertThat(filteredList.size(), equalTo(1)); + assertThat(filteredList.get(0).get("name"), equalTo("item1")); + } + + @Test + void filters_primitives_with_filter_list_when_condition( + @PluginConfigurationFile("filter_primitives_with_condition.yaml") final Processor<Record<Event>, Record<Event>> objectUnderTest, + final EventFactory eventFactory) { + + final Event event = eventFactory.eventBuilder(LogEventBuilder.class) + .withData(Map.of( + "scores", List.of(95, 30, 75, 10, 88), + "type", "grades" + )) + .build(); + + final Collection<Record<Event>> result = objectUnderTest.execute(Collections.singletonList(new Record<>(event))); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + + final Event resultEvent = result.iterator().next().getData(); + final List<Integer> filteredList = resultEvent.get("scores", List.class); + assertThat(filteredList, notNullValue()); + assertThat(filteredList.size(), equalTo(3)); + assertThat(filteredList.get(0), equalTo(95)); + assertThat(filteredList.get(1), equalTo(75)); + 
assertThat(filteredList.get(2), equalTo(88)); + } + + @Test + void skips_filtering_when_filter_list_when_is_false( + @PluginConfigurationFile("filter_primitives_with_condition.yaml") final Processor<Record<Event>, Record<Event>> objectUnderTest, + final EventFactory eventFactory) { + + final Event event = eventFactory.eventBuilder(LogEventBuilder.class) + .withData(Map.of( + "scores", List.of(95, 30, 75, 10, 88), + "type", "not_grades" + )) + .build(); + + final Collection<Record<Event>> result = objectUnderTest.execute(Collections.singletonList(new Record<>(event))); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + + final Event resultEvent = result.iterator().next().getData(); + final List<Integer> scores = resultEvent.get("scores", List.class); + assertThat(scores, notNullValue()); + assertThat(scores.size(), equalTo(5)); + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java new file mode 100644 index 0000000000..c2d4668c32 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/FilterListProcessorTest.java @@ -0,0 +1,663 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class FilterListProcessorTest { + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private FilterListProcessorConfig mockConfig; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + private static final String KEEP_ELEMENT_WHEN_EXPRESSION = "/type == \"cve\""; + private static final String SOURCE_KEY = "identifiers"; + + @BeforeEach + void setUp() { + lenient().when(mockConfig.getIterateOn()).thenReturn(SOURCE_KEY); + lenient().when(mockConfig.getTarget()).thenReturn(SOURCE_KEY); + 
lenient().when(mockConfig.getKeepElementWhen()).thenReturn(KEEP_ELEMENT_WHEN_EXPRESSION); + lenient().when(mockConfig.getFilterListWhen()).thenReturn(null); + lenient().when(mockConfig.getTagsOnFailure()).thenReturn(null); + lenient().when(expressionEvaluator.isValidExpressionStatement(KEEP_ELEMENT_WHEN_EXPRESSION)).thenReturn(true); + lenient().doCallRealMethod().when(mockConfig).validateExpressions(expressionEvaluator); + } + + @Test + void invalid_keep_when_throws_InvalidPluginConfigurationException() { + final String invalidExpression = UUID.randomUUID().toString(); + when(mockConfig.getKeepElementWhen()).thenReturn(invalidExpression); + when(expressionEvaluator.isValidExpressionStatement(invalidExpression)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + + @Test + void invalid_filter_list_when_throws_InvalidPluginConfigurationException() { + final String filterListWhen = UUID.randomUUID().toString(); + when(mockConfig.getFilterListWhen()).thenReturn(filterListWhen); + when(expressionEvaluator.isValidExpressionStatement(filterListWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + + @Test + void test_filter_keeps_matching_elements_in_place() { + final FilterListProcessor processor = createObjectUnderTest(); + final Record<Event> testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List<Map<String, Object>> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + 
assertThat(filteredList.size(), is(2)); + assertThat(filteredList.get(0).get("id"), is("CVE-1")); + assertThat(filteredList.get(1).get("id"), is("CVE-2")); + } + + @Test + void test_filter_writes_to_different_target() { + final String targetKey = "cve_identifiers"; + when(mockConfig.getTarget()).thenReturn(targetKey); + + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + + // Original source should remain unchanged + final List> originalList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(originalList.size(), is(3)); + + // Target should have filtered list + final List> filteredList = resultEvent.get(targetKey, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + assertThat(filteredList.get(0).get("id"), is("CVE-1")); + assertThat(filteredList.get(1).get("id"), is("CVE-2")); + } + + @Test + void test_filter_writes_to_nested_target() { + final String nestedTarget = "vulnerability/cve_ids"; + when(mockConfig.getTarget()).thenReturn(nestedTarget); + + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + 
assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(nestedTarget, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + } + + @Test + void test_filter_removes_all_elements_when_none_match() { + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(false); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(0)); + } + + @Test + void test_filter_keeps_all_elements_when_all_match() { + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(true); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(3)); + } + + @Test + void test_filter_with_empty_source_list() { + final FilterListProcessor processor = createObjectUnderTest(); + + final Map data = Map.of(SOURCE_KEY, List.of()); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + final List> resultRecords = (List>) 
processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List<Map<String, Object>> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(0)); + } + + @Test + void test_filter_with_null_source_list_is_noop() { + final FilterListProcessor processor = createObjectUnderTest(); + + final Map<String, Object> data = new HashMap<>(); + data.put(SOURCE_KEY, null); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record<Event> testRecord = new Record<>(event); + + final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + } + + @Test + void test_filter_with_missing_source_key_is_noop() { + final FilterListProcessor processor = createObjectUnderTest(); + + final Map<String, Object> data = Map.of("other_key", "value"); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record<Event> testRecord = new Record<>(event); + + final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + // Event should remain unchanged since source key doesn't exist + assertThat(resultEvent.toMap(), equalTo(testRecord.getData().toMap())); + } + + @Test + void test_filter_skipped_when_filter_list_when_is_false() { + final String filterListWhen = UUID.randomUUID().toString(); + when(mockConfig.getFilterListWhen()).thenReturn(filterListWhen); + when(expressionEvaluator.isValidExpressionStatement(filterListWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + final Record<Event> testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(filterListWhen, testRecord.getData())).thenReturn(false); + 
+ final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + // Event should remain unchanged + assertThat(resultEvent.toMap(), equalTo(testRecord.getData().toMap())); + } + + @Test + void test_filter_runs_when_filter_list_when_is_true() { + final String filterListWhen = UUID.randomUUID().toString(); + when(mockConfig.getFilterListWhen()).thenReturn(filterListWhen); + when(expressionEvaluator.isValidExpressionStatement(filterListWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(filterListWhen, testRecord.getData())).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(false); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(0)); + } + + @Test + void test_filter_with_nested_source_path() { + final String nestedSource = "vulnerability/identifiers"; + when(mockConfig.getIterateOn()).thenReturn(nestedSource); + when(mockConfig.getTarget()).thenReturn(nestedSource); + + final FilterListProcessor processor = createObjectUnderTest(); + + final List> identifiers = new ArrayList<>(); + identifiers.add(Map.of("id", "CVE-1", "type", "cve")); + identifiers.add(Map.of("id", "CWE-1", "type", "cwe")); + + final Map data = Map.of("vulnerability", Map.of("identifiers", identifiers)); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); 
+ + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event e = invocation.getArgument(1); + return "cve".equals(e.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(nestedSource, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(1)); + assertThat(filteredList.get(0).get("id"), is("CVE-1")); + } + + @Test + void test_filter_with_mixed_map_and_primitive_elements() { + final FilterListProcessor processor = createObjectUnderTest(); + + final List mixedList = new ArrayList<>(); + mixedList.add(Map.of("id", "CVE-1", "type", "cve")); + mixedList.add("a-string"); + mixedList.add(Map.of("id", "CVE-2", "type", "cve")); + + final Map data = new HashMap<>(); + data.put(SOURCE_KEY, mixedList); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(true); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(3)); + } + + @Test + void test_filter_primitive_string_list() { + final String keepWhen = "/value != \"\""; + when(mockConfig.getKeepElementWhen()).thenReturn(keepWhen); + when(expressionEvaluator.isValidExpressionStatement(keepWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + + 
final List stringList = new ArrayList<>(); + stringList.add("hello"); + stringList.add(""); + stringList.add("world"); + + final Map data = new HashMap<>(); + data.put(SOURCE_KEY, stringList); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(keepWhen), any(Event.class))) + .thenAnswer(invocation -> { + Event e = invocation.getArgument(1); + Object val = e.get("value", Object.class); + return val != null && !"".equals(val); + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final List filteredList = resultRecords.get(0).getData().get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + assertThat(filteredList.get(0), is("hello")); + assertThat(filteredList.get(1), is("world")); + } + + @Test + void test_filter_primitive_number_list() { + final String keepWhen = "/value > 0"; + when(mockConfig.getKeepElementWhen()).thenReturn(keepWhen); + when(expressionEvaluator.isValidExpressionStatement(keepWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + + final List numberList = new ArrayList<>(); + numberList.add(10); + numberList.add(-5); + numberList.add(0); + numberList.add(42); + + final Map data = new HashMap<>(); + data.put(SOURCE_KEY, numberList); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(keepWhen), any(Event.class))) + .thenAnswer(invocation -> { + Event e = invocation.getArgument(1); + Object val = e.get("value", Object.class); + return val instanceof Number && ((Number) val).intValue() > 0; + }); + + final List> resultRecords = (List>) 
processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final List filteredList = resultRecords.get(0).getData().get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + assertThat(filteredList.get(0), is(10)); + assertThat(filteredList.get(1), is(42)); + } + + @Test + void test_filter_primitive_list_with_nulls() { + final String keepWhen = "/value != null"; + when(mockConfig.getKeepElementWhen()).thenReturn(keepWhen); + when(expressionEvaluator.isValidExpressionStatement(keepWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + + final List listWithNulls = new ArrayList<>(); + listWithNulls.add("keep"); + listWithNulls.add(null); + listWithNulls.add(42); + + final Map data = new HashMap<>(); + data.put(SOURCE_KEY, listWithNulls); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(keepWhen), any(Event.class))) + .thenAnswer(invocation -> { + Event e = invocation.getArgument(1); + return e.get("value", Object.class) != null; + }); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final List filteredList = resultRecords.get(0).getData().get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + assertThat(filteredList.get(0), is("keep")); + assertThat(filteredList.get(1), is(42)); + } + + @Test + void test_filter_continues_processing_when_expression_throws_for_one_element() { + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(true) + 
.thenThrow(new RuntimeException("expression error")) + .thenReturn(true); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + } + + @Test + void test_filter_processes_multiple_records() { + final FilterListProcessor processor = createObjectUnderTest(); + + final Record record1 = createTestRecord(); + final Record record2 = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(List.of(record1, record2)); + + assertThat(resultRecords.size(), is(2)); + for (final Record resultRecord : resultRecords) { + final List> filteredList = resultRecord.getData().get(SOURCE_KEY, List.class); + assertThat(filteredList.size(), is(2)); + } + } + + @Test + void test_filter_list_when_true_and_keep_when_selectively_filters() { + final String filterListWhen = "/env == \"production\""; + when(mockConfig.getFilterListWhen()).thenReturn(filterListWhen); + when(expressionEvaluator.isValidExpressionStatement(filterListWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + + when(expressionEvaluator.evaluateConditional(filterListWhen, testRecord.getData())).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List> resultRecords = 
(List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList, is(notNullValue())); + assertThat(filteredList.size(), is(2)); + assertThat(filteredList.get(0).get("id"), is("CVE-1")); + assertThat(filteredList.get(1).get("id"), is("CVE-2")); + } + + @Test + void test_multiple_records_with_different_filter_list_when_outcomes() { + final String filterListWhen = "/process == true"; + when(mockConfig.getFilterListWhen()).thenReturn(filterListWhen); + when(expressionEvaluator.isValidExpressionStatement(filterListWhen)).thenReturn(true); + + final FilterListProcessor processor = createObjectUnderTest(); + + // Record 1: filter_list_when passes + final Record record1 = createTestRecord(); + // Record 2: filter_list_when fails + final Map data2 = new HashMap<>(); + final List> list2 = new ArrayList<>(); + list2.add(Map.of("id", "KEEP-1", "type", "cve")); + list2.add(Map.of("id", "KEEP-2", "type", "cwe")); + data2.put(SOURCE_KEY, list2); + final Event event2 = JacksonEvent.builder().withData(data2).withEventType("event").build(); + final Record record2 = new Record<>(event2); + + when(expressionEvaluator.evaluateConditional(filterListWhen, record1.getData())).thenReturn(true); + when(expressionEvaluator.evaluateConditional(filterListWhen, record2.getData())).thenReturn(false); + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenAnswer(invocation -> { + Event event = invocation.getArgument(1); + return "cve".equals(event.get("type", String.class)); + }); + + final List> resultRecords = (List>) processor.doExecute(List.of(record1, record2)); + + assertThat(resultRecords.size(), is(2)); + + // Record 1: filtered + final List> filteredList1 = resultRecords.get(0).getData().get(SOURCE_KEY, List.class); + 
assertThat(filteredList1.size(), is(2)); + assertThat(filteredList1.get(0).get("id"), is("CVE-1")); + + // Record 2: untouched + final List> untouchedList = resultRecords.get(1).getData().get(SOURCE_KEY, List.class); + assertThat(untouchedList.size(), is(2)); + assertThat(untouchedList.get(0).get("id"), is("KEEP-1")); + assertThat(untouchedList.get(1).get("id"), is("KEEP-2")); + } + + @Test + void test_filter_with_source_not_a_list_adds_tags_on_failure() { + final List testTags = List.of("tag1", "tag2"); + when(mockConfig.getTagsOnFailure()).thenReturn(testTags); + + final FilterListProcessor processor = createObjectUnderTest(); + + final Map data = Map.of(SOURCE_KEY, "not-a-list"); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + assertThat(resultEvent.getMetadata().getTags(), is(new HashSet<>(testTags))); + } + + @Test + void test_filter_with_single_element_list() { + final FilterListProcessor processor = createObjectUnderTest(); + + final List> singleElementList = List.of(Map.of("id", "CVE-1", "type", "cve")); + final Map data = Map.of(SOURCE_KEY, singleElementList); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(true); + + final List> resultRecords = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + final List> filteredList = resultEvent.get(SOURCE_KEY, List.class); + assertThat(filteredList.size(), is(1)); + 
assertThat(filteredList.get(0).get("id"), is("CVE-1")); + } + + @Test + void test_isReadyForShutdown_returns_true() { + final FilterListProcessor processor = createObjectUnderTest(); + assertThat(processor.isReadyForShutdown(), is(true)); + } + + @Test + void test_outer_catch_block_adds_tags_on_failure_when_unexpected_exception_occurs() { + final List<String> testTags = List.of("filter_list_failure"); + when(mockConfig.getTagsOnFailure()).thenReturn(testTags); + + final FilterListProcessor processor = createObjectUnderTest(); + + final List<Map<String, Object>> identifiers = new ArrayList<>(); + identifiers.add(Map.of("id", "CVE-1", "type", "cve")); + + final Map<String, Object> data = new HashMap<>(); + data.put(SOURCE_KEY, identifiers); + final Event event = spy(JacksonEvent.builder().withData(data).withEventType("event").build()); + doThrow(new RuntimeException("unexpected error")).when(event).put(any(String.class), any()); + final Record<Event> testRecord = new Record<>(event); + + when(expressionEvaluator.evaluateConditional(eq(KEEP_ELEMENT_WHEN_EXPRESSION), any(Event.class))) + .thenReturn(true); + + final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + assertThat(resultEvent.getMetadata().getTags(), is(new HashSet<>(testTags))); + } + + @Test + void test_tags_not_added_when_tags_on_failure_is_null_and_source_is_not_a_list() { + final FilterListProcessor processor = createObjectUnderTest(); + + final Map<String, Object> data = Map.of(SOURCE_KEY, "not-a-list"); + final Event event = JacksonEvent.builder().withData(data).withEventType("event").build(); + final Record<Event> testRecord = new Record<>(event); + + final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecords.size(), is(1)); + final Event resultEvent = resultRecords.get(0).getData(); + assertThat(resultEvent.getMetadata().getTags().isEmpty(), is(true)); + } + + 
private FilterListProcessor createObjectUnderTest() { + return new FilterListProcessor(pluginMetrics, mockConfig, expressionEvaluator); + } + + private Record<Event> createTestRecord() { + final List<Map<String, Object>> identifiers = new ArrayList<>(); + identifiers.add(Map.of("id", "CVE-1", "type", "cve")); + identifiers.add(Map.of("id", "CVE-2", "type", "cve")); + identifiers.add(Map.of("id", "CWE-1", "type", "cwe")); + + final Map<String, Object> data = Map.of(SOURCE_KEY, identifiers); + final Event event = JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build(); + return new Record<>(event); + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml new file mode 100644 index 0000000000..d342c0b316 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_by_status.yaml @@ -0,0 +1,16 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+ +test-pipeline: + source: + unused: + processor: + - filter_list: + iterate_on: "items" + keep_element_when: '/status == "active"' + sink: + - unused: diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml new file mode 100644 index 0000000000..ad27ef9119 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_objects_to_target.yaml @@ -0,0 +1,17 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +test-pipeline: + source: + unused: + processor: + - filter_list: + iterate_on: "items" + target: "active_items" + keep_element_when: '/status == "active"' + sink: + - unused: diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml new file mode 100644 index 0000000000..bc369dda12 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/filter_primitives_with_condition.yaml @@ -0,0 +1,17 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+ +test-pipeline: + source: + unused: + processor: + - filter_list: + iterate_on: "scores" + keep_element_when: '/value > 50' + filter_list_when: '/type == "grades"' + sink: + - unused:
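
Reviewer note: the behaviour the tests and YAML fixtures above exercise can be summarised in a small standalone sketch. This is not the `FilterListProcessor` implementation, which is not part of this hunk. `FilterListSketch`, `keepElements`, and `toContext` are hypothetical names introduced here for illustration, and a plain `Predicate` stands in for the `ExpressionEvaluator`. The sketch assumes what the tests assert: map elements are evaluated as-is, primitive (or null) elements are exposed under a `value` key, and an element whose condition throws is dropped while the rest of the list is still processed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical helper for illustration only; not the FilterListProcessor implementation.
final class FilterListSketch {

    private FilterListSketch() {
    }

    // Returns a new list holding only the elements for which keepElementWhen is true.
    // The source list is never modified, so the caller decides whether to write the
    // result back in place or to a different target key.
    static List<Object> keepElements(final List<?> source,
                                     final Predicate<Map<String, Object>> keepElementWhen) {
        final List<Object> kept = new ArrayList<>();
        for (final Object element : source) {
            boolean keep;
            try {
                keep = keepElementWhen.test(toContext(element));
            } catch (final RuntimeException e) {
                // Assumption taken from the tests: a failing evaluation drops that
                // element and processing continues with the next one.
                keep = false;
            }
            if (keep) {
                kept.add(element);
            }
        }
        return kept;
    }

    // Map elements are evaluated directly; anything else (including null) is wrapped
    // under the "value" key, mirroring the '/value' expressions in the fixtures.
    @SuppressWarnings("unchecked")
    private static Map<String, Object> toContext(final Object element) {
        if (element instanceof Map) {
            return (Map<String, Object>) element;
        }
        return Collections.singletonMap("value", element);
    }
}
```

For example, `FilterListSketch.keepElements(List.of(90, 30, 75, 10), ctx -> ((Number) ctx.get("value")).intValue() > 50)` keeps `90` and `75`, which lines up with the `'/value > 50'` fixture in `filter_primitives_with_condition.yaml`.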