diff --git a/data-prepper-plugins/dissect-processor/build.gradle b/data-prepper-plugins/dissect-processor/build.gradle index 2583244db5..a491ab955e 100644 --- a/data-prepper-plugins/dissect-processor/build.gradle +++ b/data-prepper-plugins/dissect-processor/build.gradle @@ -9,6 +9,7 @@ plugins { id 'java' + id 'data-prepper.jmh' } @@ -21,8 +22,17 @@ dependencies { testImplementation project(':data-prepper-plugins:log-generator-source') testImplementation project(':data-prepper-test:test-common') implementation libs.commons.lang3 + jmh project(':data-prepper-api') } test { useJUnitPlatform() +} + +tasks.withType(Zip).configureEach { + zip64 = true +} + +jmh { + profilers = ['gc'] } \ No newline at end of file diff --git a/data-prepper-plugins/dissect-processor/src/jmh/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorBenchmark.java b/data-prepper-plugins/dissect-processor/src/jmh/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorBenchmark.java new file mode 100644 index 0000000000..e1ff91c0d6 --- /dev/null +++ b/data-prepper-plugins/dissect-processor/src/jmh/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorBenchmark.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.processor.dissect; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@Fork(2) +@Threads(4) +@Warmup(iterations = 1, time = 2) +@Measurement(iterations = 5, time = 10) +public class DissectProcessorBenchmark { + + @State(Scope.Benchmark) + public static class BenchmarkState { + Dissector dissector; + String input; + + @Setup + public void setUp() { + dissector = new Dissector("%{timestamp} %{level} %{message}"); + input = "2024-01-15 ERROR service crashed"; + } + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + public Object benchmark_dissect(BenchmarkState state) { + return state.dissector.dissectText(state.input); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Delimiter.java b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Delimiter.java index a867736316..143efa6f0b 100644 --- a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Delimiter.java +++ b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Delimiter.java @@ -11,48 +11,11 @@ public class Delimiter { private final String delimiterString; - private int start = -1; - private int end = -1; - private Delimiter next = null; - - private Delimiter prev = null; public Delimiter(String delimiterString) { this.delimiterString = delimiterString; } - public int getStart() { - return start; - } - - public void setStart(int ind) { - start = ind; - } - - public int getEnd() { - return end; - } - - public void setEnd(int ind) { - end = ind; - } - - public Delimiter getNext() { - return next; - } - - public void setNext(Delimiter nextDelimiter) { - next = nextDelimiter; - } - - public Delimiter getPrev() { - return prev; - } - - public void setPrev(Delimiter prevDelimiter) { - prev = prevDelimiter; - } - @Override public String toString() { return delimiterString; diff --git a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java index b3bacbac22..a00b816a11 100644 --- a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java +++ b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java @@ -15,13 +15,11 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; -import org.opensearch.dataprepper.model.annotations.SingleThread; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.processor.dissect.Fields.Field; import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType; import org.opensearch.dataprepper.typeconverter.TypeConverter; import org.slf4j.Logger; @@ -29,12 +27,9 @@ import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; - -@SingleThread @DataPrepperPlugin(name = "dissect", pluginType = Processor.class, pluginConfigurationType = DissectProcessorConfig.class) public class DissectProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(DissectProcessor.class); @@ -96,16 +91,15 @@ private void dissectField(Event event, String field){ Dissector dissector = dissectorMap.get(field); boolean isDeleteSourceOnSuccessfulDissect = dissectConfig.isDeleteSourceRequested(); String text = event.get(field, String.class); - if (dissector.dissectText(text)) { - List dissectedFields = dissector.getDissectedFields(); - for(Field disectedField: dissectedFields) { - String dissectFieldName = disectedField.getKey(); - Object dissectFieldValue = convertTargetType(dissectFieldName,disectedField.getValue()); - event.put(disectedField.getKey(), dissectFieldValue); - } - if (isDeleteSourceOnSuccessfulDissect) { - event.delete(field); - } + Map dissectedFields = dissector.dissectText(text); + if (dissectedFields == null) { + return; + } + for(Map.Entry entry: dissectedFields.entrySet()) { + event.put(entry.getKey(), convertTargetType(entry.getKey(), entry.getValue())); + } + if (isDeleteSourceOnSuccessfulDissect) { + event.delete(field); } } @@ -126,8 +120,6 @@ private Object convertTargetType(String fieldKey, String fieldValue){ } } - - @Override public void prepareForShutdown() { diff --git a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Dissector.java b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Dissector.java index 937155514d..d848450352 100644 --- a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Dissector.java +++ b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Dissector.java @@ -16,12 +16,12 @@ import org.opensearch.dataprepper.plugins.processor.dissect.Fields.NormalField; import org.opensearch.dataprepper.plugins.processor.dissect.Fields.SkipField; -import java.util.ArrayList; -import java.util.Collections; +import java.util.Map; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; -import java.util.Map; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.Collections; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -66,64 +66,82 @@ public Dissector(String dissectPatternString){ setFieldsMaps(); } - public boolean dissectText(String text){ - try { - if (!setDelimiterIndexes(text)) { - return false; + public Map dissectText(String text) { + if (text == null) { + return null; + } + int n = delimiterList.size(); + int[] delimStarts = new int[n]; + int[] delimEnds = new int[n]; + if (!computeDelimiterPositions(text, delimStarts, delimEnds, n)) { + return null; + } + Map localValues = new HashMap<>(); // keyed by identity — same Field instances used for put and get + Field head = fieldsList.getFirst(); + for (int i = 0; i < n; i++) { + int fieldStart = 0; + int fieldEnd = delimStarts[i]; + if (i == 0 && delimStarts[i] == 0) { + continue; + } + if (i > 0) { + fieldStart = delimEnds[i - 1] + 1; } - Field head = fieldsList.getFirst(); - for (final Delimiter delimiter : delimiterList) { - int fieldStart = 0; - int fieldEnd = delimiter.getStart(); - if (delimiter.getPrev() == null && delimiter.getStart() == 0) { - continue; - } - if (delimiter.getPrev() != null || delimiter.getStart() == 0) { - fieldStart = delimiter.getPrev().getEnd() + 1; - } - head.setValue(text.substring(fieldStart, fieldEnd)); - head = head.getNext(); - } - if (delimiterList.getLast().getEnd() != text.length() - 1) { - int fieldStart = delimiterList.getLast().getEnd() + 1; - int fieldEnd = text.length(); - head.setValue(text.substring(fieldStart, fieldEnd)); - } - return true; - } catch (Exception e) { - return false; + if (head == null) { + return null; + } + String val = text.substring(fieldStart, fieldEnd); + localValues.put(head, head.isStripTrailing() ? val.stripTrailing() : val); + head = head.getNext(); + } + if (delimEnds[n - 1] != text.length() - 1) { + if (head == null) { + return null; + } + String val = text.substring(delimEnds[n - 1] + 1); + localValues.put(head, head.isStripTrailing() ? val.stripTrailing() : val); } + return getDissectedFields(localValues); } - public List getDissectedFields(){ - final List dissectedFields = new ArrayList<>(); - Map appendFieldMap = getAppendedFields(unAppendedFieldsMap); + private Map getDissectedFields(Map localValues) { + final Map results = new HashMap<>(); + Map appendFieldMap = getAppendedFields(localValues); - dissectedFields.addAll(normalFieldMap.values()); - dissectedFields.addAll(appendFieldMap.values()); - - for(final Field indirectField : indirectFieldMap.values()){ - if(normalFieldMap.containsKey(indirectField.getKey())){ - indirectField.setKey(normalFieldMap.get(indirectField.getKey()).getValue()); + for (NormalField templateField : normalFieldMap.values()) { + String val = localValues.get(templateField); + if (val != null) { + results.put(templateField.getKey(), val); } - if(skipFieldMap.containsKey(indirectField.getKey())){ - indirectField.setKey(skipFieldMap.get(indirectField.getKey()).getValue()); + } + + for (Map.Entry entry : appendFieldMap.entrySet()) { + results.put(entry.getKey(), entry.getValue()); + } + + for (IndirectField templateField : indirectFieldMap.values()) { + String templateKey = templateField.getKey(); + String resolvedKey = null; + if (normalFieldMap.containsKey(templateKey)) { + resolvedKey = localValues.get(normalFieldMap.get(templateKey)); + } else if (skipFieldMap.containsKey(templateKey)) { + resolvedKey = localValues.get(skipFieldMap.get(templateKey)); + } else if (appendFieldMap.containsKey(templateKey)) { + resolvedKey = appendFieldMap.get(templateKey); } - if(appendFieldMap.containsKey(indirectField.getKey())){ - indirectField.setKey(appendFieldMap - .get(indirectField.getKey()).getValue()); + String val = localValues.get(templateField); + if (resolvedKey != null && val != null) { + results.put(resolvedKey, val); } - dissectedFields.add(indirectField); } - - return dissectedFields; + return results; } private void setFieldsMaps(){ - this.normalFieldMap = fieldHelper.getNormalFieldMap(); - this.skipFieldMap = fieldHelper.getSkipFieldMap(); - this.indirectFieldMap = fieldHelper.getIndirectFieldMap(); - this.unAppendedFieldsMap = fieldHelper.getAppendFieldMap(); + this.normalFieldMap = Collections.unmodifiableMap(fieldHelper.getNormalFieldMap()); + this.skipFieldMap = Collections.unmodifiableMap(fieldHelper.getSkipFieldMap()); + this.indirectFieldMap = Collections.unmodifiableMap(fieldHelper.getIndirectFieldMap()); + this.unAppendedFieldsMap = Collections.unmodifiableMap(fieldHelper.getAppendFieldMap()); } private void parseFields(String[] fieldsArray){ @@ -150,47 +168,45 @@ private void parseDelimiters(String[] delimiterArray) { if (delimiterString.length() == 0) { continue; } - Delimiter delimiter = new Delimiter(delimiterString); - if (delimiterList.size() == 0) { - delimiterList.addLast(delimiter); - } else { - delimiterList.getLast().setNext(delimiter); - delimiter.setPrev(delimiterList.getLast()); - delimiterList.addLast(delimiter); - } + delimiterList.addLast(new Delimiter(delimiterString)); } } - private boolean setDelimiterIndexes(String text){ + private boolean computeDelimiterPositions(String text, int[] starts, int[] ends, int n) { + int i = 0; for (Delimiter delimiter : delimiterList) { int prevEnd = 0; - if (delimiter.getPrev() != null) { - prevEnd = delimiter.getPrev().getEnd() + 1; + if (i > 0) { + prevEnd = ends[i - 1] + 1; } String delimiterString = delimiter.toString(); int start = text.indexOf(delimiterString, prevEnd); + if (start < 0) { + return false; + } if (delimiterString.trim().isEmpty()) { start = start + findLastWhitespaceIndex(text.substring(start), delimiterString.length()); } - int end = start + delimiterString.length() -1; - if (start < 0 || end > text.length()) { + int end = start + delimiterString.length() - 1; + if (end > text.length()) { return false; } - delimiter.setStart(start); - delimiter.setEnd(end); + starts[i] = start; + ends[i] = end; + i++; } return true; } - private Map getAppendedFields(Map> unAppendedFieldsMap){ - final Map appendFieldMap = new HashMap<>(); - for(final String key : unAppendedFieldsMap.keySet()){ - List appendFields = unAppendedFieldsMap.get(key); - Collections.sort(appendFields); - String value = appendFields.stream().map(AppendField::getValue).collect(Collectors.joining()); - AppendField sortedField = new AppendField(key); - sortedField.setValue(value); - appendFieldMap.put(sortedField.getKey(), sortedField); + private Map getAppendedFields(Map localValues) { + final Map appendFieldMap = new HashMap<>(); + for (Map.Entry> entry : unAppendedFieldsMap.entrySet()) { + List copy = new ArrayList<>(entry.getValue()); + Collections.sort(copy); + String value = copy.stream() + .map(f -> localValues.getOrDefault(f, "")) + .collect(Collectors.joining()); + appendFieldMap.put(entry.getKey(), value); } return appendFieldMap; } diff --git a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Fields/Field.java b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Fields/Field.java index fc5963a9cf..4ab0451857 100644 --- a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Fields/Field.java +++ b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Fields/Field.java @@ -12,13 +12,8 @@ public abstract class Field { boolean stripTrailing = false; private String key; - private String value; private Field next; - public String getValue() { - return value; - } - public String getKey() { return key; } @@ -35,8 +30,8 @@ public void setNext(Field next) { this.next = next; } - public void setValue(String value) { - this.value = stripTrailing ? value.stripTrailing() : value; + public boolean isStripTrailing() { + return stripTrailing; } } diff --git a/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DelimiterTest.java b/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DelimiterTest.java index 0358136192..aac9b3ddca 100644 --- a/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DelimiterTest.java +++ b/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DelimiterTest.java @@ -9,7 +9,6 @@ package org.opensearch.dataprepper.plugins.processor.dissect; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; @@ -17,37 +16,9 @@ class DelimiterTest { - private static final String DELIMITER_STRING = "test_string"; - private static final int START_END_INDEX = 3; - private static final Delimiter DELIMITER = new Delimiter("test_delimiter"); - private Delimiter delimiter; - - @BeforeEach - public void setUp() { - delimiter = new Delimiter(DELIMITER_STRING); - } - - @Test - public void testSetAndGetStart() { - delimiter.setStart(START_END_INDEX); - assertThat(delimiter.getStart(), is(START_END_INDEX)); - } - - @Test - public void testSetAndGetEnd() { - delimiter.setEnd(START_END_INDEX); - assertThat(delimiter.getEnd(), is(START_END_INDEX)); - } - - @Test - public void testSetAndGetNext() { - delimiter.setNext(DELIMITER); - assertThat(delimiter.getNext(), is(DELIMITER)); - } - @Test - public void testSetAndGetPrev() { - delimiter.setPrev(DELIMITER); - assertThat(delimiter.getPrev(), is(DELIMITER)); + public void testToString() { + Delimiter delimiter = new Delimiter("test_string"); + assertThat(delimiter.toString(), is("test_string")); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorTest.java b/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorTest.java index 9040ce2ceb..d4946372e2 100644 --- a/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorTest.java +++ b/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorTest.java @@ -20,17 +20,18 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.processor.dissect.Fields.AppendField; -import org.opensearch.dataprepper.plugins.processor.dissect.Fields.Field; -import org.opensearch.dataprepper.plugins.processor.dissect.Fields.IndirectField; -import org.opensearch.dataprepper.plugins.processor.dissect.Fields.NormalField; import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -38,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -71,13 +73,7 @@ void invalid_dissect_when_condition_throws_InvalidPluginConfigurationException() @Test void test_normal_fields_dissect_succeeded() throws NoSuchFieldException, IllegalAccessException { - Field field1 = new NormalField("field1"); - Field field2 = new NormalField("field2"); - field1.setValue("foo"); - field2.setValue("bar"); - - when(dissector.dissectText(any(String.class))).thenReturn(true); - when(dissector.getDissectedFields()).thenReturn(List.of(field1, field2)); + when(dissector.dissectText(any(String.class))).thenReturn(Map.of("field1", "foo", "field2", "bar")); final DissectProcessor processor = createObjectUnderTest(); reflectivelySetDissectorMap(processor); @@ -94,13 +90,7 @@ void test_normal_fields_dissect_succeeded() throws NoSuchFieldException, Illegal @Test void test_append_fields_dissect_succeeded() throws NoSuchFieldException, IllegalAccessException { - Field field1 = new AppendField("field1"); - Field field2 = new AppendField("field2"); - field1.setValue("foo"); - field2.setValue("bar"); - - when(dissector.dissectText(any(String.class))).thenReturn(true); - when(dissector.getDissectedFields()).thenReturn(List.of(field1, field2)); + when(dissector.dissectText(any(String.class))).thenReturn(Map.of("field1", "foo", "field2", "bar")); final DissectProcessor processor = createObjectUnderTest(); reflectivelySetDissectorMap(processor); @@ -117,13 +107,7 @@ void test_append_fields_dissect_succeeded() throws NoSuchFieldException, Illegal @Test void test_indirect_fields_dissect_succeeded() throws NoSuchFieldException, IllegalAccessException { - Field field1 = new IndirectField("field1"); - Field field2 = new IndirectField("field2"); - field1.setValue("foo"); - field2.setValue("bar"); - - when(dissector.dissectText(any(String.class))).thenReturn(true); - when(dissector.getDissectedFields()).thenReturn(List.of(field1, field2)); + when(dissector.dissectText(any(String.class))).thenReturn(Map.of("field1", "foo", "field2", "bar")); final DissectProcessor processor = createObjectUnderTest(); reflectivelySetDissectorMap(processor); @@ -138,16 +122,15 @@ void test_indirect_fields_dissect_succeeded() throws NoSuchFieldException, Illeg } @Test - void test_dissectText_returns_false() throws NoSuchFieldException, IllegalAccessException { + void test_dissectText_returns_null_on_failure() throws NoSuchFieldException, IllegalAccessException { - when(dissector.dissectText(any(String.class))).thenReturn(false); + when(dissector.dissectText(any(String.class))).thenReturn(null); final DissectProcessor processor = createObjectUnderTest(); reflectivelySetDissectorMap(processor); final Record record = getEvent(""); final List> dissectedRecords = (List>) processor.doExecute(Collections.singletonList(record)); - // assert event is not modified assertThat(dissectedRecords.get(0).getData(), is(record.getData())); } @@ -160,19 +143,13 @@ void test_dissectText_throws_exception() throws NoSuchFieldException, IllegalAcc final Record record = getEvent(""); final List> dissectedRecords = (List>) processor.doExecute(Collections.singletonList(record)); - // assert event is not modified assertThat(dissectedRecords.get(0).getData(), is(record.getData())); } @Test void test_target_type_int() throws NoSuchFieldException, IllegalAccessException { - Field field1 = new IndirectField("field1"); - Field field2 = new IndirectField("field2"); - field1.setValue("20"); - field2.setValue("30"); - when(dissector.dissectText(any(String.class))).thenReturn(true); - when(dissector.getDissectedFields()).thenReturn(List.of(field1, field2)); + when(dissector.dissectText(any(String.class))).thenReturn(Map.of("field1", "20", "field2", "30")); Map targetsMap = Map.of("field1", TargetType.INTEGER); when(dissectConfig.getTargetTypes()).thenReturn(targetsMap); @@ -193,12 +170,8 @@ void test_target_type_int() throws NoSuchFieldException, IllegalAccessException @Test void test_target_type_bool() throws NoSuchFieldException, IllegalAccessException { - Field field1 = new IndirectField("field1"); - Field field2 = new IndirectField("field2"); - field1.setValue("true"); - field2.setValue("30"); - when(dissector.dissectText(any(String.class))).thenReturn(true); - when(dissector.getDissectedFields()).thenReturn(List.of(field1, field2)); + + when(dissector.dissectText(any(String.class))).thenReturn(Map.of("field1", "true", "field2", "30")); Map targetsMap = Map.of("field1", TargetType.BOOLEAN); when(dissectConfig.getTargetTypes()).thenReturn(targetsMap); @@ -219,12 +192,8 @@ void test_target_type_bool() throws NoSuchFieldException, IllegalAccessException @Test void test_target_type_double() throws NoSuchFieldException, IllegalAccessException { - Field field1 = new IndirectField("field1"); - Field field2 = new IndirectField("field2"); - field1.setValue("20.0"); - field2.setValue("30"); - when(dissector.dissectText(any(String.class))).thenReturn(true); - when(dissector.getDissectedFields()).thenReturn(List.of(field1, field2)); + + when(dissector.dissectText(any(String.class))).thenReturn(Map.of("field1", "20.0", "field2", "30")); Map targetsMap = Map.of("field1", TargetType.DOUBLE); when(dissectConfig.getTargetTypes()).thenReturn(targetsMap); @@ -276,11 +245,7 @@ private void reflectivelySetDissectorMap(DissectProcessor processor) throws NoSu @Test void test_delete_source_requested() throws NoSuchFieldException, IllegalAccessException { - Field dissectedField = new NormalField("level"); - dissectedField.setValue("WARN"); - - when(dissector.dissectText(any(String.class))).thenReturn(true); - when(dissector.getDissectedFields()).thenReturn(List.of(dissectedField)); + when(dissector.dissectText(any(String.class))).thenReturn(Map.of("level", "WARN")); when(dissectConfig.isDeleteSourceRequested()).thenReturn(true); final DissectProcessor processor = createObjectUnderTest(); @@ -295,11 +260,8 @@ void test_delete_source_requested() throws NoSuchFieldException, IllegalAccessEx @Test void test_delete_source_not_requested() throws NoSuchFieldException, IllegalAccessException { - Field dissectedField = new NormalField("level"); - dissectedField.setValue("WARN"); - when(dissector.dissectText(any(String.class))).thenReturn(true); - when(dissector.getDissectedFields()).thenReturn(List.of(dissectedField)); + when(dissector.dissectText(any(String.class))).thenReturn(Map.of("level", "WARN")); when(dissectConfig.isDeleteSourceRequested()).thenReturn(false); final DissectProcessor processor = createObjectUnderTest(); @@ -315,7 +277,7 @@ void test_delete_source_not_requested() throws NoSuchFieldException, IllegalAcce @Test void test_delete_source_requested_dissect_fail() throws NoSuchFieldException, IllegalAccessException { - when(dissector.dissectText(any(String.class))).thenReturn(false); + when(dissector.dissectText(any(String.class))).thenReturn(null); when(dissectConfig.isDeleteSourceRequested()).thenReturn(true); final DissectProcessor processor = createObjectUnderTest(); @@ -326,4 +288,89 @@ void test_delete_source_requested_dissect_fail() throws NoSuchFieldException, Il assertTrue(dissectedRecords.get(0).getData().containsKey("test")); } -} \ No newline at end of file + @Test + void test_dissect_when_condition_false_skips_event() throws NoSuchFieldException, IllegalAccessException { + final String dissectWhen = UUID.randomUUID().toString(); + when(dissectConfig.getDissectWhen()).thenReturn(dissectWhen); + when(expressionEvaluator.isValidExpressionStatement(dissectWhen)).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(dissectWhen), any())).thenReturn(false); + + final DissectProcessor processor = createObjectUnderTest(); + reflectivelySetDissectorMap(processor); + final Record record = getEvent("some text"); + processor.doExecute(Collections.singletonList(record)); + + org.mockito.Mockito.verify(dissector, org.mockito.Mockito.never()).dissectText(any()); + } + + @Test + void test_dissect_when_condition_true_processes_event() throws NoSuchFieldException, IllegalAccessException { + final String dissectWhen = UUID.randomUUID().toString(); + when(dissectConfig.getDissectWhen()).thenReturn(dissectWhen); + when(expressionEvaluator.isValidExpressionStatement(dissectWhen)).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(dissectWhen), any())).thenReturn(true); + when(dissector.dissectText(any(String.class))).thenReturn(Map.of("field1", "foo")); + + final DissectProcessor processor = createObjectUnderTest(); + reflectivelySetDissectorMap(processor); + final Record record = getEvent("some text"); + final List> dissectedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + org.mockito.Mockito.verify(dissector).dissectText(any()); + assertTrue(dissectedRecords.get(0).getData().containsKey("field1")); + } + + @Test + void test_concurrent_doExecute_no_cross_contamination() throws InterruptedException { + when(dissectConfig.getMap()).thenReturn(Map.of("message", "%{timestamp} %{level} %{content}")); + final DissectProcessor processor = createObjectUnderTest(); + + int threadCount = 10; + int iterationsPerThread = 100; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(threadCount); + List errors = Collections.synchronizedList(new ArrayList<>()); + + for (int t = 0; t < threadCount; t++) { + final String timestamp = "2024-01-" + String.format("%02d", t + 1); + final String level = "LEVEL" + t; + final String content = "content" + t; + final String input = timestamp + " " + level + " " + content; + + executor.submit(() -> { + try { + startLatch.await(); + for (int i = 0; i < iterationsPerThread; i++) { + final Map data = new HashMap<>(); + data.put("message", input); + final Record record = new Record<>(JacksonEvent.builder() + .withData(data).withEventType("event").build()); + final List> results = (List>) processor.doExecute(Collections.singletonList(record)); + final Event resultEvent = results.get(0).getData(); + if (!timestamp.equals(resultEvent.get("timestamp", String.class))) { + errors.add("timestamp mismatch: expected " + timestamp + " got " + resultEvent.get("timestamp", String.class)); + } + if (!level.equals(resultEvent.get("level", String.class))) { + errors.add("level mismatch: expected " + level + " got " + resultEvent.get("level", String.class)); + } + if (!content.equals(resultEvent.get("content", String.class))) { + errors.add("content mismatch: expected " + content + " got " + resultEvent.get("content", String.class)); + } + } + } catch (Exception e) { + errors.add(e.getMessage()); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + assertTrue(doneLatch.await(5, TimeUnit.SECONDS), "Test timed out"); + executor.shutdown(); + + assertTrue(errors.isEmpty(), "Concurrency errors: " + errors); + } + +} diff --git a/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectorTest.java b/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectorTest.java index fe76217301..fe15cc6f67 100644 --- a/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectorTest.java +++ b/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectorTest.java @@ -10,13 +10,21 @@ package org.opensearch.dataprepper.plugins.processor.dissect; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.processor.dissect.Fields.Field; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; class DissectorTest { @@ -24,207 +32,222 @@ class DissectorTest { @Test void test_normal_field_trailing_spaces(){ Dissector dissector = createObjectUnderTest(" %{field1} %{field2} "); - boolean result = dissector.dissectText(" foo bar "); + Map result = dissector.dissectText(" foo bar "); - assertTrue(result); - - List fields = dissector.getDissectedFields(); - - assertThat(fields.get(0).getKey(), is("field1")); - assertThat(fields.get(0).getValue(), is("foo")); - assertThat(fields.get(1).getKey(), is("field2")); - assertThat(fields.get(1).getValue(), is("bar")); + assertNotNull(result); + assertFalse(result.isEmpty()); + assertThat(result.get("field1"), is("foo")); + assertThat(result.get("field2"), is("bar")); } @Test void test_normal_field_without_trailing_spaces(){ Dissector dissector = createObjectUnderTest("dm1 %{field1} %{field2} dm2"); - boolean result = dissector.dissectText("dm1 foo bar dm2"); - assertTrue(result); + Map result = dissector.dissectText("dm1 foo bar dm2"); - List fields = dissector.getDissectedFields(); - - assertThat(fields.get(0).getKey(), is("field1")); - assertThat(fields.get(0).getValue(), is("foo")); - assertThat(fields.get(1).getKey(), is("field2")); - assertThat(fields.get(1).getValue(), is("bar")); + assertNotNull(result); + assertFalse(result.isEmpty()); + assertThat(result.get("field1"), is("foo")); + assertThat(result.get("field2"), is("bar")); } @Test void test_normal_field_failure_without_delimiters(){ Dissector dissector = createObjectUnderTest("dm1 %{field1} %{field2} dm2"); - boolean result = dissector.dissectText("dm1 foo bar"); - assertFalse(result); + assertNull(dissector.dissectText("dm1 foo bar")); } @Test void test_normal_field_failure_with_extra_whitespaces(){ Dissector dissector = createObjectUnderTest("dm1 %{field1} %{field2} dm2"); - boolean result = dissector.dissectText(" dm1 foo bar dm2"); - assertFalse(result); + assertNull(dissector.dissectText(" dm1 foo bar dm2")); } @Test void test_named_skip_field(){ Dissector dissector = createObjectUnderTest("dm1 %{?field1} %{field2} dm2"); + Map result = dissector.dissectText("dm1 foo bar dm2"); - boolean result = dissector.dissectText("dm1 foo bar dm2"); - assertTrue(result); - List fields = dissector.getDissectedFields(); - - assertThat(fields.size(), is(1)); - assertThat(fields.get(0).getKey(), is("field2")); - assertThat(fields.get(0).getValue(), is("bar")); + assertFalse(result.isEmpty()); + assertThat(result.size(), is(1)); + assertThat(result.get("field2"), is("bar")); } @Test void test_unnamed_skip_field(){ Dissector dissector = createObjectUnderTest("dm1 %{} %{field2} dm2"); + Map result = dissector.dissectText("dm1 foo bar dm2"); - boolean result = dissector.dissectText("dm1 foo bar dm2"); - assertTrue(result); - List fields = dissector.getDissectedFields(); - - assertThat(fields.size(), is(1)); - assertThat(fields.get(0).getKey(), is("field2")); - assertThat(fields.get(0).getValue(), is("bar")); + assertFalse(result.isEmpty()); + assertThat(result.size(), is(1)); + assertThat(result.get("field2"), is("bar")); } @Test void test_indirect_field_with_skip_field(){ Dissector dissector = createObjectUnderTest("dm1 %{?field1} %{&field1} dm2"); + Map result = dissector.dissectText("dm1 foo bar dm2"); - boolean result = dissector.dissectText("dm1 foo bar dm2"); - assertTrue(result); - List fields = dissector.getDissectedFields(); - - assertThat(fields.size(), is(1)); - assertThat(fields.get(0).getKey(), is("foo")); - assertThat(fields.get(0).getValue(), is("bar")); + assertFalse(result.isEmpty()); + assertThat(result.size(), is(1)); + assertThat(result.get("foo"), is("bar")); } @Test void test_indirect_field_with_normal_field(){ Dissector dissector = createObjectUnderTest("dm1 %{field1} %{&field1} dm2"); + Map result = dissector.dissectText("dm1 foo bar dm2"); - boolean result = dissector.dissectText("dm1 foo bar dm2"); - assertTrue(result); - List fields = dissector.getDissectedFields(); - - assertThat(fields.get(0).getKey(), is("field1")); - assertThat(fields.get(0).getValue(), is("foo")); - assertThat(fields.get(1).getKey(), is("foo")); - assertThat(fields.get(1).getValue(), is("bar")); + assertFalse(result.isEmpty()); + assertThat(result.get("field1"), is("foo")); + assertThat(result.get("foo"), is("bar")); } @Test void test_append_field_without_index(){ Dissector dissector = createObjectUnderTest("dm1 %{+field1} %{+field1} dm2"); + Map result = dissector.dissectText("dm1 foo bar dm2"); - boolean result = dissector.dissectText("dm1 foo bar dm2"); - assertTrue(result); - List fields = dissector.getDissectedFields(); - - assertThat(fields.size(), is(1)); - assertThat(fields.get(0).getKey(), is("field1")); - assertThat(fields.get(0).getValue(), is("foobar")); + assertFalse(result.isEmpty()); + assertThat(result.size(), is(1)); + assertThat(result.get("field1"), is("foobar")); } @Test void test_append_field_with_index(){ Dissector dissector = createObjectUnderTest("dm1 %{+field1/2} %{+field1/1} dm2"); + Map result = dissector.dissectText("dm1 foo bar dm2"); - boolean result = dissector.dissectText("dm1 foo bar dm2"); - assertTrue(result); - List fields = dissector.getDissectedFields(); - - assertThat(fields.size(), is(1)); - assertThat(fields.get(0).getKey(), is("field1")); - assertThat(fields.get(0).getValue(), is("barfoo")); + assertFalse(result.isEmpty()); + assertThat(result.size(), is(1)); + assertThat(result.get("field1"), is("barfoo")); } @Test void test_append_whitespace_normal_field(){ Dissector dissector = createObjectUnderTest("dm1 %{field1->} %{field2} dm2"); + Map result = dissector.dissectText("dm1 foo bar dm2"); - boolean result = dissector.dissectText("dm1 foo bar dm2"); - assertTrue(result); - List fields = dissector.getDissectedFields(); - - assertThat(fields.get(0).getKey(), is("field1")); - assertThat(fields.get(0).getValue(), is("foo")); - assertThat(fields.get(1).getKey(), is("field2")); - assertThat(fields.get(1).getValue(), is("bar")); + assertFalse(result.isEmpty()); + assertThat(result.get("field1"), is("foo")); + assertThat(result.get("field2"), is("bar")); } @Test void test_append_whitespace_append_field(){ Dissector dissector = createObjectUnderTest("dm1 %{+field1->} %{+field1} dm2"); + Map result = dissector.dissectText("dm1 foo bar dm2"); - boolean result = dissector.dissectText("dm1 foo bar dm2"); - assertTrue(result); - List fields = dissector.getDissectedFields(); - - assertThat(fields.size(), is(1)); - assertThat(fields.get(0).getKey(), is("field1")); - assertThat(fields.get(0).getValue(), is("foobar")); + assertFalse(result.isEmpty()); + assertThat(result.size(), is(1)); + assertThat(result.get("field1"), is("foobar")); } @Test void test_append_whitespace_indirect_field(){ Dissector dissector = createObjectUnderTest("dm1 %{?field1->} %{&field1} dm2"); + Map result = dissector.dissectText("dm1 foo bar dm2"); - boolean result = dissector.dissectText("dm1 foo bar dm2"); - assertTrue(result); - List fields = dissector.getDissectedFields(); - - assertThat(fields.size(), is(1)); - assertThat(fields.get(0).getKey(), is("foo")); - assertThat(fields.get(0).getValue(), is("bar")); + assertFalse(result.isEmpty()); + assertThat(result.size(), is(1)); + assertThat(result.get("foo"), is("bar")); } @Test void test_skip_fields_with_padding(){ Dissector dissector = createObjectUnderTest("dm1 %{?field1->} %{?field3} %{field2} dm2"); + Map result = dissector.dissectText("dm1 foo skip bar dm2"); - boolean result = dissector.dissectText("dm1 foo skip bar dm2"); - assertTrue(result); - List fields = dissector.getDissectedFields(); - - assertThat(fields.size(), is(1)); - assertThat(fields.get(0).getKey(), is("field2")); - assertThat(fields.get(0).getValue(), is("bar")); + assertFalse(result.isEmpty()); + assertThat(result.size(), is(1)); + assertThat(result.get("field2"), is("bar")); } @Test void test_indirect_field_with_append(){ Dissector dissector = createObjectUnderTest("%{+field1->} %{+field1} %{&field1->}"); + Map result = dissector.dissectText("foo bar result "); - boolean result = dissector.dissectText("foo bar result "); - assertTrue(result); - List fields = dissector.getDissectedFields(); + assertFalse(result.isEmpty()); + assertThat(result.size(), is(2)); + assertThat(result.get("field1"), is("foobar")); + assertThat(result.get("foobar"), is("result")); + } + + @Test + void test_dissect_text_returns_null_on_failure(){ + Dissector dissector = createObjectUnderTest("dm1 %{field1} %{field2} dm2"); + assertNull(dissector.dissectText(null)); + } + + @Test + void test_indirect_field_unresolved(){ + Dissector dissector = createObjectUnderTest("dm1 %{field1} %{&field2} dm2"); + Map result = dissector.dissectText("dm1 foo bar dm2"); + + assertFalse(result.isEmpty()); + assertThat(result.size(), is(1)); + assertThat(result.get("field1"), is("foo")); + assertFalse(result.containsKey("bar")); + } + + @Test + void test_concurrent_dissect_no_cross_contamination() throws InterruptedException { + Dissector dissector = createObjectUnderTest("dm1 %{field1} %{field2} dm2"); - assertThat(fields.size(), is(2)); - assertThat(fields.get(0).getKey(), is("field1")); - assertThat(fields.get(0).getValue(), is("foobar")); - assertThat(fields.get(1).getKey(), is("foobar")); - assertThat(fields.get(1).getValue(), is("result")); + int threadCount = 10; + int iterationsPerThread = 100; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(threadCount); + List errors = Collections.synchronizedList(new ArrayList<>()); + + for (int t = 0; t < threadCount; t++) { + final String field1Value = "value" + t; + final String field2Value = "data" + t; + final String input = "dm1 " + field1Value + " " + field2Value + " dm2"; + + executor.submit(() -> { + try { + startLatch.await(); + for (int i = 0; i < iterationsPerThread; i++) { + Map result = dissector.dissectText(input); + if (!field1Value.equals(result.get("field1"))) { + errors.add("field1 mismatch: expected " + field1Value + " got " + result.get("field1")); + } + if (!field2Value.equals(result.get("field2"))) { + errors.add("field2 mismatch: expected " + field2Value + " got " + result.get("field2")); + } + } + } catch (Exception e) { + errors.add(e.getMessage()); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + assertTrue(doneLatch.await(5, TimeUnit.SECONDS), "Test timed out"); + executor.shutdown(); + + assertTrue(errors.isEmpty(), "Concurrency errors: " + errors); } private Dissector createObjectUnderTest(String dissectPatternString) { return new Dissector(dissectPatternString); } -} \ No newline at end of file +}