Skip to content

Commit 24d5184

Browse files
Add array field splitting support to split_event processor (#6774)
Add array field splitting support to split_event processor.
Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
1 parent 506a04f commit 24d5184

3 files changed

Lines changed: 502 additions & 250 deletions

File tree

data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java

Lines changed: 76 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -19,93 +19,128 @@
1919
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
2020
import org.opensearch.dataprepper.model.processor.Processor;
2121
import org.opensearch.dataprepper.model.record.Record;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
2224

2325
import java.util.ArrayList;
2426
import java.util.Collection;
27+
import java.util.List;
2528
import java.util.function.Function;
2629
import java.util.regex.Pattern;
2730

28-
2931
@DataPrepperPlugin(name = "split_event", pluginType = Processor.class, pluginConfigurationType = SplitEventProcessorConfig.class)
30-
public class SplitEventProcessor extends AbstractProcessor<Record<Event>, Record<Event>>{
31-
final String delimiter;
32-
final String delimiterRegex;
33-
final String field;
34-
final Pattern pattern;
32+
public class SplitEventProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
33+
private static final Logger LOG = LoggerFactory.getLogger(SplitEventProcessor.class);
34+
35+
private final String field;
3536
private final Function<String, String[]> splitter;
3637

3738
@DataPrepperPluginConstructor
3839
public SplitEventProcessor(final PluginMetrics pluginMetrics, final SplitEventProcessorConfig config) {
3940
super(pluginMetrics);
40-
this.delimiter = config.getDelimiter();
41-
this.delimiterRegex = config.getDelimiterRegex();
4241
this.field = config.getField();
4342

44-
if(delimiterRegex != null && !delimiterRegex.isEmpty()
43+
final String delimiter = config.getDelimiter();
44+
final String delimiterRegex = config.getDelimiterRegex();
45+
46+
if (delimiterRegex != null && !delimiterRegex.isEmpty()
4547
&& delimiter != null && !delimiter.isEmpty()) {
4648
throw new IllegalArgumentException("delimiter and delimiter_regex cannot be defined at the same time");
47-
} else if((delimiterRegex == null || delimiterRegex.isEmpty()) &&
48-
(delimiter == null || delimiter.isEmpty())) {
49-
throw new IllegalArgumentException("delimiter or delimiter_regex needs to be defined");
5049
}
5150

52-
if(delimiterRegex != null && !delimiterRegex.isEmpty()) {
53-
pattern = Pattern.compile(delimiterRegex);
51+
final boolean hasRegex = (delimiterRegex != null && !delimiterRegex.isEmpty());
52+
53+
if (hasRegex) {
54+
final Pattern pattern = Pattern.compile(delimiterRegex);
5455
splitter = pattern::split;
56+
} else if (delimiter != null && !delimiter.isEmpty()) {
57+
final Pattern literalPattern = Pattern.compile(Pattern.quote(delimiter));
58+
splitter = literalPattern::split;
5559
} else {
56-
splitter = inputString -> inputString.split(delimiter);
57-
pattern = null;
60+
splitter = null;
5861
}
5962
}
6063

6164
@Override
6265
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
63-
Collection<Record<Event>> newRecords = new ArrayList<>();
64-
for(final Record<Event> record : records) {
66+
final Collection<Record<Event>> newRecords = new ArrayList<>();
67+
for (final Record<Event> record : records) {
6568
final Event recordEvent = record.getData();
6669

6770
if (!recordEvent.containsKey(field)) {
6871
newRecords.add(record);
6972
continue;
7073
}
71-
74+
7275
final Object value = recordEvent.get(field, Object.class);
7376

74-
//split record according to delimiter
75-
final String[] splitValues = splitter.apply((String) value);
77+
if (value == null) {
78+
newRecords.add(record);
79+
continue;
80+
}
81+
82+
if (value instanceof List<?>) {
83+
splitArrayField(record, recordEvent, (List<?>) value, newRecords);
84+
continue;
85+
}
7686

77-
// when no splits or empty value use the original record
78-
if(splitValues.length <= 1) {
87+
if (splitter == null) {
88+
LOG.debug("Field '{}' is not an array and no delimiter is configured, passing through unchanged", field);
7989
newRecords.add(record);
8090
continue;
81-
}
91+
}
8292

83-
//create new events for the splits
84-
for (int i = 0; i < splitValues.length-1 ; i++) {
85-
Record newRecord = createNewRecordFromEvent(recordEvent, splitValues[i]);
86-
addToAcknowledgementSetFromOriginEvent((Event) newRecord.getData(), recordEvent);
87-
newRecords.add(newRecord);
93+
if (!(value instanceof String)) {
94+
LOG.debug("Field '{}' has non-string, non-array value of type {}, passing through unchanged", field, value.getClass().getSimpleName());
95+
newRecords.add(record);
96+
continue;
8897
}
8998

90-
// Modify original event to hold the last split
91-
recordEvent.put(field, splitValues[splitValues.length-1]);
92-
newRecords.add(record);
99+
final String[] splitValues = splitter.apply((String) value);
100+
101+
if (splitValues.length <= 1) {
102+
newRecords.add(record);
103+
continue;
104+
}
105+
106+
splitIntoRecords(record, recordEvent, splitValues, newRecords);
93107
}
94108
return newRecords;
95109
}
96110

97-
protected Record createNewRecordFromEvent(final Event recordEvent, String splitValue) {
98-
Record newRecord;
99-
JacksonEvent newRecordEvent;
111+
/**
 * Handles the case where the configured field holds a list value.
 *
 * <ul>
 *   <li>Empty list: the original record is emitted with the field untouched.</li>
 *   <li>Single element: the field is overwritten with that element (the list
 *       wrapper is dropped) and the original record is emitted.</li>
 *   <li>Two or more elements: delegates to {@code splitIntoRecords} with the
 *       list converted to an array.</li>
 * </ul>
 *
 * @param record      the incoming record; always ends up in {@code newRecords}
 * @param recordEvent the event carried by {@code record}; may be mutated in place
 * @param arrayValue  the list read from the configured field
 * @param newRecords  output collection that receives the resulting records
 */
private void splitArrayField(final Record<Event> record, final Event recordEvent,
112+
final List<?> arrayValue, final Collection<Record<Event>> newRecords) {
113+
if (arrayValue.size() <= 1) {
114+
if (arrayValue.size() == 1) {
115+
recordEvent.put(field, arrayValue.get(0));
116+
}
117+
newRecords.add(record);
118+
return;
119+
}
120+
121+
splitIntoRecords(record, recordEvent, arrayValue.toArray(), newRecords);
122+
}
123+
124+
/**
 * Emits one record per entry of {@code values}.
 *
 * Every value except the last gets a newly created record whose event handle
 * is registered with the origin event's handle. The last value is written back
 * into the original event, and the original record is appended after the new
 * ones, so output order follows the order of {@code values}.
 *
 * The visible callers only invoke this with more than one value; an empty
 * array would fail on the {@code values[values.length - 1]} access.
 *
 * @param record      the incoming record, re-used to carry the last value
 * @param recordEvent the event carried by {@code record}; its field is overwritten
 * @param values      the split values, one per output record
 * @param newRecords  output collection that receives the resulting records
 */
private void splitIntoRecords(final Record<Event> record, final Event recordEvent,
125+
final Object[] values, final Collection<Record<Event>> newRecords) {
126+
for (int i = 0; i < values.length - 1; i++) {
127+
final Record<Event> newRecord = createNewRecordFromEvent(recordEvent, values[i]);
128+
addToAcknowledgementSetFromOriginEvent(newRecord.getData(), recordEvent);
129+
newRecords.add(newRecord);
130+
}
131+
132+
recordEvent.put(field, values[values.length - 1]);
133+
newRecords.add(record);
134+
}
100135

101-
newRecordEvent = JacksonEvent.fromEvent(recordEvent);
102-
newRecordEvent.put(field,(Object) splitValue);
103-
newRecord = new Record<>(newRecordEvent);
104-
return newRecord;
136+
/**
 * Builds a new record from {@code recordEvent} with the configured field set
 * to {@code splitValue}.
 *
 * NOTE(review): presumably {@code JacksonEvent.fromEvent} yields an independent
 * copy, so the put below does not touch the origin event; confirm against
 * JacksonEvent.
 *
 * @param recordEvent the origin event the new event is created from
 * @param splitValue  the value to store under the configured field
 * @return a new record wrapping the newly created event
 */
private Record<Event> createNewRecordFromEvent(final Event recordEvent, final Object splitValue) {
137+
final JacksonEvent newRecordEvent = JacksonEvent.fromEvent(recordEvent);
138+
newRecordEvent.put(field, splitValue);
139+
return new Record<>(newRecordEvent);
105140
}
106141

107-
protected void addToAcknowledgementSetFromOriginEvent(Event recordEvent, Event originRecordEvent) {
108-
DefaultEventHandle eventHandle = (DefaultEventHandle) originRecordEvent.getEventHandle();
142+
private void addToAcknowledgementSetFromOriginEvent(final Event recordEvent, final Event originRecordEvent) {
143+
final DefaultEventHandle eventHandle = (DefaultEventHandle) originRecordEvent.getEventHandle();
109144
if (eventHandle != null) {
110145
eventHandle.addEventHandle(recordEvent.getEventHandle());
111146
}

data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorConfig.java

Lines changed: 13 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -14,52 +14,45 @@
1414
import com.fasterxml.jackson.annotation.JsonProperty;
1515
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
1616
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
17+
import jakarta.validation.constraints.AssertTrue;
1718
import jakarta.validation.constraints.NotEmpty;
18-
import jakarta.validation.constraints.NotNull;
1919
import jakarta.validation.constraints.Size;
2020
import org.opensearch.dataprepper.model.annotations.AlsoRequired;
21-
import org.opensearch.dataprepper.model.annotations.ConditionalRequired;
22-
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.IfThenElse;
23-
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.SchemaProperty;
2421
import org.opensearch.dataprepper.model.annotations.ValidRegex;
2522

26-
@ConditionalRequired(value = {
27-
@IfThenElse(
28-
ifFulfilled = {@SchemaProperty(field = "delimiter", value = "null")},
29-
thenExpect = {@SchemaProperty(field = "delimiter_regex")}
30-
),
31-
@IfThenElse(
32-
ifFulfilled = {@SchemaProperty(field = "delimiter_regex", value = "null")},
33-
thenExpect = {@SchemaProperty(field = "delimiter")}
34-
)
35-
})
3623
@JsonPropertyOrder
37-
@JsonClassDescription("The <code>split_event</code> processor is used to split events based on a delimiter and " +
38-
"generates multiple events from a user-specified field.")
24+
@JsonClassDescription("The <code>split_event</code> processor is used to split events based on a delimiter or an array field, " +
25+
"generating multiple events from a user-specified field.")
3926
public class SplitEventProcessorConfig {
4027
static final String DELIMITER_REGEX_KEY = "delimiter_regex";
4128

4229
@NotEmpty
43-
@NotNull
4430
@JsonProperty("field")
45-
@JsonPropertyDescription("The event field to be split.")
31+
@JsonPropertyDescription("The event field to be split. When no delimiter is specified, the field is treated as an array and each element becomes a separate event.")
4632
private String field;
4733

4834
@Size(min = 1, max = 1)
49-
@JsonPropertyDescription("The delimiter character used for splitting the field. You must provide either the <code>delimiter</code> or the <code>delimiter_regex</code>.")
35+
@JsonPropertyDescription("The delimiter character used for splitting the field. When neither <code>delimiter</code> nor <code>delimiter_regex</code> is specified, the field is treated as an array.")
5036
@AlsoRequired(values = {
5137
@AlsoRequired.Required(name=DELIMITER_REGEX_KEY, allowedValues = {"null", "\"\""})
5238
})
5339
private String delimiter;
5440

5541
@ValidRegex(message = "The value of delimiter_regex is not a valid regex string")
5642
@JsonProperty(DELIMITER_REGEX_KEY)
57-
@JsonPropertyDescription("The regular expression used as the delimiter for splitting the field. You must provide either the <code>delimiter</code> or the <code>delimiter_regex</code>.")
43+
@JsonPropertyDescription("The regular expression used as the delimiter for splitting the field. When neither <code>delimiter</code> nor <code>delimiter_regex</code> is specified, the field is treated as an array.")
5844
@AlsoRequired(values = {
5945
@AlsoRequired.Required(name="delimiter", allowedValues = {"null", "\"\""})
6046
})
6147
private String delimiterRegex;
6248

49+
/**
 * Validation hook: reports whether the delimiter settings are mutually
 * consistent. Returns {@code false} only when both {@code delimiter} and
 * {@code delimiterRegex} are non-null and non-empty. Leaving both unset is
 * accepted here.
 *
 * NOTE(review): this method is private; confirm the validator in use
 * evaluates {@code @AssertTrue} on private {@code is*} methods.
 *
 * @return {@code true} unless both delimiter options are specified
 */
@AssertTrue(message = "delimiter and delimiter_regex cannot both be specified")
50+
private boolean isValidDelimiterConfig() {
51+
boolean hasDelimiter = delimiter != null && !delimiter.isEmpty();
52+
boolean hasRegex = delimiterRegex != null && !delimiterRegex.isEmpty();
53+
return !(hasDelimiter && hasRegex);
54+
}
55+
6356
public String getField() {
6457
return field;
6558
}

0 commit comments

Comments
 (0)