diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java index 50d020ee58..efeeb8017c 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; @@ -47,6 +48,12 @@ public AddEntryProcessor(final PluginMetrics pluginMetrics, final AddEntryProces throw new InvalidPluginConfigurationException( String.format("add_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getAddWhen())); } + + if (entry.getAddToElementWhen() != null + && !expressionEvaluator.isValidExpressionStatement(entry.getAddToElementWhen())) { + throw new InvalidPluginConfigurationException( + String.format("add_to_element_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getAddWhen())); + } }); } @@ -65,38 +72,20 @@ public Collection> doExecute(final Collection> recor try { final String key = (entry.getKey() == null) ? null : recordEvent.formatString(entry.getKey(), expressionEvaluator); final String metadataKey = entry.getMetadataKey(); - Object value; - if (!Objects.isNull(entry.getValueExpression())) { - value = expressionEvaluator.evaluate(entry.getValueExpression(), recordEvent); - } else if (!Objects.isNull(entry.getFormat())) { - try { - value = recordEvent.formatString(entry.getFormat()); - } catch (final EventKeyNotFoundException e) { - value = null; - } - } else { - value = entry.getValue(); - } - if (!Objects.isNull(key)) { - if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) { - recordEvent.put(key, value); - } else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) { - mergeValueToEvent(recordEvent, key, value); - } - } else { - Map attributes = recordEvent.getMetadata().getAttributes(); - if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) { - recordEvent.getMetadata().setAttribute(metadataKey, value); - } else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) { - mergeValueToEventMetadata(recordEvent, metadataKey, value); - } + final String iterateOn = entry.getIterateOn(); + if (Objects.isNull(iterateOn)) { + handleWithoutIterateOn(entry, recordEvent, key, metadataKey); + } else if (!Objects.isNull(key)) { + handleWithIterateOn(entry, recordEvent, iterateOn, key); } } catch (Exception e) { LOG.atError() .addMarker(EVENT) .addMarker(NOISY) - .setMessage("Error adding entry to record [{}] with key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]") + .setMessage("Error adding entry to record [{}] with iterate_on [{}], add_to_element_when [{}], key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]") .addArgument(recordEvent) + .addArgument(entry.getIterateOn()) + .addArgument(entry.getAddToElementWhen()) .addArgument(entry.getKey()) .addArgument(entry.getMetadataKey()) .addArgument(entry.getValueExpression()) @@ -132,6 +121,71 @@ public boolean isReadyForShutdown() { public void shutdown() { } + private void handleWithoutIterateOn(final AddEntryProcessorConfig.Entry entry, + final Event recordEvent, + final String key, + final String metadataKey) { + final Object value = retrieveValue(entry, recordEvent); + if (!Objects.isNull(key)) { + if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) { + recordEvent.put(key, value); + } else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) { + mergeValueToEvent(recordEvent, key, value); + } + } else { + Map attributes = recordEvent.getMetadata().getAttributes(); + if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) { + recordEvent.getMetadata().setAttribute(metadataKey, value); + } else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) { + mergeValueToEventMetadata(recordEvent, metadataKey, value); + } + } + } + + private void handleWithIterateOn(final AddEntryProcessorConfig.Entry entry, + final Event recordEvent, + final String iterateOn, + final String key) { + final List> iterateOnList = recordEvent.get(iterateOn, List.class); + if (iterateOnList != null) { + for (final Map item : iterateOnList) { + final Object value; + final Event context = JacksonEvent.builder() + .withEventMetadata(recordEvent.getMetadata()) + .withData(item) + .build(); + if (entry.getAddToElementWhen() != null && !expressionEvaluator.evaluateConditional(entry.getAddToElementWhen(), recordEvent)) { + continue; + } + + value = retrieveValue(entry, context); + if (!item.containsKey(key) || entry.getOverwriteIfKeyExists()) { + item.put(key, value); + } else if (item.containsKey(key) && entry.getAppendIfKeyExists()) { + mergeValueToMap(item, key, value); + } + } + recordEvent.put(iterateOn, iterateOnList); + } + } + + private Object retrieveValue(final AddEntryProcessorConfig.Entry entry, + final Event context) { + Object value; + if (!Objects.isNull(entry.getValueExpression())) { + value = expressionEvaluator.evaluate(entry.getValueExpression(), context); + } else if (!Objects.isNull(entry.getFormat())) { + try { + value = context.formatString(entry.getFormat()); + } catch (final EventKeyNotFoundException e) { + value = null; + } + } else { + value = entry.getValue(); + } + return value; + } + private void mergeValueToEvent(final Event recordEvent, final String key, final Object value) { mergeValue(value, () -> recordEvent.get(key, Object.class), newValue -> recordEvent.put(key, newValue)); } @@ -140,6 +194,10 @@ private void mergeValueToEventMetadata(final Event recordEvent, final String key mergeValue(value, () -> recordEvent.getMetadata().getAttribute(key), newValue -> recordEvent.getMetadata().setAttribute(key, newValue)); } + private void mergeValueToMap(final Map item, final String key, final Object value) { + mergeValue(value, () -> item.get(key), newValue -> item.put(key, newValue)); + } + private void mergeValue(final Object value, Supplier getter, Consumer setter) { final Object currentValue = getter.get(); final List mergedValue = new ArrayList<>(); diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java index cf46fc13ae..966b0706ac 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java @@ -19,6 +19,7 @@ import org.opensearch.dataprepper.model.annotations.ConditionalRequired.SchemaProperty; import org.opensearch.dataprepper.model.annotations.ExampleValues; import org.opensearch.dataprepper.model.annotations.ExampleValues.Example; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import java.util.List; import java.util.stream.Stream; @@ -64,6 +65,15 @@ public class AddEntryProcessorConfig { @JsonPropertyOrder public static class Entry { + @JsonProperty("iterate_on") + @JsonPropertyDescription( + "Specifies the key of the list of object to iterate over and add entry into.") + private String iterateOn; + + @JsonPropertyDescription("Specifies the condition for when to add the key to each element of a list when using iterate_on.") + @JsonProperty("add_to_element_when") + private String addToElementWhen; + @JsonPropertyDescription("The key of the new entry to be added. Some examples of keys include my_key, " + "myKey, and object/sub_Key. The key can also be a format expression, for example, ${/key1} to " + "use the value of field key1 as the key. Exactly one of key or metadata_key is required.") @@ -155,6 +165,12 @@ public static class Entry { }) private String addWhen; + public String getIterateOn() { + return iterateOn; + } + + public String getAddToElementWhen() { return addToElementWhen; } + public String getKey() { return key; } @@ -202,7 +218,9 @@ public Entry(final String key, final String valueExpression, final boolean overwriteIfKeyExists, final boolean appendIfKeyExists, - final String addWhen) + final String addWhen, + final String iterateOn, + final String addToElementWhen) { if (key != null && metadataKey != null) { throw new IllegalArgumentException("Only one of the two - key and metadatakey - should be specified"); @@ -210,6 +228,14 @@ public Entry(final String key, if (key == null && metadataKey == null) { throw new IllegalArgumentException("At least one of the two - key and metadatakey - must be specified"); } + if (metadataKey != null && iterateOn != null) { + throw new IllegalArgumentException("iterate_on cannot be applied to metadata"); + } + + if (iterateOn == null && addToElementWhen != null) { + throw new InvalidPluginConfigurationException("add_to_element_when only applies when iterate_on is configured."); + } + this.key = key; this.metadataKey = metadataKey; this.value = value; @@ -218,6 +244,8 @@ public Entry(final String key, this.overwriteIfKeyExists = overwriteIfKeyExists; this.appendIfKeyExists = appendIfKeyExists; this.addWhen = addWhen; + this.iterateOn = iterateOn; + this.addToElementWhen = addToElementWhen; } public Entry() { diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java index bc9c7ebb97..c068aa1962 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKey; +import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; @@ -22,6 +23,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Objects; @DataPrepperPlugin(name = "delete_entries", pluginType = Processor.class, pluginConfigurationType = DeleteEntryProcessorConfig.class) @@ -33,11 +35,13 @@ public class DeleteEntryProcessor extends AbstractProcessor, Recor private final List entries; private final ExpressionEvaluator expressionEvaluator; + private final DeleteEntryProcessorConfig deleteEntryProcessorConfig; @DataPrepperPluginConstructor public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntryProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); this.withKeys = config.getWithKeys(); + this.deleteEntryProcessorConfig = config; this.deleteWhen = config.getDeleteWhen(); this.expressionEvaluator = expressionEvaluator; @@ -49,7 +53,7 @@ public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntry } if (this.withKeys != null && !this.withKeys.isEmpty()) { - DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(this.withKeys, this.deleteWhen); + DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(this.withKeys, this.deleteWhen, config.getIterateOn(), config.getDeleteFromElementWhen()); this.entries = List.of(entry); } else { this.entries = config.getEntries(); @@ -63,23 +67,39 @@ public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntry ".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getDeleteWhen())); } + + if (entry.getIterateOn() == null && entry.getDeleteFromElementWhen() != null) { + throw new InvalidPluginConfigurationException("delete_from_element_when only applies when iterate_on is configured."); + } + + if (entry.getDeleteFromElementWhen() != null + && !expressionEvaluator.isValidExpressionStatement(entry.getDeleteFromElementWhen())) { + throw new InvalidPluginConfigurationException( + String.format("delete_from_element_when %s is not a valid expression statement. See https://opensearch" + + ".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", + entry.getDeleteFromElementWhen())); + } }); } @Override public Collection> doExecute(final Collection> records) { - for(final Record record : records) { + for (final Record record : records) { final Event recordEvent = record.getData(); - try { for (final DeleteEntryProcessorConfig.Entry entry : entries) { - if (Objects.nonNull(entry.getDeleteWhen()) && !expressionEvaluator.evaluateConditional(entry.getDeleteWhen(), - recordEvent)) { + if (Objects.nonNull(entry.getDeleteWhen()) && !expressionEvaluator.evaluateConditional(entry.getDeleteWhen(), recordEvent)) { continue; } - for (final EventKey key : entry.getWithKeys()) { - recordEvent.delete(key); + final String iterateOn = deleteEntryProcessorConfig.getIterateOn(); + if (Objects.isNull(iterateOn)) { + + for (final EventKey entryKey : entry.getWithKeys()) { + recordEvent.delete(entryKey); + } + } else { + handleForIterateOn(recordEvent, entry, iterateOn); } } } catch (final Exception e) { @@ -92,7 +112,6 @@ public Collection> doExecute(final Collection> recor .log(); } } - return records; } @@ -108,4 +127,29 @@ public boolean isReadyForShutdown() { @Override public void shutdown() { } + + private void handleForIterateOn(final Event recordEvent, + final DeleteEntryProcessorConfig.Entry entry, + final String iterateOn) { + final List> iterateOnList = recordEvent.get(iterateOn, List.class); + if (iterateOnList != null) { + for (int i = 0; i < iterateOnList.size(); i++) { + final Map item = iterateOnList.get(i); + final Event context = JacksonEvent.builder() + .withEventMetadata(recordEvent.getMetadata()) + .withData(item) + .build(); + if (entry.getDeleteFromElementWhen() != null && + !expressionEvaluator.evaluateConditional(entry.getDeleteFromElementWhen(), context)) { + continue; + } + + for (final EventKey entryKey : entry.getWithKeys()) { + context.delete(entryKey); + } + iterateOnList.set(i, context.toMap()); + } + recordEvent.put(iterateOn, iterateOnList); + } + } } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorConfig.java index 540c1f61ee..a44b21d057 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorConfig.java @@ -58,6 +58,15 @@ public static class Entry { }) private String deleteWhen; + @JsonPropertyDescription("Specifies the condition for when to delete the key from each element of a list when using iterate_on.") + @JsonProperty("delete_from_element_when") + private String deleteFromElementWhen; + + @JsonPropertyDescription( + "Specifies the key of the list of object to iterate over and delete the keys specified in with_keys.") + @JsonProperty("iterate_on") + private String iterateOn; + public List getWithKeys() { return withKeys; } @@ -66,9 +75,20 @@ public String getDeleteWhen() { return deleteWhen; } - public Entry(final List withKeys, final String deleteWhen) { + public String getIterateOn() { return iterateOn; } + + public String getDeleteFromElementWhen() { + return deleteFromElementWhen; + } + + public Entry(final List withKeys, + final String deleteWhen, + final String iterateOn, + final String deleteFromElementWhen) { this.withKeys = withKeys; this.deleteWhen = deleteWhen; + this.deleteFromElementWhen = deleteFromElementWhen; + this.iterateOn = iterateOn; } public Entry() { @@ -99,6 +119,14 @@ boolean isConfigurationPresent() { boolean hasBothConfigurations() { return entries != null && withKeys != null; } + @JsonPropertyDescription( + "Specifies the key of the list of object to iterate over and delete the keys specified in with_keys.") + @JsonProperty("iterate_on") + private String iterateOn; + + @JsonPropertyDescription("Specifies the condition for when to delete the key from each element of a list when using iterate_on.") + @JsonProperty("delete_from_element_when") + private String deleteFromElementWhen; public List getWithKeys() { return withKeys; @@ -111,4 +139,12 @@ public String getDeleteWhen() { public List getEntries() { return entries; } + + public String getIterateOn() { + return iterateOn; + } + + public String getDeleteFromElementWhen() { + return deleteFromElementWhen; + } } diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java index 2a61d05241..88ada5e929 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java @@ -30,6 +30,8 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -51,7 +53,7 @@ public class AddEntryProcessorTests { void invalid_add_when_throws_InvalidPluginConfigurationException() { final String addWhen = UUID.randomUUID().toString(); - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,addWhen))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false, addWhen, null, null))); when(expressionEvaluator.isValidExpressionStatement(addWhen)).thenReturn(false); @@ -60,7 +62,7 @@ void invalid_add_when_throws_InvalidPluginConfigurationException() { @Test public void testSingleAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -72,10 +74,166 @@ public void testSingleAddProcessorTests() { assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo(3)); } + @Test + public void testSingleEntryIterativeAddValue() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null, "message", null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Collections.emptyMap()); + final Record record = getEvent(mapList); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), + equalTo(List.of(Map.of("newMessage", 3)))); + } + + @Test + void invalid_add_to_element_when_throws_InvalidPluginConfigurationException() { + final String addToElementWhen = UUID.randomUUID().toString(); + + when(expressionEvaluator.isValidExpressionStatement(addToElementWhen)).thenReturn(false); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null, "message", addToElementWhen))); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + + @Test + void using_add_to_element_when_without_iterate_on_throws_InvalidPluginConfigurationException() { + final String addToElementWhen = UUID.randomUUID().toString(); + + assertThrows(InvalidPluginConfigurationException.class, () -> createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null, null, addToElementWhen))); + } + + @Test + void add_to_element_when_as_false_does_not_add() { + final String addToElementWhen = UUID.randomUUID().toString(); + + when(expressionEvaluator.isValidExpressionStatement(addToElementWhen)).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(addToElementWhen), any(Event.class))).thenReturn(false); + + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null, "message", addToElementWhen))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Collections.emptyMap()); + final Record record = getEvent(mapList); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + + assertThat(editedRecords.get(0).getData().get("message", List.class), + equalTo(List.of(Collections.emptyMap()))); + } + + @Test + void add_to_element_when_as_true_does_add() { + final String addToElementWhen = UUID.randomUUID().toString(); + + when(expressionEvaluator.isValidExpressionStatement(addToElementWhen)).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(addToElementWhen), any(Event.class))).thenReturn(true); + + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null, "message", addToElementWhen))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Collections.emptyMap()); + final Record record = getEvent(mapList); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), + equalTo(List.of(Map.of("newMessage", 3)))); + } + + @Test + public void testSingleEntryIterativeAddValue_key_exists() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null, "message", null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Map.of("newMessage", 5)); + final Record record = getEvent(mapList); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), + equalTo(List.of(Map.of("newMessage", 5)))); + } + + @Test + public void testSingleEntryIterativeAddValue_overwriteIfKeyExists() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, true, false,null, "message", null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Map.of("newMessage", 5)); + final Record record = getEvent(mapList); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), + equalTo(List.of(Map.of("newMessage", 3)))); + } + + @Test + public void testSingleEntryIterativeAddValue_appendIfKeyExists() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, true,null, "message", null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Map.of("newMessage", 5)); + final Record record = getEvent(mapList); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), + equalTo(List.of(Map.of("newMessage", List.of(5, 3))))); + } + + @Test + public void test_iterate_on_add_value_expression() { + String valueExpression = "/testKey"; + when(expressionEvaluator.evaluate(eq(valueExpression), any(Event.class))).thenAnswer(invocation -> { + Event eventArg = invocation.getArgument(1); + return eventArg.get("testKey", String.class); + }); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, valueExpression, false, false,null, "message", null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Map.of("testKey", "testValue")); + final Map data = Map.of("message", mapList); + final Record record = buildRecordWithEvent(data); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), + equalTo(List.of(Map.of( + "newMessage", "testValue", + "testKey", "testValue" + )))); + } + + @Test + public void test_iterate_on_add_format() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, TEST_FORMAT, null, false, false,null, "message", null))); + + final AddEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Map.of( + "date", "date-value", + "time", "time-value" + )); + final Map data = Map.of("message", mapList); + final Record record = buildRecordWithEvent(data); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), + equalTo(List.of(Map.of( + "date", "date-value", + "time", "time-value", + "newMessage", "date-value time-value")))); + } + @Test public void testMultiAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null), - createEntry("message2", null, 4, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null, null, null), + createEntry("message2", null, 4, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -91,7 +249,7 @@ public void testMultiAddProcessorTests() { @Test public void testSingleNoOverwriteAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -106,7 +264,7 @@ public void testSingleNoOverwriteAddProcessorTests() { @Test public void testSingleOverwriteAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, true, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, true, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -121,8 +279,8 @@ public void testSingleOverwriteAddProcessorTests() { @Test public void testMultiOverwriteMixedAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, true, false,null), - createEntry("message", null, 4, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, true, false,null, null, null), + createEntry("message", null, 4, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -137,7 +295,7 @@ public void testMultiOverwriteMixedAddProcessorTests() { @Test public void testAppendValueToExistingSimpleField() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null, 3, null, null, false, true,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null, 3, null, null, false, true,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final String currentValue = "old_message"; @@ -150,7 +308,7 @@ public void testAppendValueToExistingSimpleField() { @Test public void testAppendValueToExistingListField() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null, 3, null, null, false, true,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null, 3, null, null, false, true,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final List listValue = new ArrayList<>(); @@ -165,7 +323,7 @@ public void testAppendValueToExistingListField() { @Test public void testIntAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -179,7 +337,7 @@ public void testIntAddProcessorTests() { @Test public void testBoolAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, true, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, true, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -193,7 +351,7 @@ public void testBoolAddProcessorTests() { @Test public void testStringAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, "string", null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, "string", null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -207,7 +365,7 @@ public void testStringAddProcessorTests() { @Test public void testNullAddProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, null, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, null, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -233,7 +391,7 @@ public boolean equals(Object o) { public void testNestedAddProcessorTests() { TestObject obj = new TestObject(); obj.a = "test"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, obj, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, obj, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -248,7 +406,7 @@ public void testNestedAddProcessorTests() { @Test public void testArrayAddProcessorTests() { Object[] array = new Object[] { 1, 1.2, "string", true, null }; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, array, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, array, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -263,7 +421,7 @@ public void testArrayAddProcessorTests() { @Test public void testFloatAddProcessorTests() { when(mockConfig.getEntries()) - .thenReturn(createListOfEntries(createEntry("newMessage", null, 1.2, null, null, false, false,null))); + .thenReturn(createListOfEntries(createEntry("newMessage", null, 1.2, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -278,7 +436,7 @@ public void testFloatAddProcessorTests() { @Test public void testAddSingleFormatEntry() { when(mockConfig.getEntries()) - .thenReturn(createListOfEntries(createEntry("date-time", null, null, TEST_FORMAT, null, false, false,null))); + .thenReturn(createListOfEntries(createEntry("date-time", null, null, TEST_FORMAT, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -293,8 +451,8 @@ public void testAddSingleFormatEntry() { @Test public void testAddMultipleFormatEntries() { when(mockConfig.getEntries()) - .thenReturn(createListOfEntries(createEntry("date-time", null, null, TEST_FORMAT, null, false, false,null), - createEntry("date-time2", null, null, ANOTHER_TEST_FORMAT, null, false, false,null))); + .thenReturn(createListOfEntries(createEntry("date-time", null, null, TEST_FORMAT, null, false, false,null, null, null), + createEntry("date-time2", null, null, ANOTHER_TEST_FORMAT, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -311,7 +469,7 @@ public void testAddMultipleFormatEntries() { public void testFormatOverwritesExistingEntry() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry("time", null, null, TEST_FORMAT, null, true, false,null))); + createListOfEntries(createEntry("time", null, null, TEST_FORMAT, null, true, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -326,7 +484,7 @@ public void testFormatOverwritesExistingEntry() { public void testFormatNotOverwriteExistingEntry() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry("time", null, null, TEST_FORMAT, null, false, false,null))); + createListOfEntries(createEntry("time", null, null, TEST_FORMAT, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -340,7 +498,7 @@ public void testFormatNotOverwriteExistingEntry() { @Test public void testAppendFormatValueToExistingSimpleField() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("time", null, 3, TEST_FORMAT, null, false, true,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("time", null, 3, TEST_FORMAT, null, false, true,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -353,7 +511,7 @@ public void testAppendFormatValueToExistingSimpleField() { @Test public void testAppendFormatValueToExistingListField() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("date-time", null, 3, TEST_FORMAT, null, false, true,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("date-time", null, 3, TEST_FORMAT, null, false, true,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -371,7 +529,7 @@ public void testAppendFormatValueToExistingListField() { public void testFormatPrecedesValue() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry("date-time", null, "date-time-value", TEST_FORMAT, null, false, false,null))); + createListOfEntries(createEntry("date-time", null, "date-time-value", TEST_FORMAT, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -386,7 +544,7 @@ public void testFormatPrecedesValue() { @Test public void testFormatVariousDataTypes() { when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry( - "newField", null, null, "${number-key}-${boolean-key}-${string-key}", null, false, false, null))); + "newField", null, null, "${number-key}-${boolean-key}-${string-key}", null, false, false, null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleDataTypes(); @@ -398,7 +556,7 @@ public void testFormatVariousDataTypes() { @Test public void testBadFormatThenEntryNotAdded() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("data-time", null, null, BAD_TEST_FORMAT, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("data-time", null, null, BAD_TEST_FORMAT, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleFields(); @@ -412,7 +570,7 @@ public void testBadFormatThenEntryNotAdded() { @Test public void testMetadataKeySetWithBadFormatThenEntryNotAdded() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null,"data-time", null, BAD_TEST_FORMAT, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null,"data-time", null, BAD_TEST_FORMAT, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("message", Map.of("date", "date-value", "time", "time-value")); @@ -428,7 +586,7 @@ public void testMetadataKeySetWithBadFormatThenEntryNotAdded() { public void testKeyIsNotAdded_when_addWhen_condition_is_false() { final String addWhen = UUID.randomUUID().toString(); - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,addWhen))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false, addWhen, null, null))); when(expressionEvaluator.isValidExpressionStatement(addWhen)).thenReturn(true); final AddEntryProcessor processor = createObjectUnderTest(); @@ -446,7 +604,7 @@ public void testKeyIsNotAdded_when_addWhen_condition_is_false() { public void testMetadataKeyIsNotAdded_when_addWhen_condition_is_false() { final String addWhen = UUID.randomUUID().toString(); - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "newMessage", 3, null, null, false, false,addWhen))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "newMessage", 3, null, null, false, false, addWhen, null, null))); when(expressionEvaluator.isValidExpressionStatement(addWhen)).thenReturn(true); final AddEntryProcessor processor = createObjectUnderTest(); @@ -465,9 +623,9 @@ public void testMetadataKeyIsNotAdded_when_addWhen_condition_is_false() { @Test public void testMetadataKeySetWithDifferentDataTypes() { when(mockConfig.getEntries()).thenReturn(createListOfEntries( - createEntry(null, "newField", "newValue", null, null, false, false,null), - createEntry(null, "newIntField", 123, null, null, false, false,null), - createEntry(null, "newBooleanField", true, null, null, false, false,null) + createEntry(null, "newField", "newValue", null, null, false, false,null, null, null), + createEntry(null, "newIntField", 123, null, null, false, false,null, null, null), + createEntry(null, "newBooleanField", true, null, null, false, false,null, null, null) )); final AddEntryProcessor processor = createObjectUnderTest(); @@ -484,7 +642,7 @@ public void testMetadataKeySetWithDifferentDataTypes() { public void testMetadataKeySetWithFormatNotOverwriteExistingEntry() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry(null, "time", null, TEST_FORMAT, null, false, false,null))); + createListOfEntries(createEntry(null, "time", null, TEST_FORMAT, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("message", Map.of("date", "date-value", "time", "time-value")); @@ -500,7 +658,7 @@ public void testMetadataKeySetWithFormatNotOverwriteExistingEntry() { public void testMetadataKeySetWithFormatOverwriteExistingEntry() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry(null, "time", null, TEST_FORMAT, null, true, false,null))); + createListOfEntries(createEntry(null, "time", null, TEST_FORMAT, null, true, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("message", Map.of("date", "date-value", "time", "time-value")); @@ -516,7 +674,7 @@ public void testMetadataKeySetWithFormatOverwriteExistingEntry() { public void testMetadataKeySetAppendToExistingSimpleValue() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry(null, "time", "time-value2", null, null, false, true,null))); + createListOfEntries(createEntry(null, "time", "time-value2", null, null, false, true,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final String currentValue = "time-value1"; @@ -531,7 +689,7 @@ public void testMetadataKeySetAppendToExistingSimpleValue() { public void testMetadataKeySetAppendToExistingListValue() { when(mockConfig.getEntries()) .thenReturn( - createListOfEntries(createEntry(null, "time", "time-value2", null, null, false, true,null))); + createListOfEntries(createEntry(null, "time", "time-value2", null, null, false, true,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final List listValue = new ArrayList<>(); @@ -546,38 +704,38 @@ public void testMetadataKeySetAppendToExistingListValue() { @Test public void testMetadataKeyAndKeyBothNotSetThrows() { - assertThrows(IllegalArgumentException.class, () -> createEntry(null, null, "newValue", null, null, false, false,null)); + assertThrows(IllegalArgumentException.class, () -> createEntry(null, null, "newValue", null, null, false, false,null, null, null)); } @Test public void testMetadataKeyAndKeyBothSetThrows() { - assertThrows(IllegalArgumentException.class, () -> createEntry("newKey", "newMetadataKey", "newValue", null, null, false, false,null)); + assertThrows(IllegalArgumentException.class, () -> createEntry("newKey", "newMetadataKey", "newValue", null, null, false, false,null, null, null)); } @Test public void testOnlyOneTypeOfValueIsSupported() { - assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", "newValue", "/newFormat", null, false, false,null)); + assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", "newValue", "/newFormat", null, false, false,null, null, null)); } @Test public void testOnlyOneTypeOfValueIsSupportedWithExpressionAndFormat() { - assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", null, "/newFormat", "length(/message)", false, false,null)); + assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", null, "/newFormat", "length(/message)", false, false,null, null, null)); } @Test public void testOnlyOneTypeOfValueIsSupportedWithValueAndExpressionAndFormat() { - assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", "value", "/newFormat", "length(/message)", false, false,null)); + assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", "value", "/newFormat", "length(/message)", false, false,null, null, null)); } @Test public void testWithAllValuesNull() { - assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", null, null, null, false, false,null)); + assertThrows(RuntimeException.class, () -> createEntry("newKey", "newMetadataKey", null, null, null, false, false,null, null, null)); } @Test public void testValueExpressionWithArithmeticExpression() { String valueExpression = "/number-key"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleDataTypes(); Random random = new Random(); @@ -591,7 +749,7 @@ public void testValueExpressionWithArithmeticExpression() { @Test public void testValueExpressionWithStringExpression() { String valueExpression = "/string-key"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleDataTypes(); String randomString = UUID.randomUUID().toString(); @@ -604,7 +762,7 @@ public void testValueExpressionWithStringExpression() { @Test public void testValueExpressionWithBooleanExpression() { String valueExpression = "/number-key > 5"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("num_key", null, null, null, valueExpression, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleDataTypes(); when(expressionEvaluator.evaluate(valueExpression, record.getData())).thenReturn(false); @@ -616,7 +774,7 @@ public void testValueExpressionWithBooleanExpression() { @Test public void testValueExpressionWithIntegerFunctions() { String valueExpression = "length(/string-key)"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("length_key", null, null, null, valueExpression, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("length_key", null, null, null, valueExpression, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getTestEventWithMultipleDataTypes(); String randomString = UUID.randomUUID().toString(); @@ -629,7 +787,7 @@ public void testValueExpressionWithIntegerFunctions() { @Test public void testValueExpressionWithIntegerFunctionsAndMetadataKey() { String valueExpression = "length(/date)"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "length_key", null, null, valueExpression, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "length_key", null, null, valueExpression, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("message", Map.of("key", "value")); String randomString = UUID.randomUUID().toString(); @@ -642,7 +800,7 @@ public void testValueExpressionWithIntegerFunctionsAndMetadataKey() { @Test public void testValueExpressionWithStringExpressionWithMetadataKey() { String valueExpression = "/date"; - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "newkey", null, null, valueExpression, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "newkey", null, null, valueExpression, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("message", Map.of("key", "value")); String randomString = UUID.randomUUID().toString(); @@ -654,7 +812,7 @@ public void testValueExpressionWithStringExpressionWithMetadataKey() { @Test public void testAddSingleFieldWithDynamicKey() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("${message}", null, 3, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("${message}", null, 3, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("value_as_name"); @@ -668,7 +826,7 @@ public void testAddSingleFieldWithDynamicKey() { @Test public void testAddSingleFieldWithDynamicExpression() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("${message}_${getMetadata(\"id\")}", null, 3, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("${message}_${getMetadata(\"id\")}", null, 3, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("value_as_name", Map.of("id", 1)); @@ -684,8 +842,8 @@ public void testAddSingleFieldWithDynamicExpression() { @Test public void testAddMultipleFieldsWithDynamicKeys() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("${message}", null, 3, null, null, false, false,null), - createEntry("${message}_2", null, 4, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("${message}", null, 3, null, null, false, false,null, null, null), + createEntry("${message}_2", null, 4, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("value_as_name"); @@ -701,7 +859,7 @@ public void testAddMultipleFieldsWithDynamicKeys() { @Test public void testAddFieldWithInvalidInputKeyThenNoChangeToEvent() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("${message", null, 3, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("${message", null, 3, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("value_as_name"); @@ -715,7 +873,7 @@ public void testAddFieldWithInvalidInputKeyThenNoChangeToEvent() { @Test public void testAddFieldWithInvalidDynamicKeyThenNoChangeToEvent() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("${message}", null, 3, null, null, false, false,null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("${message}", null, 3, null, null, false, false,null, null, null))); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("name_with_invalid_chars|[$"); @@ -731,8 +889,19 @@ private AddEntryProcessor createObjectUnderTest() { } private AddEntryProcessorConfig.Entry createEntry( - final String key, final String metadataKey, final Object value, final String format, final String valueExpression, final boolean overwriteIfKeyExists, final boolean appendIfKeyExists, final String addWhen) { - return new AddEntryProcessorConfig.Entry(key, metadataKey, value, format, valueExpression, overwriteIfKeyExists, appendIfKeyExists, addWhen); + final String key, + final String metadataKey, + final Object value, + final String format, + final String valueExpression, + final boolean overwriteIfKeyExists, + final boolean appendIfKeyExists, + final String addWhen, + final String iterateOn, + final String addToElementWhen) { + return new AddEntryProcessorConfig.Entry( + key, metadataKey, value, format, valueExpression, overwriteIfKeyExists, appendIfKeyExists, addWhen, + iterateOn, addToElementWhen); } private List createListOfEntries(final AddEntryProcessorConfig.Entry... entries) { diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorTests.java index d044895c28..e3640ba3dc 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorTests.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorTests.java @@ -25,9 +25,13 @@ import java.util.Map; import java.util.UUID; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -68,6 +72,174 @@ public void testSingleDeleteProcessorTest() { assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); } + @Test + public void testSingleEntryIterativeDeleteKey() { + final String testKey = "testKey"; + when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey(testKey, EventKeyFactory.EventAction.DELETE))); + when(mockConfig.getIterateOn()).thenReturn("message"); + when(mockConfig.getDeleteWhen()).thenReturn(null); + + final DeleteEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Map.of(testKey, UUID.randomUUID().toString())); + final Map data = Map.of("message", mapList); + final Record record = buildRecordWithEvent(data); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), equalTo( + List.of(Collections.emptyMap()))); + } + + @Test + void invalid_delete_from_element_when_throws_InvalidPluginConfigurationException() { + final String testKey = UUID.randomUUID().toString(); + final String deleteWhen = UUID.randomUUID().toString(); + final String deleteFromElementWhen = UUID.randomUUID().toString(); + + when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey(testKey, EventKeyFactory.EventAction.DELETE))); + when(mockConfig.getIterateOn()).thenReturn("message"); + when(mockConfig.getDeleteWhen()).thenReturn(deleteWhen); + when(mockConfig.getDeleteFromElementWhen()).thenReturn(deleteFromElementWhen); + + when(expressionEvaluator.isValidExpressionStatement(deleteWhen)).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement(deleteFromElementWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + + @Test + void using_delete_from_element_when_without_iterate_on_throws_InvalidPluginConfigurationException() { + final String testKey = UUID.randomUUID().toString(); + final String deleteWhen = UUID.randomUUID().toString(); + final String deleteFromElementWhen = UUID.randomUUID().toString(); + + when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey(testKey, EventKeyFactory.EventAction.DELETE))); + when(mockConfig.getIterateOn()).thenReturn(null); + when(mockConfig.getDeleteWhen()).thenReturn(deleteWhen); + when(mockConfig.getDeleteFromElementWhen()).thenReturn(deleteFromElementWhen); + + when(expressionEvaluator.isValidExpressionStatement(deleteWhen)).thenReturn(true); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + + @Test + public void testSingleEntryIterativeDeleteKey_applyEventLevelDeleteWhen_when_deleteWhen_returns_true() { + final String testKey = "testKey"; + final String deleteWhen = "/condition == true"; + when(expressionEvaluator.isValidExpressionStatement(deleteWhen)).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(deleteWhen), any(Event.class))).thenAnswer(invocation -> { + Event eventArg = invocation.getArgument(1); + return eventArg.get("condition", Boolean.class); + }); + when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey(testKey, EventKeyFactory.EventAction.DELETE))); + when(mockConfig.getIterateOn()).thenReturn("message"); + when(mockConfig.getDeleteWhen()).thenReturn(deleteWhen); + + final DeleteEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Map.of(testKey, UUID.randomUUID().toString())); + final Map data = Map.of( + "condition", true, + "message", mapList + ); + final Record record = buildRecordWithEvent(data); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("condition"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), equalTo( + List.of(Collections.emptyMap()))); + } + + @Test + public void iterate_on_with_delete_from_element_when_condition_true_deletes_key() { + final String testKey = "testKey"; + final String deleteWhen = "/condition == true"; + final String deleteFromElementWhen = UUID.randomUUID().toString(); + + when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey(testKey, EventKeyFactory.EventAction.DELETE))); + when(mockConfig.getIterateOn()).thenReturn("message"); + when(mockConfig.getDeleteWhen()).thenReturn(deleteWhen); + when(mockConfig.getDeleteFromElementWhen()).thenReturn(deleteFromElementWhen); + + when(expressionEvaluator.isValidExpressionStatement(deleteWhen)).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement(deleteFromElementWhen)).thenReturn(true); + + when(expressionEvaluator.evaluateConditional(eq(deleteWhen), any(Event.class))).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(deleteFromElementWhen), any(Event.class))).thenReturn(true); + + final DeleteEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Map.of( + "condition", true, + testKey, UUID.randomUUID().toString())); + final Map data = Map.of("message", mapList); + final Record record = buildRecordWithEvent(data); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), equalTo( + List.of(Map.of("condition", true)))); + } + + @Test + public void iterate_on_with_delete_from_element_when_condition_false_does_not_delete() { + final String testKey = "testKey"; + final String deleteWhen = "/condition == true"; + final String deleteFromElementWhen = UUID.randomUUID().toString(); + + when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey(testKey, EventKeyFactory.EventAction.DELETE))); + when(mockConfig.getIterateOn()).thenReturn("message"); + when(mockConfig.getDeleteWhen()).thenReturn(deleteWhen); + when(mockConfig.getDeleteFromElementWhen()).thenReturn(deleteFromElementWhen); + + when(expressionEvaluator.isValidExpressionStatement(deleteWhen)).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement(deleteFromElementWhen)).thenReturn(true); + + when(expressionEvaluator.evaluateConditional(eq(deleteWhen), any(Event.class))).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(deleteFromElementWhen), any(Event.class))).thenReturn(false); + + final DeleteEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Map.of( + "condition", true, + testKey, UUID.randomUUID().toString())); + final Map data = Map.of("message", mapList); + final Record record = buildRecordWithEvent(data); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), equalTo(mapList)); + } + + @Test + public void iterate_on_with_delete_when_condition_false_does_not_delete() { + final String testKey = "testKey"; + final String deleteWhen = "/condition == true"; + final String deleteFromElementWhen = UUID.randomUUID().toString(); + when(expressionEvaluator.isValidExpressionStatement(deleteWhen)).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement(deleteFromElementWhen)).thenReturn(true); + + when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey(testKey, EventKeyFactory.EventAction.DELETE))); + when(mockConfig.getIterateOn()).thenReturn("message"); + when(mockConfig.getDeleteWhen()).thenReturn(deleteWhen); + when(mockConfig.getDeleteFromElementWhen()).thenReturn(deleteFromElementWhen); + + when(expressionEvaluator.evaluateConditional(eq(deleteWhen), any(Event.class))).thenReturn(false); + + + final DeleteEntryProcessor processor = createObjectUnderTest(); + final List> mapList = List.of(Map.of( + "condition", false, + testKey, UUID.randomUUID().toString())); + final Map data = Map.of("message", mapList); + final Record record = buildRecordWithEvent(data); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message"), is(true)); + assertThat(editedRecords.get(0).getData().get("message", List.class), equalTo(mapList)); + + verifyNoMoreInteractions(expressionEvaluator); + } + @Test public void testWithKeyDneDeleteProcessorTest() { when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey("message2", EventKeyFactory.EventAction.DELETE))); @@ -138,9 +310,9 @@ public void testNestedDeleteProcessorTest() { @Test public void test_multiple_entries_with_different_delete_when_conditions() { final DeleteEntryProcessorConfig.Entry entry1 = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key1" - , EventKeyFactory.EventAction.DELETE)), "condition1"); + , EventKeyFactory.EventAction.DELETE)), "condition1", null, null); final DeleteEntryProcessorConfig.Entry entry2 = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key2" - , EventKeyFactory.EventAction.DELETE)), "condition2"); + , EventKeyFactory.EventAction.DELETE)), "condition2", null, null); when(mockConfig.getEntries()).thenReturn(List.of(entry1, entry2)); when(expressionEvaluator.isValidExpressionStatement("condition1")).thenReturn(true); @@ -179,7 +351,7 @@ public void test_legacy_format_conversion_to_entries_format() { @Test public void invalid_delete_when_with_entries_format_throws_InvalidPluginConfigurationException() { DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key1", - EventKeyFactory.EventAction.DELETE)), "invalid_condition"); + EventKeyFactory.EventAction.DELETE)), "invalid_condition", null, null); when(mockConfig.getEntries()).thenReturn(List.of(entry)); when(expressionEvaluator.isValidExpressionStatement("invalid_condition")).thenReturn(false); @@ -191,7 +363,7 @@ public void invalid_delete_when_with_entries_format_throws_InvalidPluginConfigur public void test_both_configurations_used_together() { final DeleteEntryProcessorConfig configObjectUnderTest = new DeleteEntryProcessorConfig(); final DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(List.of(eventKeyFactory.createEventKey("key1" - , EventKeyFactory.EventAction.DELETE)), "condition"); + , EventKeyFactory.EventAction.DELETE)), "condition", null, null); ReflectionTestUtils.setField(configObjectUnderTest, "withKeys", List.of(eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.DELETE)));