diff --git a/data-prepper-plugins/mutate-event-processors/README.md b/data-prepper-plugins/mutate-event-processors/README.md index 75f7e049a0..10e82eca94 100644 --- a/data-prepper-plugins/mutate-event-processors/README.md +++ b/data-prepper-plugins/mutate-event-processors/README.md @@ -647,6 +647,116 @@ will end up with this after processing: * `tags_on_failure` - (optional): a list of tags to add to event metadata when the event fails to process +## WrapEntriesProcessor +A processor that wraps each element of a primitive array into an object using a configured key name. This enables downstream processors like `add_entries` and `delete_entries` with `iterate_on`, which require `List>` and cannot operate on primitive arrays. + +### Basic Usage +To get started, create the following `pipeline.yaml`. +```yaml +pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - wrap_entries: + source: "/names" + key: "name" + sink: + - stdout: +``` + +Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. +```json +{"names": ["alice", "bob", "charlie"]} +``` + +When run, the processor will parse the message into the following output: + +```json +{"names": [{"name": "alice"}, {"name": "bob"}, {"name": "charlie"}]} +``` + +### Writing to a Separate Target +If you want to keep the original array and write the wrapped objects to a different key, use the `target` option: +```yaml + processor: + - wrap_entries: + source: "/items" + target: "/inventory_items" + key: "product" +``` + +Input: +```json +{"items": ["laptop", "monitor", "keyboard"]} +``` + +Output: +```json +{"items": ["laptop", "monitor", "keyboard"], "inventory_items": [{"product": "laptop"}, {"product": "monitor"}, {"product": "keyboard"}]} +``` + +### Conditional Processing +Use `wrap_entries_when` to only process events matching a condition: +```yaml + processor: + - wrap_entries: + source: "/tags" + key: "value" + wrap_entries_when: '/type == "tagged"' +``` + +Only events where `type` equals `"tagged"` will be processed. + +### Chaining with add_entries +A common use case is wrapping a primitive array so that `add_entries` with `iterate_on` can operate on it: +```yaml + processor: + - wrap_entries: + source: "/names" + key: "name" + - add_entries: + iterate_on: "/names" + entries: + - key: "greeting" + format: "Hello, ${name}" +``` + +Input: +```json +{"names": ["alice", "bob"]} +``` + +After `wrap_entries`: +```json +{"names": [{"name": "alice"}, {"name": "bob"}]} +``` + +After `add_entries`: +```json +{"names": [{"name": "alice", "greeting": "Hello, alice"}, {"name": "bob", "greeting": "Hello, bob"}]} +``` + +### Configuration +* `source` - (required) - The key of the primitive array to transform (JSON Pointer) +* `target` - (optional) - The key to write the resulting object array to. Defaults to `source` (in-place). Must not be empty when specified. +* `key` - (required) - The key name to use in each resulting object +* `exclude_null_empty_values` - (optional) - When set to `true`, null and empty string elements are filtered out before wrapping. Default is `false` +* `append_if_target_exists` - (optional) - When set to `true`, appends results to the existing target array instead of overwriting. Default is `false` +* `wrap_entries_when` - (optional) - A [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) that determines whether the processor runs on the event. Evaluated at the root event level. +* `tags_on_failure` - (optional) - A list of tags to add to the event metadata when the event fails to process + +### Edge Case Behavior +* If the `source` key does not exist in the event, the processor silently skips the event (no-op). A debug-level log is emitted. +* If the `source` value is not a list (e.g., a string or number), the processor skips the event (no-op) and adds `tags_on_failure` if configured. +* If the `source` list is empty, the processor does nothing — the empty list remains as-is. +* If no elements remain after filtering (when `exclude_null_empty_values` is `true` and all elements are null or empty), the target is written as an empty list `[]`. +* Null elements within the list are wrapped like any other value by default: `[null]` becomes `[{"key": null}]`. Use `exclude_null_empty_values: true` to filter them out. + +___ + ## Developer Guide This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development: - [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md) diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessor.java new file mode 100644 index 0000000000..1976a42dc2 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessor.java @@ -0,0 +1,152 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@DataPrepperPlugin(name = "wrap_entries", pluginType = Processor.class, pluginConfigurationType = WrapEntriesProcessorConfig.class) +public class WrapEntriesProcessor extends AbstractProcessor, Record> { + + private static final Logger LOG = LoggerFactory.getLogger(WrapEntriesProcessor.class); + + private final WrapEntriesProcessorConfig config; + private final ExpressionEvaluator expressionEvaluator; + + @DataPrepperPluginConstructor + public WrapEntriesProcessor(final PluginMetrics pluginMetrics, + final WrapEntriesProcessorConfig config, + final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics); + this.config = config; + this.expressionEvaluator = expressionEvaluator; + config.validateExpressions(expressionEvaluator); + } + + @Override + public Collection> doExecute(final Collection> records) { + for (final Record record : records) { + final Event event = record.getData(); + + try { + if (config.getWrapEntriesWhen() != null + && !expressionEvaluator.evaluateConditional(config.getWrapEntriesWhen(), event)) { + continue; + } + + processEvent(event); + } catch (final Exception e) { + LOG.atError() + .addMarker(EVENT) + .addMarker(NOISY) + .setMessage("Error processing event [{}]") + .addArgument(event) + .setCause(e) + .log(); + addTagsOnFailure(event); + } + } + return records; + } + + private void processEvent(final Event event) { + final String source = config.getSource(); + + if (!event.containsKey(source)) { + LOG.debug(EVENT, "Source key [{}] does not exist in event [{}], skipping.", source, event); + return; + } + + final Object sourceValue = event.get(source, Object.class); + + if (!(sourceValue instanceof List)) { + LOG.warn(EVENT, "Source key [{}] is not a list in event [{}], skipping.", source, event); + addTagsOnFailure(event); + return; + } + + final List sourceList = (List) sourceValue; + + if (sourceList.isEmpty()) { + return; + } + + final List> result = new ArrayList<>(sourceList.size()); + final String key = config.getKey(); + + for (final Object element : sourceList) { + if (config.getExcludeNullEmptyValues()) { + if (element == null) { + continue; + } + if (element instanceof String && ((String) element).isEmpty()) { + continue; + } + } + final Map entry = new HashMap<>(); + entry.put(key, element); + result.add(entry); + } + + + final String effectiveTarget = config.getEffectiveTarget(); + + if (config.getAppendIfTargetExists() && event.containsKey(effectiveTarget)) { + final Object existingValue = event.get(effectiveTarget, Object.class); + if (!(existingValue instanceof List)) { + LOG.warn(EVENT, "Target key [{}] exists but is not a list in event [{}], skipping.", + effectiveTarget, event); + addTagsOnFailure(event); + return; + } + final List existingList = new ArrayList<>((List) existingValue); + existingList.addAll(result); + event.put(effectiveTarget, existingList); + } else { + event.put(effectiveTarget, result); + } + } + + private void addTagsOnFailure(final Event event) { + final List tags = config.getTagsOnFailure(); + if (tags != null && !tags.isEmpty()) { + event.getMetadata().addTags(tags); + } + } + + @Override + public void prepareForShutdown() { + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorConfig.java new file mode 100644 index 0000000000..b64dfc6bb2 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorConfig.java @@ -0,0 +1,125 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import com.fasterxml.jackson.annotation.JsonClassDescription; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.annotations.ExampleValues; +import org.opensearch.dataprepper.model.annotations.ExampleValues.Example; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; + +import java.util.List; + +@JsonPropertyOrder +@JsonClassDescription("The wrap_entries processor wraps each element of a primitive array " + + "into an object using a configured key name.") +public class WrapEntriesProcessorConfig { + + @NotNull + @NotEmpty + @JsonProperty("source") + @JsonPropertyDescription("The key of the primitive array to transform.") + @ExampleValues({ + @Example(value = "/names", description = "The source array field to wrap into objects.") + }) + private String source; + + @JsonProperty("target") + @JsonPropertyDescription("The key to write the resulting object array to. Defaults to source (in-place).") + @ExampleValues({ + @Example(value = "/agents", description = "Write the resulting object array to a separate field.") + }) + private String target; + + @NotNull + @NotEmpty + @JsonProperty("key") + @JsonPropertyDescription("The key name to use in each resulting object.") + @ExampleValues({ + @Example(value = "name", description = "Each primitive value is wrapped as {\"name\": value}.") + }) + private String key; + + @JsonProperty("exclude_null_empty_values") + @JsonPropertyDescription("When true, null and empty string elements are filtered out " + + "before wrapping. Default is false.") + private boolean excludeNullEmptyValues = false; + + @JsonProperty("append_if_target_exists") + @JsonPropertyDescription("When true, appends results to the existing target array " + + "instead of overwriting. Default is false.") + private boolean appendIfTargetExists = false; + + @JsonProperty("wrap_entries_when") + @JsonPropertyDescription("A conditional expression " + + "that will be evaluated to determine whether the processor will be run on the event.") + @ExampleValues({ + @Example(value = "/type == \"tagged\"", description = "Only process events where type is 'tagged'.") + }) + private String wrapEntriesWhen; + + @JsonProperty("tags_on_failure") + @JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.") + private List tagsOnFailure; + + public String getSource() { + return source; + } + + public String getTarget() { + return target; + } + + public String getKey() { + return key; + } + + public boolean getExcludeNullEmptyValues() { + return excludeNullEmptyValues; + } + + public boolean getAppendIfTargetExists() { + return appendIfTargetExists; + } + + public String getWrapEntriesWhen() { + return wrapEntriesWhen; + } + + public List getTagsOnFailure() { + return tagsOnFailure; + } + + void validateExpressions(final ExpressionEvaluator expressionEvaluator) { + if (target != null && target.isEmpty()) { + throw new InvalidPluginConfigurationException("target must not be empty when specified."); + } + + if (wrapEntriesWhen != null && !expressionEvaluator.isValidExpressionStatement(wrapEntriesWhen)) { + throw new InvalidPluginConfigurationException( + String.format("wrap_entries_when \"%s\" is not a valid expression statement. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", + wrapEntriesWhen)); + } + } + + /** + * Returns the effective target key. If target is not set, defaults to source. + */ + public String getEffectiveTarget() { + return target != null ? target : source; + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorConfigTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorConfigTest.java new file mode 100644 index 0000000000..f733496394 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorConfigTest.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; + +import java.lang.reflect.Field; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class WrapEntriesProcessorConfigTest { + + @Test + void defaults_are_correct() { + final WrapEntriesProcessorConfig config = new WrapEntriesProcessorConfig(); + assertThat(config.getTarget(), nullValue()); + assertThat(config.getExcludeNullEmptyValues(), equalTo(false)); + assertThat(config.getAppendIfTargetExists(), equalTo(false)); + assertThat(config.getWrapEntriesWhen(), nullValue()); + assertThat(config.getTagsOnFailure(), nullValue()); + } + + @Test + void getEffectiveTarget_returns_target_when_set() throws Exception { + final WrapEntriesProcessorConfig config = new WrapEntriesProcessorConfig(); + setField(config, "source", "/names"); + setField(config, "target", "/agents"); + assertThat(config.getEffectiveTarget(), equalTo("/agents")); + } + + @Test + void getEffectiveTarget_returns_source_when_target_is_null() throws Exception { + final WrapEntriesProcessorConfig config = new WrapEntriesProcessorConfig(); + setField(config, "source", "/names"); + assertThat(config.getEffectiveTarget(), equalTo("/names")); + } + + @Test + void validateExpressions_with_invalid_wrap_entries_when_throws_InvalidPluginConfigurationException() throws Exception { + final WrapEntriesProcessorConfig config = new WrapEntriesProcessorConfig(); + final String condition = UUID.randomUUID().toString(); + final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); + setField(config, "wrapEntriesWhen", condition); + when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, () -> config.validateExpressions(expressionEvaluator)); + } + + @Test + void validateExpressions_with_empty_target_throws_InvalidPluginConfigurationException() throws Exception { + final WrapEntriesProcessorConfig config = new WrapEntriesProcessorConfig(); + final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); + setField(config, "target", ""); + + assertThrows(InvalidPluginConfigurationException.class, () -> config.validateExpressions(expressionEvaluator)); + } + + private void setField(final Object obj, final String fieldName, final Object value) throws Exception { + final Field field = obj.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(obj, value); + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorIT.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorIT.java new file mode 100644 index 0000000000..0a64507ea4 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorIT.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.test.plugins.DataPrepperPluginTest; +import org.opensearch.dataprepper.test.plugins.PluginConfigurationFile; +import org.opensearch.dataprepper.test.plugins.junit.BaseDataPrepperPluginStandardTestSuite; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +@DataPrepperPluginTest(pluginName = "wrap_entries", pluginType = Processor.class) +class WrapEntriesProcessorIT extends BaseDataPrepperPluginStandardTestSuite { + + @Test + void doExecute_with_wrap_entries_when_expression_filters_records( + @PluginConfigurationFile("wrap_entries_when_filters_records.yaml") + final Processor, Record> objectUnderTest) { + final Record matchingRecord = new Record<>(JacksonEvent.builder() + .withEventType("event") + .withData(Map.of("names", Arrays.asList("alpha", "beta"), "type", "users")) + .build()); + + final Record nonMatchingRecord = new Record<>(JacksonEvent.builder() + .withEventType("event") + .withData(Map.of("names", Arrays.asList("gamma"), "type", "other")) + .build()); + + final Collection> result = objectUnderTest.execute(Arrays.asList(matchingRecord, nonMatchingRecord)); + final List> outputRecords = (List>) result; + + final List> matchedOutput = outputRecords.get(0).getData().get("/names", List.class); + assertThat(matchedOutput, equalTo(Arrays.asList(Map.of("name", "alpha"), Map.of("name", "beta")))); + + final List unmatchedOutput = outputRecords.get(1).getData().get("/names", List.class); + assertThat(unmatchedOutput, equalTo(Arrays.asList("gamma"))); + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorTest.java new file mode 100644 index 0000000000..7a67bd51da --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/WrapEntriesProcessorTest.java @@ -0,0 +1,327 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class WrapEntriesProcessorTest { + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private WrapEntriesProcessorConfig config; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + @BeforeEach + void setUp() { + lenient().when(config.getSource()).thenReturn("/names"); + lenient().when(config.getKey()).thenReturn("name"); + lenient().when(config.getEffectiveTarget()).thenReturn("/names"); + lenient().when(config.getExcludeNullEmptyValues()).thenReturn(false); + lenient().when(config.getAppendIfTargetExists()).thenReturn(false); + lenient().when(config.getWrapEntriesWhen()).thenReturn(null); + lenient().when(config.getTagsOnFailure()).thenReturn(null); + } + + private WrapEntriesProcessor createObjectUnderTest() { + return new WrapEntriesProcessor(pluginMetrics, config, expressionEvaluator); + } + + private Record createEvent(final Map data) { + return new Record<>(JacksonEvent.builder().withEventType("event").withData(data).build()); + } + + @Test + void constructor_delegates_validation_to_config() { + createObjectUnderTest(); + + verify(config).validateExpressions(expressionEvaluator); + } + + + @Test + void doExecute_with_string_array_wraps_each_element_into_object_in_place() { + final Record record = createEvent(Map.of("names", Arrays.asList("alpha", "beta"))); + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final List> output = result.get(0).getData().get("/names", List.class); + assertThat(output.size(), is(2)); + assertThat(output.get(0), equalTo(Map.of("name", "alpha"))); + assertThat(output.get(1), equalTo(Map.of("name", "beta"))); + } + + @Test + void doExecute_with_separate_target_writes_wrapped_objects_and_preserves_source() { + when(config.getEffectiveTarget()).thenReturn("/agents"); + + final Record record = createEvent(Map.of("names", Arrays.asList("alpha", "beta"))); + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final Event event = result.get(0).getData(); + final List source = event.get("/names", List.class); + assertThat(source, equalTo(Arrays.asList("alpha", "beta"))); + final List> target = event.get("/agents", List.class); + assertThat(target.size(), is(2)); + assertThat(target.get(0), equalTo(Map.of("name", "alpha"))); + } + + @Test + void doExecute_with_mixed_types_wraps_each_element_preserving_original_type() { + final Record record = createEvent(Map.of("values", Arrays.asList("alpha", 42, true))); + when(config.getSource()).thenReturn("/values"); + when(config.getEffectiveTarget()).thenReturn("/values"); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final List> output = result.get(0).getData().get("/values", List.class); + assertThat(output.size(), is(3)); + assertThat(output.get(0), equalTo(Map.of("name", "alpha"))); + assertThat(output.get(1), equalTo(Map.of("name", 42))); + assertThat(output.get(2), equalTo(Map.of("name", true))); + } + + @Test + void doExecute_with_null_elements_wraps_nulls_into_objects_by_default() { + final List input = new ArrayList<>(); + input.add("alpha"); + input.add(null); + input.add("beta"); + final Record record = createEvent(Map.of("names", input)); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final List> output = result.get(0).getData().get("/names", List.class); + assertThat(output.size(), is(3)); + assertThat(output.get(1).get("name"), equalTo(null)); + } + + @Test + void doExecute_with_exclude_enabled_filters_out_null_and_empty_elements() { + when(config.getExcludeNullEmptyValues()).thenReturn(true); + + final List input = new ArrayList<>(); + input.add("alpha"); + input.add(null); + input.add(""); + input.add("beta"); + final Record record = createEvent(Map.of("names", input)); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final List> output = result.get(0).getData().get("/names", List.class); + assertThat(output.size(), is(2)); + assertThat(output.get(0), equalTo(Map.of("name", "alpha"))); + assertThat(output.get(1), equalTo(Map.of("name", "beta"))); + } + + @Test + void doExecute_with_all_null_empty_and_exclude_enabled_writes_empty_list() { + when(config.getExcludeNullEmptyValues()).thenReturn(true); + + final List input = new ArrayList<>(); + input.add(null); + input.add(""); + final Record record = createEvent(Map.of("names", input)); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final Object output = result.get(0).getData().get("/names", Object.class); + assertThat(output instanceof List, is(true)); + assertThat(((List) output).size(), is(0)); + } + + @Test + void doExecute_with_missing_source_key_skips_event_and_leaves_it_unchanged() { + final Record record = createEvent(Map.of("other", "value")); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + assertThat(result.get(0).getData().containsKey("names"), is(false)); + } + + @Test + void doExecute_with_non_list_source_skips_event_and_preserves_original_value() { + final Record record = createEvent(Map.of("names", "not-a-list")); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + assertThat(result.get(0).getData().get("/names", String.class), equalTo("not-a-list")); + } + + @Test + void doExecute_with_empty_source_list_leaves_empty_list_unchanged() { + final Record record = createEvent(Map.of("names", Collections.emptyList())); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final List output = result.get(0).getData().get("/names", List.class); + assertThat(output.isEmpty(), is(true)); + } + + @Test + void doExecute_with_append_enabled_merges_new_entries_into_existing_target_list() { + when(config.getEffectiveTarget()).thenReturn("/result"); + when(config.getAppendIfTargetExists()).thenReturn(true); + + final Map data = new java.util.HashMap<>(); + data.put("names", Arrays.asList("alpha")); + data.put("result", new ArrayList<>(Arrays.asList(Map.of("name", "existing")))); + final Record record = createEvent(data); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final List> output = result.get(0).getData().get("/result", List.class); + assertThat(output.size(), is(2)); + assertThat(output.get(0), equalTo(Map.of("name", "existing"))); + assertThat(output.get(1), equalTo(Map.of("name", "alpha"))); + } + + @Test + void doExecute_with_append_enabled_and_missing_target_creates_new_target_list() { + when(config.getEffectiveTarget()).thenReturn("/result"); + when(config.getAppendIfTargetExists()).thenReturn(true); + + final Record record = createEvent(Map.of("names", Arrays.asList("alpha"))); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final List> output = result.get(0).getData().get("/result", List.class); + assertThat(output.size(), is(1)); + assertThat(output.get(0), equalTo(Map.of("name", "alpha"))); + } + + @Test + void doExecute_with_append_enabled_and_non_list_target_skips_and_preserves_target() { + when(config.getEffectiveTarget()).thenReturn("/result"); + when(config.getAppendIfTargetExists()).thenReturn(true); + + final Map data = new java.util.HashMap<>(); + data.put("names", Arrays.asList("alpha")); + data.put("result", "not-a-list"); + final Record record = createEvent(data); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + assertThat(result.get(0).getData().get("/result", String.class), equalTo("not-a-list")); + } + + @Test + void doExecute_with_existing_target_overwrites_with_wrapped_objects_by_default() { + when(config.getEffectiveTarget()).thenReturn("/result"); + + final Map data = new java.util.HashMap<>(); + data.put("names", Arrays.asList("alpha")); + data.put("result", Arrays.asList("old")); + final Record record = createEvent(data); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final List> output = result.get(0).getData().get("/result", List.class); + assertThat(output.size(), is(1)); + assertThat(output.get(0), equalTo(Map.of("name", "alpha"))); + } + + @Test + void doExecute_with_multiple_records_wraps_only_matching_and_leaves_non_matching_unchanged() { + final String condition = "/type == \"users\""; + when(config.getWrapEntriesWhen()).thenReturn(condition); + + final Record matchingRecord = createEvent(new java.util.HashMap<>(Map.of( + "names", Arrays.asList("alpha", "beta"), "type", "users"))); + final Record nonMatchingRecord = createEvent(new java.util.HashMap<>(Map.of( + "names", Arrays.asList("gamma"), "type", "other"))); + + when(expressionEvaluator.evaluateConditional(condition, matchingRecord.getData())).thenReturn(true); + when(expressionEvaluator.evaluateConditional(condition, nonMatchingRecord.getData())).thenReturn(false); + + final List> result = (List>) createObjectUnderTest() + .doExecute(Arrays.asList(matchingRecord, nonMatchingRecord)); + + final List> matchedOutput = result.get(0).getData().get("/names", List.class); + assertThat(matchedOutput.size(), is(2)); + assertThat(matchedOutput.get(0), equalTo(Map.of("name", "alpha"))); + + final List unmatchedOutput = result.get(1).getData().get("/names", List.class); + assertThat(unmatchedOutput, equalTo(Arrays.asList("gamma"))); + } + + @Test + void doExecute_with_map_elements_wraps_each_map_into_object() { + final Record record = createEvent(Map.of("names", + Arrays.asList(Map.of("name", "alpha"), Map.of("name", "beta")))); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final List> output = result.get(0).getData().get("/names", List.class); + assertThat(output.size(), is(2)); + assertThat(output.get(0), equalTo(Map.of("name", Map.of("name", "alpha")))); + assertThat(output.get(1), equalTo(Map.of("name", Map.of("name", "beta")))); + } + + @Test + void doExecute_with_list_elements_wraps_each_list_into_object() { + final Record record = createEvent(Map.of("names", + Arrays.asList(Arrays.asList("alpha1", "beta1"), Arrays.asList("alpha2", "beta2")))); + + final List> result = (List>) createObjectUnderTest().doExecute(Collections.singletonList(record)); + + final List> output = result.get(0).getData().get("/names", List.class); + assertThat(output.size(), is(2)); + assertThat(output.get(0), equalTo(Map.of("name", Arrays.asList("alpha1", "beta1")))); + assertThat(output.get(1), equalTo(Map.of("name", Arrays.asList("alpha2", "beta2")))); + } + + @Test + void doExecute_with_multiple_processors_wraps_outer_and_inner_lists() { + when(config.getSource()).thenReturn("/names", "/names/0/name", "/names/1/name"); + when(config.getEffectiveTarget()).thenReturn("/names", "/names/0/name", "/names/1/name"); + + final Record record = createEvent(Map.of("names", + Arrays.asList(Arrays.asList("alpha1", "beta1"), Arrays.asList("alpha2", "beta2")))); + + final WrapEntriesProcessor objectUnderTest = createObjectUnderTest(); + + final List> firstPassResult = (List>) objectUnderTest.doExecute(Collections.singletonList(record)); + final List> secondPassResult = (List>) objectUnderTest.doExecute(firstPassResult); + final List> thirdPassResult = (List>) objectUnderTest.doExecute(secondPassResult); + + final List> output = thirdPassResult.get(0).getData().get("/names", List.class); + assertThat(output.size(), is(2)); + assertThat(output.get(0), equalTo(Map.of("name", Arrays.asList(Map.of("name", "alpha1"), Map.of("name", "beta1"))))); + assertThat(output.get(1), equalTo(Map.of("name", Arrays.asList(Map.of("name", "alpha2"), Map.of("name", "beta2"))))); + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/wrap_entries_when_filters_records.yaml b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/wrap_entries_when_filters_records.yaml new file mode 100644 index 0000000000..ebf7d8b6be --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/resources/org/opensearch/dataprepper/plugins/processor/mutateevent/wrap_entries_when_filters_records.yaml @@ -0,0 +1,18 @@ + +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + + +test-pipeline: + source: + unused: + processor: + - wrap_entries: + source: /names + key: name + wrap_entries_when: '/type == "users"' + sink: + - unused: