Skip to content

Commit c8d286b

Browse files
authored
Add support for parent event field access during iterate_on processing in add_entries processor (#6713)
Add disable_root_keys and evaluate_when_on_element configs to add_entries processor Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
1 parent b2b5a37 commit c8d286b

9 files changed

Lines changed: 546 additions & 41 deletions

File tree

data-prepper-plugins/mutate-event-processors/README.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,55 @@ then when we run with the same input, the processor will parse the message into
5454
{"message": "value", "newMessage": "new value"}
5555
```
5656

57+
### Iterating over arrays with `iterate_on`
58+
59+
The `iterate_on` option allows adding entries to each element of an array. Combined with `disable_root_keys` and `evaluate_when_on_element`, you can reference root-level fields and apply per-element conditions.
60+
61+
```yaml
62+
pipeline:
63+
source:
64+
http:
65+
processor:
66+
- add_entries:
67+
entries:
68+
- key: "title"
69+
value_expression: "/alert_title"
70+
iterate_on: "vulns"
71+
disable_root_keys: false
72+
add_to_element_when: '/severity == "critical"'
73+
evaluate_when_on_element: true
74+
sink:
75+
- stdout:
76+
```
77+
78+
Given the following input:
79+
```json
80+
{
81+
"alert_title": "SQL Injection Detected",
82+
"vulns": [
83+
{"cve": "CVE-2024-001", "severity": "critical"},
84+
{"cve": "CVE-2024-002", "severity": "low"},
85+
{"cve": "CVE-2024-003", "severity": "critical"}
86+
]
87+
}
88+
```
89+
90+
The processor will produce:
91+
```json
92+
{
93+
"alert_title": "SQL Injection Detected",
94+
"vulns": [
95+
{"cve": "CVE-2024-001", "severity": "critical", "title": "SQL Injection Detected"},
96+
{"cve": "CVE-2024-002", "severity": "low"},
97+
{"cve": "CVE-2024-003", "severity": "critical", "title": "SQL Injection Detected"}
98+
]
99+
}
100+
```
101+
102+
In this example:
103+
- `disable_root_keys: false` allows `value_expression: "/alert_title"` to resolve from the root event
104+
- `evaluate_when_on_element: true` evaluates `add_to_element_when` against each element, so only elements with `severity == "critical"` receive the new key
105+
57106
### Configuration
58107
* `entries` - (required) - A list of entries to add to an event
59108
* `key` - (required) - The key of the new entry to be added. One of `key` or `metadata_key` is required.
@@ -62,6 +111,8 @@ then when we run with the same input, the processor will parse the message into
62111
* `format` - (optional) - A format string to use as value of the new entry to be added. For example, `${key1}-${ke2}` where `key1` and `key2` are existing keys in the event. Required if `value` is not specified.
63112
* `value_expression` - (optional) - An expression string to use as value of the new entry to be added. For example, `/key` where `key` is an existing key in the event of type Number/String/Boolean. Expressions can also contain functions returning Number/String/Integer. For example `length(/key)` would return the length of the `key` in the event and key must of String type. Please see [expressions syntax document](https://github.com/opensearch-project/data-prepper/blob/2.3/docs/expression_syntax.md) for complete list of supported functions. Required if `value` and `format are not specified.
64113
* `overwrite_if_key_exists` - (optional) - When set to `true`, if `key` already exists in the event, then the existing value will be overwritten. The default is `false`.
114+
* `disable_root_keys` - (optional) - When set to `false` and used with `iterate_on`, resolves `value_expression` and `format` against the root event instead of the individual array element. This allows referencing root-level fields during array iteration. Has no effect without `iterate_on`. Default is `true`.
115+
* `evaluate_when_on_element` - (optional) - When set to `true` and used with `iterate_on` and `add_to_element_when`, evaluates the `add_to_element_when` condition against each individual array element instead of the root event. This enables per-element conditional logic during array iteration. Default is `false`.
65116

66117
___
67118

data-prepper-plugins/mutate-event-processors/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ dependencies {
2323
implementation project(':data-prepper-plugins:common')
2424
implementation 'com.fasterxml.jackson.core:jackson-databind'
2525
testImplementation project(':data-prepper-test:test-event')
26+
testImplementation project(':data-prepper-test:plugin-test-framework')
2627
testImplementation testLibs.slf4j.simple
2728
testImplementation testLibs.spring.test
2829
testImplementation project(':data-prepper-test:test-common')

data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,15 @@ private static class EntryProperties {
5252
final boolean appendIfExists;
5353
final String addWhen;
5454
final String addToElementWhen;
55+
final boolean disableRootKeys;
56+
final boolean evaluateWhenOnElement;
5557
EntryProperties(AddEntryProcessorConfig.Entry entry, ExpressionEvaluator evaluator) {
5658
this.overwriteIfExists = entry.getOverwriteIfKeyExists();
5759
this.appendIfExists = entry.getAppendIfKeyExists();
5860
this.addWhen = entry.getAddWhen();
5961
this.addToElementWhen = entry.getAddToElementWhen();
62+
this.disableRootKeys = entry.getDisableRootKeys();
63+
this.evaluateWhenOnElement = entry.getEvaluateWhenOnElement();
6064
String valueExpr = entry.getValueExpression();
6165
if (valueExpr != null && !evaluator.isValidExpressionStatement(valueExpr)) {
6266
throw new InvalidPluginConfigurationException(
@@ -261,33 +265,47 @@ private void handleWithIterateOn(final AddEntryProcessorConfig.Entry entry,
261265
// Create builder once
262266
final JacksonEvent.Builder contextBuilder = JacksonEvent.builder()
263267
.withEventMetadata(recordEvent.getMetadata());
264-
265-
// Pre-check addToElementWhen condition if static
266-
final boolean shouldProcessAll = props.addToElementWhen == null ||
267-
expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent);
268-
if (shouldProcessAll) {
269-
// Bulk process all items
268+
269+
if (props.addToElementWhen == null) {
270+
// No condition — process all items
270271
for (int i = 0; i < iterateOnList.size(); i++) {
271272
final Map<String, Object> item = iterateOnList.get(i);
272-
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props));
273+
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props, recordEvent));
273274
}
274-
} else {
275-
// Process items individually with condition check
275+
} else if (props.evaluateWhenOnElement) {
276+
// Evaluate condition against each element for selective addition
276277
for (int i = 0; i < iterateOnList.size(); i++) {
277278
final Map<String, Object> item = iterateOnList.get(i);
278-
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent)) {
279-
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props));
279+
final Event elementContext = contextBuilder.withData(item).build();
280+
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, elementContext)) {
281+
iterateOnList.set(i, processIterateOnItem(entry, elementContext, item, flattenKey, key, props, recordEvent));
282+
}
283+
}
284+
} else {
285+
// Evaluate condition once against root event
286+
if (expressionEvaluator.evaluateConditional(props.addToElementWhen, recordEvent)) {
287+
for (int i = 0; i < iterateOnList.size(); i++) {
288+
final Map<String, Object> item = iterateOnList.get(i);
289+
iterateOnList.set(i, processIterateOnItem(entry, contextBuilder, item, flattenKey, key, props, recordEvent));
280290
}
281291
}
282292
}
283293
recordEvent.put(iterateOn, iterateOnList);
284294
}
285295
}
286296

287-
private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, JacksonEvent.Builder contextBuilder,
288-
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props) {
297+
private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, JacksonEvent.Builder contextBuilder,
298+
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props,
299+
final Event recordEvent) {
289300
final Event context = contextBuilder.withData(item).build();
290-
final Object value = retrieveValue(entry, context);
301+
return processIterateOnItem(entry, context, item, flattenKey, key, props, recordEvent);
302+
}
303+
304+
private Map<String, Object> processIterateOnItem(AddEntryProcessorConfig.Entry entry, Event context,
305+
Map<String, Object> item, final boolean flattenKey, EventKey key, EntryProperties props,
306+
final Event recordEvent) {
307+
final Event valueContext = !props.disableRootKeys ? recordEvent : context;
308+
final Object value = retrieveValue(entry, valueContext);
291309
final String keyStr = key.getKey(); // Key and keyStr are guaranteed non-null by caller
292310
if (!item.containsKey(keyStr) || props.overwriteIfExists) {
293311
if (flattenKey) {

data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java

Lines changed: 70 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -159,13 +159,35 @@ public static class Entry {
159159

160160
@JsonProperty("flatten_key")
161161
@JsonPropertyDescription(
162-
"When true and used with iterate_on, treats the key as a plain string. When false and used with iterate_on, treats the key as a json pointer. " +
163-
"Has no effect when not used with iterate_on.")
162+
"When true and used with iterate_on, treats the key as a plain string. When false and used with iterate_on, treats the key as a json pointer. " +
163+
"Has no effect when not used with iterate_on.")
164164
@AlsoRequired(values = {
165165
@AlsoRequired.Required(name="iterate_on")
166166
})
167167
private boolean flattenKey = true;
168168

169+
@JsonProperty("disable_root_keys")
170+
@JsonPropertyDescription(
171+
"When set to <code>false</code> and used with <code>iterate_on</code>, resolves <code>value_expression</code> and <code>format</code> " +
172+
"against the root event instead of the individual array element. This allows referencing root-level fields during array iteration. " +
173+
"Has no effect when not used with <code>iterate_on</code>. The default value is <code>true</code>.")
174+
@AlsoRequired(values = {
175+
@AlsoRequired.Required(name="iterate_on")
176+
})
177+
private boolean disableRootKeys = true;
178+
179+
@JsonProperty("evaluate_when_on_element")
180+
@JsonPropertyDescription(
181+
"When set to <code>true</code> and used with <code>iterate_on</code> and <code>add_to_element_when</code>, " +
182+
"evaluates the <code>add_to_element_when</code> condition against each individual array element instead of the root event. " +
183+
"This enables per-element conditional logic during array iteration. " +
184+
"The default value is <code>false</code>.")
185+
@AlsoRequired(values = {
186+
@AlsoRequired.Required(name="iterate_on"),
187+
@AlsoRequired.Required(name="add_to_element_when")
188+
})
189+
private boolean evaluateWhenOnElement = false;
190+
169191
@JsonProperty("add_when")
170192
@JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a>, " +
171193
"such as <code>/some-key == \"test\"</code>, that will be evaluated to determine whether the processor will be run on the event.")
@@ -209,7 +231,15 @@ public boolean getAppendIfKeyExists() {
209231
}
210232

211233
public boolean getFlattenKey(){
212-
return flattenKey;
234+
return flattenKey;
235+
}
236+
237+
public boolean getDisableRootKeys() {
238+
return disableRootKeys;
239+
}
240+
241+
public boolean getEvaluateWhenOnElement() {
242+
return evaluateWhenOnElement;
213243
}
214244

215245
public String getAddWhen() { return addWhen; }
@@ -229,6 +259,16 @@ boolean flattenKeyFalseIsUsedWithIterateOn() {
229259
return (!flattenKey && iterateOn!=null) || flattenKey;
230260
}
231261

262+
@AssertTrue(message = "disable_root_keys=false only applies when iterate_on is configured.")
263+
boolean disableRootKeysFalseIsUsedWithIterateOn() {
264+
return disableRootKeys || iterateOn != null;
265+
}
266+
267+
@AssertTrue(message = "evaluate_when_on_element only applies when both iterate_on and add_to_element_when are configured.")
268+
boolean evaluateWhenOnElementIsUsedWithIterateOnAndAddToElementWhen() {
269+
return !evaluateWhenOnElement || (iterateOn != null && addToElementWhen != null);
270+
}
271+
232272
public Entry(final String key,
233273
final String metadataKey,
234274
final Object value,
@@ -239,6 +279,8 @@ public Entry(final String key,
239279
final String addWhen,
240280
final String iterateOn,
241281
final boolean flattenKey,
282+
final boolean disableRootKeys,
283+
final boolean evaluateWhenOnElement,
242284
final String addToElementWhen)
243285
{
244286
if (key != null && metadataKey != null) {
@@ -253,6 +295,12 @@ public Entry(final String key,
253295
if (iterateOn == null && addToElementWhen != null) {
254296
throw new InvalidPluginConfigurationException("add_to_element_when only applies when iterate_on is configured.");
255297
}
298+
if (!disableRootKeys && iterateOn == null) {
299+
throw new InvalidPluginConfigurationException("disable_root_keys=false only applies when iterate_on is configured.");
300+
}
301+
if (evaluateWhenOnElement && (iterateOn == null || addToElementWhen == null)) {
302+
throw new InvalidPluginConfigurationException("evaluate_when_on_element only applies when both iterate_on and add_to_element_when are configured.");
303+
}
256304

257305
this.key = key;
258306
this.metadataKey = metadataKey;
@@ -264,6 +312,8 @@ public Entry(final String key,
264312
this.addWhen = addWhen;
265313
this.iterateOn = iterateOn;
266314
this.flattenKey = flattenKey;
315+
this.disableRootKeys = disableRootKeys;
316+
this.evaluateWhenOnElement = evaluateWhenOnElement;
267317
this.addToElementWhen = addToElementWhen;
268318
}
269319

@@ -276,33 +326,26 @@ public Entry(final String key,
276326
final boolean appendIfKeyExists,
277327
final String addWhen,
278328
final String iterateOn,
329+
final boolean flattenKey,
279330
final String addToElementWhen)
280331
{
281-
if (key != null && metadataKey != null) {
282-
throw new IllegalArgumentException("Only one of the two - key and metadatakey - should be specified");
283-
}
284-
if (key == null && metadataKey == null) {
285-
throw new IllegalArgumentException("At least one of the two - key and metadatakey - must be specified");
286-
}
287-
if (metadataKey != null && iterateOn != null) {
288-
throw new IllegalArgumentException("iterate_on cannot be applied to metadata");
289-
}
290-
291-
if (iterateOn == null && addToElementWhen != null) {
292-
throw new InvalidPluginConfigurationException("add_to_element_when only applies when iterate_on is configured.");
293-
}
332+
this(key, metadataKey, value, format, valueExpression, overwriteIfKeyExists, appendIfKeyExists, addWhen,
333+
iterateOn, flattenKey, true, false, addToElementWhen);
334+
}
294335

295-
this.key = key;
296-
this.metadataKey = metadataKey;
297-
this.value = value;
298-
this.format = format;
299-
this.valueExpression = valueExpression;
300-
this.overwriteIfKeyExists = overwriteIfKeyExists;
301-
this.appendIfKeyExists = appendIfKeyExists;
302-
this.addWhen = addWhen;
303-
this.iterateOn = iterateOn;
304-
this.flattenKey = true;
305-
this.addToElementWhen = addToElementWhen;
336+
public Entry(final String key,
337+
final String metadataKey,
338+
final Object value,
339+
final String format,
340+
final String valueExpression,
341+
final boolean overwriteIfKeyExists,
342+
final boolean appendIfKeyExists,
343+
final String addWhen,
344+
final String iterateOn,
345+
final String addToElementWhen)
346+
{
347+
this(key, metadataKey, value, format, valueExpression, overwriteIfKeyExists, appendIfKeyExists, addWhen,
348+
iterateOn, true, true, false, addToElementWhen);
306349
}
307350

308351
public Entry() {

0 commit comments

Comments
 (0)