Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,116 @@ will end up with this after processing:
* `tags_on_failure` - (optional): a list of tags to add to event metadata when the event fails to process


## WrapEntriesProcessor
A processor that wraps each element of a primitive array into an object using a configured key name. This enables downstream processors like `add_entries` and `delete_entries` with `iterate_on`, which require `List<Map<String, Object>>` and cannot operate on primitive arrays.

### Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- wrap_entries:
source: "/names"
key: "name"
sink:
- stdout:
```

Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.
```json
{"names": ["alice", "bob", "charlie"]}
```

When run, the processor will parse the message into the following output:

```json
{"names": [{"name": "alice"}, {"name": "bob"}, {"name": "charlie"}]}
```

### Writing to a Separate Target
If you want to keep the original array and write the wrapped objects to a different key, use the `target` option:
```yaml
processor:
- wrap_entries:
source: "/items"
target: "/inventory_items"
key: "product"
```

Input:
```json
{"items": ["laptop", "monitor", "keyboard"]}
```

Output:
```json
{"items": ["laptop", "monitor", "keyboard"], "inventory_items": [{"product": "laptop"}, {"product": "monitor"}, {"product": "keyboard"}]}
```

### Conditional Processing
Use `wrap_entries_when` to only process events matching a condition:
```yaml
processor:
- wrap_entries:
source: "/tags"
key: "value"
wrap_entries_when: '/type == "tagged"'
```

Only events where `type` equals `"tagged"` will be processed.

### Chaining with add_entries
A common use case is wrapping a primitive array so that `add_entries` with `iterate_on` can operate on it:
```yaml
processor:
- wrap_entries:
source: "/names"
key: "name"
- add_entries:
iterate_on: "/names"
entries:
- key: "greeting"
format: "Hello, ${name}"
```

Input:
```json
{"names": ["alice", "bob"]}
```

After `wrap_entries`:
```json
{"names": [{"name": "alice"}, {"name": "bob"}]}
```

After `add_entries`:
```json
{"names": [{"name": "alice", "greeting": "Hello, alice"}, {"name": "bob", "greeting": "Hello, bob"}]}
```

### Configuration
* `source` - (required) - The key of the primitive array to transform (JSON Pointer)
* `target` - (optional) - The key to write the resulting object array to. Defaults to `source` (in-place). Must not be empty when specified.
* `key` - (required) - The key name to use in each resulting object
* `exclude_null_empty_values` - (optional) - When set to `true`, null and empty string elements are filtered out before wrapping. Default is `false`
* `append_if_target_exists` - (optional) - When set to `true`, appends results to the existing target array instead of overwriting. Default is `false`
* `wrap_entries_when` - (optional) - A [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) that determines whether the processor runs on the event. Evaluated at the root event level.
* `tags_on_failure` - (optional) - A list of tags to add to the event metadata when the event fails to process

### Edge Case Behavior
* If the `source` key does not exist in the event, the processor silently skips the event (no-op). A debug-level log is emitted.
* If the `source` value is not a list (e.g., a string or number), the processor skips the event (no-op) and adds `tags_on_failure` if configured.
* If the `source` list is empty, the processor does nothing — the empty list remains as-is.
* If no elements remain after filtering (when `exclude_null_empty_values` is `true` and all elements are null or empty), the target is written as an empty list `[]`.
* Null elements within the list are wrapped like any other value by default: `[null]` becomes `[{"key": null}]`. Use `exclude_null_empty_values: true` to filter them out.

___

## Developer Guide
This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development:
- [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@DataPrepperPlugin(name = "wrap_entries", pluginType = Processor.class, pluginConfigurationType = WrapEntriesProcessorConfig.class)
public class WrapEntriesProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(WrapEntriesProcessor.class);

private final WrapEntriesProcessorConfig config;
private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public WrapEntriesProcessor(final PluginMetrics pluginMetrics,
final WrapEntriesProcessorConfig config,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.config = config;
this.expressionEvaluator = expressionEvaluator;
config.validateExpressions(expressionEvaluator);
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
final Event event = record.getData();

try {
if (config.getWrapEntriesWhen() != null
&& !expressionEvaluator.evaluateConditional(config.getWrapEntriesWhen(), event)) {
continue;
}

processEvent(event);
} catch (final Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Error processing event [{}]")
.addArgument(event)
.setCause(e)
.log();
addTagsOnFailure(event);
}
}
return records;
}

private void processEvent(final Event event) {
final String source = config.getSource();

if (!event.containsKey(source)) {
LOG.debug(EVENT, "Source key [{}] does not exist in event [{}], skipping.", source, event);
return;
}

final Object sourceValue = event.get(source, Object.class);

if (!(sourceValue instanceof List)) {
LOG.warn(EVENT, "Source key [{}] is not a list in event [{}], skipping.", source, event);
addTagsOnFailure(event);
return;
}

final List<?> sourceList = (List<?>) sourceValue;

if (sourceList.isEmpty()) {
return;
}

final List<Map<String, Object>> result = new ArrayList<>(sourceList.size());
final String key = config.getKey();

for (final Object element : sourceList) {
if (config.getExcludeNullEmptyValues()) {
if (element == null) {
continue;
}
if (element instanceof String && ((String) element).isEmpty()) {
continue;
}
}
final Map<String, Object> entry = new HashMap<>();
entry.put(key, element);
result.add(entry);
}


final String effectiveTarget = config.getEffectiveTarget();

if (config.getAppendIfTargetExists() && event.containsKey(effectiveTarget)) {
final Object existingValue = event.get(effectiveTarget, Object.class);
if (!(existingValue instanceof List)) {
LOG.warn(EVENT, "Target key [{}] exists but is not a list in event [{}], skipping.",
effectiveTarget, event);
addTagsOnFailure(event);
return;
}
final List<Object> existingList = new ArrayList<>((List<?>) existingValue);
existingList.addAll(result);
event.put(effectiveTarget, existingList);
} else {
event.put(effectiveTarget, result);
}
}

private void addTagsOnFailure(final Event event) {
final List<String> tags = config.getTagsOnFailure();
if (tags != null && !tags.isEmpty()) {
event.getMetadata().addTags(tags);
}
}

@Override
public void prepareForShutdown() {
}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.annotations.ExampleValues;
import org.opensearch.dataprepper.model.annotations.ExampleValues.Example;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;

import java.util.List;

@JsonPropertyOrder
@JsonClassDescription("The <code>wrap_entries</code> processor wraps each element of a primitive array " +
"into an object using a configured key name.")
public class WrapEntriesProcessorConfig {

@NotNull
@NotEmpty
@JsonProperty("source")
@JsonPropertyDescription("The key of the primitive array to transform.")
@ExampleValues({
@Example(value = "/names", description = "The source array field to wrap into objects.")
})
private String source;

@JsonProperty("target")
@JsonPropertyDescription("The key to write the resulting object array to. Defaults to <code>source</code> (in-place).")
@ExampleValues({
@Example(value = "/agents", description = "Write the resulting object array to a separate field.")
})
private String target;

@NotNull
@NotEmpty
@JsonProperty("key")
@JsonPropertyDescription("The key name to use in each resulting object.")
@ExampleValues({
@Example(value = "name", description = "Each primitive value is wrapped as {\"name\": value}.")
})
private String key;

@JsonProperty("exclude_null_empty_values")
@JsonPropertyDescription("When <code>true</code>, null and empty string elements are filtered out " +
"before wrapping. Default is <code>false</code>.")
private boolean excludeNullEmptyValues = false;

@JsonProperty("append_if_target_exists")
@JsonPropertyDescription("When <code>true</code>, appends results to the existing target array " +
"instead of overwriting. Default is <code>false</code>.")
private boolean appendIfTargetExists = false;

@JsonProperty("wrap_entries_when")
@JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a> " +
"that will be evaluated to determine whether the processor will be run on the event.")
@ExampleValues({
@Example(value = "/type == \"tagged\"", description = "Only process events where type is 'tagged'.")
})
private String wrapEntriesWhen;

@JsonProperty("tags_on_failure")
@JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.")
private List<String> tagsOnFailure;

public String getSource() {
return source;
}

public String getTarget() {
return target;
}

public String getKey() {
return key;
}

public boolean getExcludeNullEmptyValues() {
return excludeNullEmptyValues;
}

public boolean getAppendIfTargetExists() {
return appendIfTargetExists;
}

public String getWrapEntriesWhen() {
return wrapEntriesWhen;
}

public List<String> getTagsOnFailure() {
return tagsOnFailure;
}

void validateExpressions(final ExpressionEvaluator expressionEvaluator) {
if (target != null && target.isEmpty()) {
throw new InvalidPluginConfigurationException("target must not be empty when specified.");
}

if (wrapEntriesWhen != null && !expressionEvaluator.isValidExpressionStatement(wrapEntriesWhen)) {
throw new InvalidPluginConfigurationException(
String.format("wrap_entries_when \"%s\" is not a valid expression statement. " +
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
wrapEntriesWhen));
}
}

/**
* Returns the effective target key. If target is not set, defaults to source.
*/
public String getEffectiveTarget() {
return target != null ? target : source;
}
}
Loading
Loading