Skip to content

Commit b26cb82

Browse files
Iterate on support for add_entries and delete_entries processors (#5773)
Signed-off-by: George Chen <qchea@amazon.com> Signed-off-by: Taylor Gray <tylgry@amazon.com> Co-authored-by: George Chen <qchea@amazon.com>
1 parent e53aaff commit b26cb82

6 files changed

Lines changed: 604 additions & 97 deletions

File tree

data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java

Lines changed: 84 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
1313
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
1414
import org.opensearch.dataprepper.model.event.Event;
15+
import org.opensearch.dataprepper.model.event.JacksonEvent;
1516
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
1617
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
1718
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
@@ -47,6 +48,12 @@ public AddEntryProcessor(final PluginMetrics pluginMetrics, final AddEntryProces
4748
throw new InvalidPluginConfigurationException(
4849
String.format("add_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getAddWhen()));
4950
}
51+
52+
if (entry.getAddToElementWhen() != null
53+
&& !expressionEvaluator.isValidExpressionStatement(entry.getAddToElementWhen())) {
54+
throw new InvalidPluginConfigurationException(
55+
String.format("add_to_element_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getAddWhen()));
56+
}
5057
});
5158
}
5259

@@ -65,38 +72,20 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
6572
try {
6673
final String key = (entry.getKey() == null) ? null : recordEvent.formatString(entry.getKey(), expressionEvaluator);
6774
final String metadataKey = entry.getMetadataKey();
68-
Object value;
69-
if (!Objects.isNull(entry.getValueExpression())) {
70-
value = expressionEvaluator.evaluate(entry.getValueExpression(), recordEvent);
71-
} else if (!Objects.isNull(entry.getFormat())) {
72-
try {
73-
value = recordEvent.formatString(entry.getFormat());
74-
} catch (final EventKeyNotFoundException e) {
75-
value = null;
76-
}
77-
} else {
78-
value = entry.getValue();
79-
}
80-
if (!Objects.isNull(key)) {
81-
if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) {
82-
recordEvent.put(key, value);
83-
} else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) {
84-
mergeValueToEvent(recordEvent, key, value);
85-
}
86-
} else {
87-
Map<String, Object> attributes = recordEvent.getMetadata().getAttributes();
88-
if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) {
89-
recordEvent.getMetadata().setAttribute(metadataKey, value);
90-
} else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) {
91-
mergeValueToEventMetadata(recordEvent, metadataKey, value);
92-
}
75+
final String iterateOn = entry.getIterateOn();
76+
if (Objects.isNull(iterateOn)) {
77+
handleWithoutIterateOn(entry, recordEvent, key, metadataKey);
78+
} else if (!Objects.isNull(key)) {
79+
handleWithIterateOn(entry, recordEvent, iterateOn, key);
9380
}
9481
} catch (Exception e) {
9582
LOG.atError()
9683
.addMarker(EVENT)
9784
.addMarker(NOISY)
98-
.setMessage("Error adding entry to record [{}] with key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]")
85+
.setMessage("Error adding entry to record [{}] with iterate_on [{}], add_to_element_when [{}], key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]")
9986
.addArgument(recordEvent)
87+
.addArgument(entry.getIterateOn())
88+
.addArgument(entry.getAddToElementWhen())
10089
.addArgument(entry.getKey())
10190
.addArgument(entry.getMetadataKey())
10291
.addArgument(entry.getValueExpression())
@@ -132,6 +121,71 @@ public boolean isReadyForShutdown() {
132121
public void shutdown() {
133122
}
134123

124+
private void handleWithoutIterateOn(final AddEntryProcessorConfig.Entry entry,
125+
final Event recordEvent,
126+
final String key,
127+
final String metadataKey) {
128+
final Object value = retrieveValue(entry, recordEvent);
129+
if (!Objects.isNull(key)) {
130+
if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) {
131+
recordEvent.put(key, value);
132+
} else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) {
133+
mergeValueToEvent(recordEvent, key, value);
134+
}
135+
} else {
136+
Map<String, Object> attributes = recordEvent.getMetadata().getAttributes();
137+
if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) {
138+
recordEvent.getMetadata().setAttribute(metadataKey, value);
139+
} else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) {
140+
mergeValueToEventMetadata(recordEvent, metadataKey, value);
141+
}
142+
}
143+
}
144+
145+
private void handleWithIterateOn(final AddEntryProcessorConfig.Entry entry,
146+
final Event recordEvent,
147+
final String iterateOn,
148+
final String key) {
149+
final List<Map<String, Object>> iterateOnList = recordEvent.get(iterateOn, List.class);
150+
if (iterateOnList != null) {
151+
for (final Map<String, Object> item : iterateOnList) {
152+
final Object value;
153+
final Event context = JacksonEvent.builder()
154+
.withEventMetadata(recordEvent.getMetadata())
155+
.withData(item)
156+
.build();
157+
if (entry.getAddToElementWhen() != null && !expressionEvaluator.evaluateConditional(entry.getAddToElementWhen(), recordEvent)) {
158+
continue;
159+
}
160+
161+
value = retrieveValue(entry, context);
162+
if (!item.containsKey(key) || entry.getOverwriteIfKeyExists()) {
163+
item.put(key, value);
164+
} else if (item.containsKey(key) && entry.getAppendIfKeyExists()) {
165+
mergeValueToMap(item, key, value);
166+
}
167+
}
168+
recordEvent.put(iterateOn, iterateOnList);
169+
}
170+
}
171+
172+
private Object retrieveValue(final AddEntryProcessorConfig.Entry entry,
173+
final Event context) {
174+
Object value;
175+
if (!Objects.isNull(entry.getValueExpression())) {
176+
value = expressionEvaluator.evaluate(entry.getValueExpression(), context);
177+
} else if (!Objects.isNull(entry.getFormat())) {
178+
try {
179+
value = context.formatString(entry.getFormat());
180+
} catch (final EventKeyNotFoundException e) {
181+
value = null;
182+
}
183+
} else {
184+
value = entry.getValue();
185+
}
186+
return value;
187+
}
188+
135189
private void mergeValueToEvent(final Event recordEvent, final String key, final Object value) {
136190
mergeValue(value, () -> recordEvent.get(key, Object.class), newValue -> recordEvent.put(key, newValue));
137191
}
@@ -140,6 +194,10 @@ private void mergeValueToEventMetadata(final Event recordEvent, final String key
140194
mergeValue(value, () -> recordEvent.getMetadata().getAttribute(key), newValue -> recordEvent.getMetadata().setAttribute(key, newValue));
141195
}
142196

197+
private void mergeValueToMap(final Map<String, Object> item, final String key, final Object value) {
198+
mergeValue(value, () -> item.get(key), newValue -> item.put(key, newValue));
199+
}
200+
143201
private void mergeValue(final Object value, Supplier<Object> getter, Consumer<Object> setter) {
144202
final Object currentValue = getter.get();
145203
final List<Object> mergedValue = new ArrayList<>();

data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.SchemaProperty;
2020
import org.opensearch.dataprepper.model.annotations.ExampleValues;
2121
import org.opensearch.dataprepper.model.annotations.ExampleValues.Example;
22+
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
2223

2324
import java.util.List;
2425
import java.util.stream.Stream;
@@ -64,6 +65,15 @@ public class AddEntryProcessorConfig {
6465

6566
@JsonPropertyOrder
6667
public static class Entry {
68+
@JsonProperty("iterate_on")
69+
@JsonPropertyDescription(
70+
"Specifies the key of the list of object to iterate over and add entry into.")
71+
private String iterateOn;
72+
73+
@JsonPropertyDescription("Specifies the condition for when to add the key to each element of a list when using iterate_on.")
74+
@JsonProperty("add_to_element_when")
75+
private String addToElementWhen;
76+
6777
@JsonPropertyDescription("The key of the new entry to be added. Some examples of keys include <code>my_key</code>, " +
6878
"<code>myKey</code>, and <code>object/sub_Key</code>. The key can also be a format expression, for example, <code>${/key1}</code> to " +
6979
"use the value of field <code>key1</code> as the key. Exactly one of <code>key</code> or <code>metadata_key</code> is required.")
@@ -155,6 +165,12 @@ public static class Entry {
155165
})
156166
private String addWhen;
157167

168+
public String getIterateOn() {
169+
return iterateOn;
170+
}
171+
172+
public String getAddToElementWhen() { return addToElementWhen; }
173+
158174
public String getKey() {
159175
return key;
160176
}
@@ -202,14 +218,24 @@ public Entry(final String key,
202218
final String valueExpression,
203219
final boolean overwriteIfKeyExists,
204220
final boolean appendIfKeyExists,
205-
final String addWhen)
221+
final String addWhen,
222+
final String iterateOn,
223+
final String addToElementWhen)
206224
{
207225
if (key != null && metadataKey != null) {
208226
throw new IllegalArgumentException("Only one of the two - key and metadatakey - should be specified");
209227
}
210228
if (key == null && metadataKey == null) {
211229
throw new IllegalArgumentException("At least one of the two - key and metadatakey - must be specified");
212230
}
231+
if (metadataKey != null && iterateOn != null) {
232+
throw new IllegalArgumentException("iterate_on cannot be applied to metadata");
233+
}
234+
235+
if (iterateOn == null && addToElementWhen != null) {
236+
throw new InvalidPluginConfigurationException("add_to_element_when only applies when iterate_on is configured.");
237+
}
238+
213239
this.key = key;
214240
this.metadataKey = metadataKey;
215241
this.value = value;
@@ -218,6 +244,8 @@ public Entry(final String key,
218244
this.overwriteIfKeyExists = overwriteIfKeyExists;
219245
this.appendIfKeyExists = appendIfKeyExists;
220246
this.addWhen = addWhen;
247+
this.iterateOn = iterateOn;
248+
this.addToElementWhen = addToElementWhen;
221249
}
222250

223251
public Entry() {

data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
1414
import org.opensearch.dataprepper.model.event.Event;
1515
import org.opensearch.dataprepper.model.event.EventKey;
16+
import org.opensearch.dataprepper.model.event.JacksonEvent;
1617
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
1718
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
1819
import org.opensearch.dataprepper.model.processor.Processor;
@@ -22,6 +23,7 @@
2223

2324
import java.util.Collection;
2425
import java.util.List;
26+
import java.util.Map;
2527
import java.util.Objects;
2628

2729
@DataPrepperPlugin(name = "delete_entries", pluginType = Processor.class, pluginConfigurationType = DeleteEntryProcessorConfig.class)
@@ -33,11 +35,13 @@ public class DeleteEntryProcessor extends AbstractProcessor<Record<Event>, Recor
3335
private final List<DeleteEntryProcessorConfig.Entry> entries;
3436

3537
private final ExpressionEvaluator expressionEvaluator;
38+
private final DeleteEntryProcessorConfig deleteEntryProcessorConfig;
3639

3740
@DataPrepperPluginConstructor
3841
public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntryProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
3942
super(pluginMetrics);
4043
this.withKeys = config.getWithKeys();
44+
this.deleteEntryProcessorConfig = config;
4145
this.deleteWhen = config.getDeleteWhen();
4246
this.expressionEvaluator = expressionEvaluator;
4347

@@ -49,7 +53,7 @@ public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntry
4953
}
5054

5155
if (this.withKeys != null && !this.withKeys.isEmpty()) {
52-
DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(this.withKeys, this.deleteWhen);
56+
DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(this.withKeys, this.deleteWhen, config.getIterateOn(), config.getDeleteFromElementWhen());
5357
this.entries = List.of(entry);
5458
} else {
5559
this.entries = config.getEntries();
@@ -63,23 +67,39 @@ public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntry
6367
".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
6468
entry.getDeleteWhen()));
6569
}
70+
71+
if (entry.getIterateOn() == null && entry.getDeleteFromElementWhen() != null) {
72+
throw new InvalidPluginConfigurationException("delete_from_element_when only applies when iterate_on is configured.");
73+
}
74+
75+
if (entry.getDeleteFromElementWhen() != null
76+
&& !expressionEvaluator.isValidExpressionStatement(entry.getDeleteFromElementWhen())) {
77+
throw new InvalidPluginConfigurationException(
78+
String.format("delete_from_element_when %s is not a valid expression statement. See https://opensearch" +
79+
".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
80+
entry.getDeleteFromElementWhen()));
81+
}
6682
});
6783
}
6884

6985
@Override
7086
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
71-
for(final Record<Event> record : records) {
87+
for (final Record<Event> record : records) {
7288
final Event recordEvent = record.getData();
73-
7489
try {
7590
for (final DeleteEntryProcessorConfig.Entry entry : entries) {
76-
if (Objects.nonNull(entry.getDeleteWhen()) && !expressionEvaluator.evaluateConditional(entry.getDeleteWhen(),
77-
recordEvent)) {
91+
if (Objects.nonNull(entry.getDeleteWhen()) && !expressionEvaluator.evaluateConditional(entry.getDeleteWhen(), recordEvent)) {
7892
continue;
7993
}
8094

81-
for (final EventKey key : entry.getWithKeys()) {
82-
recordEvent.delete(key);
95+
final String iterateOn = deleteEntryProcessorConfig.getIterateOn();
96+
if (Objects.isNull(iterateOn)) {
97+
98+
for (final EventKey entryKey : entry.getWithKeys()) {
99+
recordEvent.delete(entryKey);
100+
}
101+
} else {
102+
handleForIterateOn(recordEvent, entry, iterateOn);
83103
}
84104
}
85105
} catch (final Exception e) {
@@ -92,7 +112,6 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
92112
.log();
93113
}
94114
}
95-
96115
return records;
97116
}
98117

@@ -108,4 +127,29 @@ public boolean isReadyForShutdown() {
108127
@Override
109128
public void shutdown() {
110129
}
130+
131+
private void handleForIterateOn(final Event recordEvent,
132+
final DeleteEntryProcessorConfig.Entry entry,
133+
final String iterateOn) {
134+
final List<Map<String, Object>> iterateOnList = recordEvent.get(iterateOn, List.class);
135+
if (iterateOnList != null) {
136+
for (int i = 0; i < iterateOnList.size(); i++) {
137+
final Map<String, Object> item = iterateOnList.get(i);
138+
final Event context = JacksonEvent.builder()
139+
.withEventMetadata(recordEvent.getMetadata())
140+
.withData(item)
141+
.build();
142+
if (entry.getDeleteFromElementWhen() != null &&
143+
!expressionEvaluator.evaluateConditional(entry.getDeleteFromElementWhen(), context)) {
144+
continue;
145+
}
146+
147+
for (final EventKey entryKey : entry.getWithKeys()) {
148+
context.delete(entryKey);
149+
}
150+
iterateOnList.set(i, context.toMap());
151+
}
152+
recordEvent.put(iterateOn, iterateOnList);
153+
}
154+
}
111155
}

0 commit comments

Comments
 (0)