Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
Expand Down Expand Up @@ -47,6 +48,12 @@ public AddEntryProcessor(final PluginMetrics pluginMetrics, final AddEntryProces
throw new InvalidPluginConfigurationException(
String.format("add_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getAddWhen()));
}

if (entry.getAddToElementWhen() != null
&& !expressionEvaluator.isValidExpressionStatement(entry.getAddToElementWhen())) {
throw new InvalidPluginConfigurationException(
String.format("add_to_element_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getAddWhen()));
}
});
}

Expand All @@ -65,38 +72,20 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
try {
final String key = (entry.getKey() == null) ? null : recordEvent.formatString(entry.getKey(), expressionEvaluator);
final String metadataKey = entry.getMetadataKey();
Object value;
if (!Objects.isNull(entry.getValueExpression())) {
value = expressionEvaluator.evaluate(entry.getValueExpression(), recordEvent);
} else if (!Objects.isNull(entry.getFormat())) {
try {
value = recordEvent.formatString(entry.getFormat());
} catch (final EventKeyNotFoundException e) {
value = null;
}
} else {
value = entry.getValue();
}
if (!Objects.isNull(key)) {
if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) {
recordEvent.put(key, value);
} else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) {
mergeValueToEvent(recordEvent, key, value);
}
} else {
Map<String, Object> attributes = recordEvent.getMetadata().getAttributes();
if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) {
recordEvent.getMetadata().setAttribute(metadataKey, value);
} else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) {
mergeValueToEventMetadata(recordEvent, metadataKey, value);
}
final String iterateOn = entry.getIterateOn();
if (Objects.isNull(iterateOn)) {
handleWithoutIterateOn(entry, recordEvent, key, metadataKey);
} else if (!Objects.isNull(key)) {
handleWithIterateOn(entry, recordEvent, iterateOn, key);
}
} catch (Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Error adding entry to record [{}] with key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]")
.setMessage("Error adding entry to record [{}] with iterate_on [{}], add_to_element_when [{}], key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]")
.addArgument(recordEvent)
.addArgument(entry.getIterateOn())
.addArgument(entry.getAddToElementWhen())
.addArgument(entry.getKey())
.addArgument(entry.getMetadataKey())
.addArgument(entry.getValueExpression())
Expand Down Expand Up @@ -132,6 +121,71 @@ public boolean isReadyForShutdown() {
public void shutdown() {
}

private void handleWithoutIterateOn(final AddEntryProcessorConfig.Entry entry,
final Event recordEvent,
final String key,
final String metadataKey) {
final Object value = retrieveValue(entry, recordEvent);
if (!Objects.isNull(key)) {
if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) {
recordEvent.put(key, value);
} else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) {
mergeValueToEvent(recordEvent, key, value);
}
} else {
Map<String, Object> attributes = recordEvent.getMetadata().getAttributes();
if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) {
recordEvent.getMetadata().setAttribute(metadataKey, value);
} else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) {
mergeValueToEventMetadata(recordEvent, metadataKey, value);
}
}
}

private void handleWithIterateOn(final AddEntryProcessorConfig.Entry entry,
final Event recordEvent,
final String iterateOn,
final String key) {
final List<Map<String, Object>> iterateOnList = recordEvent.get(iterateOn, List.class);
if (iterateOnList != null) {
for (final Map<String, Object> item : iterateOnList) {
final Object value;
final Event context = JacksonEvent.builder()
.withEventMetadata(recordEvent.getMetadata())
.withData(item)
.build();
if (entry.getAddToElementWhen() != null && !expressionEvaluator.evaluateConditional(entry.getAddToElementWhen(), recordEvent)) {
continue;
}

value = retrieveValue(entry, context);
if (!item.containsKey(key) || entry.getOverwriteIfKeyExists()) {
item.put(key, value);
} else if (item.containsKey(key) && entry.getAppendIfKeyExists()) {
mergeValueToMap(item, key, value);
}
}
recordEvent.put(iterateOn, iterateOnList);
}
}

private Object retrieveValue(final AddEntryProcessorConfig.Entry entry,
final Event context) {
Object value;
if (!Objects.isNull(entry.getValueExpression())) {
value = expressionEvaluator.evaluate(entry.getValueExpression(), context);
} else if (!Objects.isNull(entry.getFormat())) {
try {
value = context.formatString(entry.getFormat());
} catch (final EventKeyNotFoundException e) {
value = null;
}
} else {
value = entry.getValue();
}
return value;
}

private void mergeValueToEvent(final Event recordEvent, final String key, final Object value) {
mergeValue(value, () -> recordEvent.get(key, Object.class), newValue -> recordEvent.put(key, newValue));
}
Expand All @@ -140,6 +194,10 @@ private void mergeValueToEventMetadata(final Event recordEvent, final String key
mergeValue(value, () -> recordEvent.getMetadata().getAttribute(key), newValue -> recordEvent.getMetadata().setAttribute(key, newValue));
}

private void mergeValueToMap(final Map<String, Object> item, final String key, final Object value) {
mergeValue(value, () -> item.get(key), newValue -> item.put(key, newValue));
}

private void mergeValue(final Object value, Supplier<Object> getter, Consumer<Object> setter) {
final Object currentValue = getter.get();
final List<Object> mergedValue = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.SchemaProperty;
import org.opensearch.dataprepper.model.annotations.ExampleValues;
import org.opensearch.dataprepper.model.annotations.ExampleValues.Example;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;

import java.util.List;
import java.util.stream.Stream;
Expand Down Expand Up @@ -64,6 +65,15 @@ public class AddEntryProcessorConfig {

@JsonPropertyOrder
public static class Entry {
@JsonProperty("iterate_on")
@JsonPropertyDescription(
"Specifies the key of the list of object to iterate over and add entry into.")
private String iterateOn;

@JsonPropertyDescription("Specifies the condition for when to add the key to each element of a list when using iterate_on.")
@JsonProperty("add_to_element_when")
private String addToElementWhen;

@JsonPropertyDescription("The key of the new entry to be added. Some examples of keys include <code>my_key</code>, " +
"<code>myKey</code>, and <code>object/sub_Key</code>. The key can also be a format expression, for example, <code>${/key1}</code> to " +
"use the value of field <code>key1</code> as the key. Exactly one of <code>key</code> or <code>metadata_key</code> is required.")
Expand Down Expand Up @@ -155,6 +165,12 @@ public static class Entry {
})
private String addWhen;

public String getIterateOn() {
return iterateOn;
}

public String getAddToElementWhen() { return addToElementWhen; }

public String getKey() {
return key;
}
Expand Down Expand Up @@ -202,14 +218,24 @@ public Entry(final String key,
final String valueExpression,
final boolean overwriteIfKeyExists,
final boolean appendIfKeyExists,
final String addWhen)
final String addWhen,
final String iterateOn,
final String addToElementWhen)
{
if (key != null && metadataKey != null) {
throw new IllegalArgumentException("Only one of the two - key and metadatakey - should be specified");
}
if (key == null && metadataKey == null) {
throw new IllegalArgumentException("At least one of the two - key and metadatakey - must be specified");
}
if (metadataKey != null && iterateOn != null) {
throw new IllegalArgumentException("iterate_on cannot be applied to metadata");
}

if (iterateOn == null && addToElementWhen != null) {
throw new InvalidPluginConfigurationException("add_to_element_when only applies when iterate_on is configured.");
}

this.key = key;
this.metadataKey = metadataKey;
this.value = value;
Expand All @@ -218,6 +244,8 @@ public Entry(final String key,
this.overwriteIfKeyExists = overwriteIfKeyExists;
this.appendIfKeyExists = appendIfKeyExists;
this.addWhen = addWhen;
this.iterateOn = iterateOn;
this.addToElementWhen = addToElementWhen;
}

public Entry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
Expand All @@ -22,6 +23,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@DataPrepperPlugin(name = "delete_entries", pluginType = Processor.class, pluginConfigurationType = DeleteEntryProcessorConfig.class)
Expand All @@ -33,11 +35,13 @@ public class DeleteEntryProcessor extends AbstractProcessor<Record<Event>, Recor
private final List<DeleteEntryProcessorConfig.Entry> entries;

private final ExpressionEvaluator expressionEvaluator;
private final DeleteEntryProcessorConfig deleteEntryProcessorConfig;

@DataPrepperPluginConstructor
public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntryProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.withKeys = config.getWithKeys();
this.deleteEntryProcessorConfig = config;
this.deleteWhen = config.getDeleteWhen();
this.expressionEvaluator = expressionEvaluator;

Expand All @@ -49,7 +53,7 @@ public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntry
}

if (this.withKeys != null && !this.withKeys.isEmpty()) {
DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(this.withKeys, this.deleteWhen);
DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(this.withKeys, this.deleteWhen, config.getIterateOn(), config.getDeleteFromElementWhen());
this.entries = List.of(entry);
} else {
this.entries = config.getEntries();
Expand All @@ -63,23 +67,39 @@ public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntry
".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
entry.getDeleteWhen()));
}

if (entry.getIterateOn() == null && entry.getDeleteFromElementWhen() != null) {
throw new InvalidPluginConfigurationException("delete_from_element_when only applies when iterate_on is configured.");
}

if (entry.getDeleteFromElementWhen() != null
&& !expressionEvaluator.isValidExpressionStatement(entry.getDeleteFromElementWhen())) {
throw new InvalidPluginConfigurationException(
String.format("delete_from_element_when %s is not a valid expression statement. See https://opensearch" +
".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
entry.getDeleteFromElementWhen()));
}
});
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for(final Record<Event> record : records) {
for (final Record<Event> record : records) {
final Event recordEvent = record.getData();

try {
for (final DeleteEntryProcessorConfig.Entry entry : entries) {
if (Objects.nonNull(entry.getDeleteWhen()) && !expressionEvaluator.evaluateConditional(entry.getDeleteWhen(),
recordEvent)) {
if (Objects.nonNull(entry.getDeleteWhen()) && !expressionEvaluator.evaluateConditional(entry.getDeleteWhen(), recordEvent)) {
continue;
}

for (final EventKey key : entry.getWithKeys()) {
recordEvent.delete(key);
final String iterateOn = deleteEntryProcessorConfig.getIterateOn();
if (Objects.isNull(iterateOn)) {

for (final EventKey entryKey : entry.getWithKeys()) {
recordEvent.delete(entryKey);
}
} else {
handleForIterateOn(recordEvent, entry, iterateOn);
}
}
} catch (final Exception e) {
Expand All @@ -92,7 +112,6 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
.log();
}
}

return records;
}

Expand All @@ -108,4 +127,29 @@ public boolean isReadyForShutdown() {
@Override
public void shutdown() {
}

private void handleForIterateOn(final Event recordEvent,
final DeleteEntryProcessorConfig.Entry entry,
final String iterateOn) {
final List<Map<String, Object>> iterateOnList = recordEvent.get(iterateOn, List.class);
if (iterateOnList != null) {
for (int i = 0; i < iterateOnList.size(); i++) {
final Map<String, Object> item = iterateOnList.get(i);
final Event context = JacksonEvent.builder()
.withEventMetadata(recordEvent.getMetadata())
.withData(item)
.build();
if (entry.getDeleteFromElementWhen() != null &&
!expressionEvaluator.evaluateConditional(entry.getDeleteFromElementWhen(), context)) {
continue;
}

for (final EventKey entryKey : entry.getWithKeys()) {
context.delete(entryKey);
}
iterateOnList.set(i, context.toMap());
}
recordEvent.put(iterateOn, iterateOnList);
}
}
}
Loading
Loading