diff --git a/data-prepper-plugins/multiline-codecs/README.md b/data-prepper-plugins/multiline-codecs/README.md new file mode 100644 index 0000000000..bae5add698 --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/README.md @@ -0,0 +1,121 @@ +# Multiline Codecs + +This plugin provides a multiline input codec for Data Prepper that groups consecutive lines from an input stream into single events based on a configurable regex pattern. + +## Usages + +The multiline input codec can be configured with source plugins (e.g. S3 source, file source) in the pipeline file. + +### Use Cases + +- **Java/Kotlin stack traces**: Exception messages followed by `at ...` lines +- **Python tracebacks**: `Traceback` blocks spanning multiple lines +- **Timestamp-prefixed logs**: Logs where each entry starts with a timestamp and continuation lines don't +- **Multi-line JSON/XML in logs**: Structured data embedded across multiple lines within log entries +- **Custom log formats**: Any format where a recognizable pattern marks the start or end of a new event + +## Configuration Options + +Exactly one of the four pattern fields must be specified: + +| Option | Required | Type | Default | Description | +|---|---|---|---|---| +| `event_start_pattern` | One of four | String (regex) | - | A new event begins at each line matching this pattern | +| `event_end_pattern` | One of four | String (regex) | - | An event ends at each line matching this pattern (inclusive) | +| `continuation_line_start_pattern` | One of four | String (regex) | - | Lines matching this pattern are continuations of the previous event | +| `continuation_line_end_pattern` | One of four | String (regex) | - | Lines matching this pattern are prepended to the next event | +| `omit_matched_section` | No | Boolean | `false` | When true, the matched portion of the line is omitted from the output | +| `max_lines` | No | Integer | `500` | Maximum number of lines that can be combined into a single event | +| `max_length` | No | Integer | `10000` | Maximum character length of a combined multiline event. Note: a single line exceeding this limit will still be emitted as a complete event without truncation | +| `line_separator` | No | String | `\n` | Separator string used when joining lines into a single event message. Note: `BufferedReader.readLine()` strips original line endings, so the codec normalizes joined lines using this separator. Set to `""` for no separator | +| `encoding` | No | String | `UTF-8` | Character encoding to use when reading the input stream | + +## How It Works + +The codec reads lines from the input stream and uses the configured pattern to determine event boundaries: + +1. **`event_start_pattern`** (most common): Each line matching the pattern starts a new event. All subsequent non-matching lines are appended to it. + +2. **`event_end_pattern`**: Lines are accumulated until a line matches the pattern. The matching line is included in the current event, and the next line starts a new event. + +3. **`continuation_line_start_pattern`**: Lines matching the pattern are continuations of the previous event. Non-matching lines start new events. + +4. **`continuation_line_end_pattern`**: Lines matching the pattern are prepended to the next non-matching line's event. + +## Examples + +### Java Stack Traces + +Each log entry starts with a timestamp. Lines without a timestamp (stack frames) are part of the previous entry. + +```yaml +pipeline: + source: + s3: + codec: + multiline: + event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}" +``` + +Input: +``` +2024-01-01 12:00:00 ERROR NullPointerException + at com.example.Service.method(Service.java:42) + at com.example.Main.run(Main.java:10) +2024-01-01 12:00:01 INFO Application recovered +``` + +Result: 2 events (stack trace grouped with its ERROR line) + +### Delimiter-Separated Entries + +Log entries are separated by a `---` line. + +```yaml +pipeline: + source: + s3: + codec: + multiline: + event_end_pattern: "^---$" +``` + +### Stack Traces (continuation pattern) + +Lines starting with whitespace followed by `at ` or `Caused by:` are continuations. + +```yaml +pipeline: + source: + s3: + codec: + multiline: + continuation_line_start_pattern: "^\\s+(at |\\.\\.\\.|Caused by:)" +``` + +### Omitting Timestamps from Output + +Strip the timestamp from each event's first line: + +```yaml +pipeline: + source: + s3: + codec: + multiline: + event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\s+" + omit_matched_section: true +``` + +## Developer Guide + +This plugin is compatible with Java 11. See below: + +- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) +- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) + +The following command runs the unit and integration tests: + +``` +./gradlew :data-prepper-plugins:multiline-codecs:test +``` diff --git a/data-prepper-plugins/multiline-codecs/build.gradle b/data-prepper-plugins/multiline-codecs/build.gradle new file mode 100644 index 0000000000..65a8a97804 --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/build.gradle @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +dependencies { + implementation project(':data-prepper-api') + implementation 'com.fasterxml.jackson.core:jackson-annotations' + testImplementation project(':data-prepper-plugins:common') + testImplementation project(':data-prepper-test:test-event') + testImplementation project(':data-prepper-test:plugin-test-framework') +} diff --git a/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodec.java b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodec.java new file mode 100644 index 0000000000..b345a2e1dd --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodec.java @@ -0,0 +1,181 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.codec.multiline; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.event.LogEventBuilder; +import org.opensearch.dataprepper.model.log.Log; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * An implementation of {@link InputCodec} which groups multiple lines from an input stream + * into single events based on a configurable regex pattern. + * + *

This is useful for ingesting logs where a single logical event spans multiple lines, + * such as Java stack traces, Python tracebacks, or any log format where entries begin with + * a recognizable pattern (e.g., a timestamp).

+ * + *

The codec supports four mutually exclusive pattern modes:

+ * + */ +@DataPrepperPlugin(name = "multiline", pluginType = InputCodec.class, pluginConfigurationType = MultilineInputCodecConfig.class) +public class MultilineInputCodec implements InputCodec { + + private static final Logger LOG = LoggerFactory.getLogger(MultilineInputCodec.class); + private static final String MESSAGE_FIELD_NAME = "message"; + + private final Pattern pattern; + private final boolean boundaryOnMatch; + private final boolean flushAfter; + private final boolean omitMatchedSection; + private final int maxLines; + private final int maxLength; + private final String lineSeparator; + private final Charset encoding; + private final EventFactory eventFactory; + + @DataPrepperPluginConstructor + public MultilineInputCodec(final MultilineInputCodecConfig config, final EventFactory eventFactory) { + Objects.requireNonNull(config, "config must not be null"); + this.eventFactory = Objects.requireNonNull(eventFactory, "eventFactory must not be null"); + + this.pattern = config.getCompiledPattern(); + if (this.pattern == null) { + throw new IllegalArgumentException("A valid pattern must be configured"); + } + + final MultilineMode mode = resolveMode(config); + this.boundaryOnMatch = (mode == MultilineMode.EVENT_START || mode == MultilineMode.EVENT_END); + this.flushAfter = (mode == MultilineMode.EVENT_END || mode == MultilineMode.CONTINUATION_END); + this.omitMatchedSection = config.getOmitMatchedSection(); + this.maxLines = config.getMaxLines(); + this.maxLength = config.getMaxLength(); + this.lineSeparator = config.getLineSeparator(); + this.encoding = config.getEncoding(); + } + + private static MultilineMode resolveMode(final MultilineInputCodecConfig config) { + if (config.getEventStartPattern() != null) { + return MultilineMode.EVENT_START; + } else if (config.getEventEndPattern() != null) { + return MultilineMode.EVENT_END; + } else if (config.getContinuationLineStartPattern() != null) { + return MultilineMode.CONTINUATION_START; + } else { + return MultilineMode.CONTINUATION_END; + } + } + + @Override + public void parse(final InputStream inputStream, final Consumer> eventConsumer) throws IOException { + Objects.requireNonNull(inputStream, "inputStream must not be null"); + Objects.requireNonNull(eventConsumer, "eventConsumer must not be null"); + + final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encoding)); + parseLines(reader, eventConsumer); + } + + private void parseLines(final BufferedReader reader, final Consumer> eventConsumer) throws IOException { + final StringBuilder buffer = new StringBuilder(); + int lineCount = 0; + String line; + + while ((line = reader.readLine()) != null) { + final boolean matches = pattern.matcher(line).find(); + final boolean isBoundary = (boundaryOnMatch == matches); + + if ((!flushAfter && isBoundary) || shouldFlush(buffer, lineCount, line)) { + flushIfNonEmpty(buffer, eventConsumer); + lineCount = 0; + } + + appendLineToBuffer(buffer, processLine(line, matches)); + lineCount++; + + if (flushAfter && isBoundary) { + flushIfNonEmpty(buffer, eventConsumer); + lineCount = 0; + } + } + + flushIfNonEmpty(buffer, eventConsumer); + } + + private String processLine(final String line, final boolean matches) { + if (!omitMatchedSection || !matches) { + return line; + } + final Matcher matcher = pattern.matcher(line); + return matcher.replaceFirst(""); + } + + private void appendLineToBuffer(final StringBuilder buffer, final String processedLine) { + if (processedLine.isEmpty()) { + return; + } + if (buffer.length() > 0) { + buffer.append(lineSeparator); + } + buffer.append(processedLine); + } + + private void flushIfNonEmpty(final StringBuilder buffer, final Consumer> eventConsumer) { + if (buffer.length() > 0) { + emitEvent(buffer.toString(), eventConsumer); + buffer.setLength(0); + } + } + + /** + * Determines if the buffer should be flushed before appending the next line. + * Note: if a single line exceeds max_length on its own, it will still be emitted + * as a complete event without truncation. + */ + private boolean shouldFlush(final StringBuilder buffer, final int lineCount, final String nextLine) { + if (lineCount >= maxLines) { + LOG.debug("Flushing multiline event due to max_lines limit of {}", maxLines); + return true; + } + if (buffer.length() + lineSeparator.length() + nextLine.length() > maxLength) { + LOG.debug("Flushing multiline event due to max_length limit of {}", maxLength); + return true; + } + return false; + } + + private void emitEvent(final String message, final Consumer> eventConsumer) { + final Log event = eventFactory.eventBuilder(LogEventBuilder.class) + .withData(Collections.singletonMap(MESSAGE_FIELD_NAME, message)) + .build(); + eventConsumer.accept(new Record<>(event)); + } +} diff --git a/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfig.java b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfig.java new file mode 100644 index 0000000000..4bea8356e7 --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfig.java @@ -0,0 +1,211 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.codec.multiline; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; + +import java.nio.charset.Charset; +import java.nio.charset.IllegalCharsetNameException; +import java.nio.charset.StandardCharsets; +import java.nio.charset.UnsupportedCharsetException; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** + * Configuration class for the multiline input codec. + * + *

The multiline codec groups consecutive lines from an input stream into a single event + * based on a regex pattern. Exactly one of the four pattern fields must be specified:

+ *
    + *
  • {@code event_start_pattern}: A new event begins at each line matching this pattern.
  • + *
  • {@code event_end_pattern}: An event ends at each line matching this pattern (inclusive).
  • + *
  • {@code continuation_line_start_pattern}: Lines matching this pattern are continuations of the previous event.
  • + *
  • {@code continuation_line_end_pattern}: Lines matching this pattern are prepended to the next event.
  • + *
+ * + *

Example configuration for Java stack traces:

+ *
+ * codec:
+ *   multiline:
+ *     event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}"
+ * 
+ */ +public class MultilineInputCodecConfig { + + static final int DEFAULT_MAX_LINES = 500; + static final int DEFAULT_MAX_LENGTH = 10000; + static final String DEFAULT_LINE_SEPARATOR = "\n"; + + @JsonProperty("event_start_pattern") + private String eventStartPattern; + + @JsonProperty("event_end_pattern") + private String eventEndPattern; + + @JsonProperty("continuation_line_start_pattern") + private String continuationLineStartPattern; + + @JsonProperty("continuation_line_end_pattern") + private String continuationLineEndPattern; + + @JsonProperty("omit_matched_section") + private boolean omitMatchedSection = false; + + @Min(value = 1, message = "max_lines must be at least 1") + @JsonProperty("max_lines") + private int maxLines = DEFAULT_MAX_LINES; + + @Min(value = 1, message = "max_length must be at least 1") + @JsonProperty("max_length") + private int maxLength = DEFAULT_MAX_LENGTH; + + @NotNull(message = "line_separator must not be null") + @JsonProperty("line_separator") + private String lineSeparator = DEFAULT_LINE_SEPARATOR; + + @JsonProperty("encoding") + private String encoding = StandardCharsets.UTF_8.name(); + + private Pattern compiledPattern; + + public String getEventStartPattern() { + return eventStartPattern; + } + + public String getEventEndPattern() { + return eventEndPattern; + } + + public String getContinuationLineStartPattern() { + return continuationLineStartPattern; + } + + public String getContinuationLineEndPattern() { + return continuationLineEndPattern; + } + + public boolean getOmitMatchedSection() { + return omitMatchedSection; + } + + public int getMaxLines() { + return maxLines; + } + + public int getMaxLength() { + return maxLength; + } + + public String getLineSeparator() { + return lineSeparator; + } + + /** + * Returns the validated Charset, compiled on first access. + * + * @return The Charset. + */ + public Charset getEncoding() { + return Charset.forName(encoding); + } + + /** + * Returns the compiled regex pattern, compiled on first access. + * + * @return The compiled Pattern. + */ + public Pattern getCompiledPattern() { + if (compiledPattern == null) { + compiledPattern = Pattern.compile(getConfiguredPatternString()); + } + return compiledPattern; + } + + @AssertTrue(message = "Exactly one pattern field must be specified: event_start_pattern, event_end_pattern, " + + "continuation_line_start_pattern, or continuation_line_end_pattern") + boolean isExactlyOnePatternSpecified() { + int count = 0; + if (eventStartPattern != null) count++; + if (eventEndPattern != null) count++; + if (continuationLineStartPattern != null) count++; + if (continuationLineEndPattern != null) count++; + return count == 1; + } + + @AssertTrue(message = "The specified pattern must be a valid regular expression") + boolean isValidPattern() { + final String patternString = getConfiguredPatternString(); + if (patternString == null || patternString.isEmpty()) { + return false; + } + try { + Pattern.compile(patternString); + return true; + } catch (final PatternSyntaxException e) { + return false; + } + } + + @AssertTrue(message = "The specified encoding must be a valid charset") + boolean isValidEncoding() { + if (encoding == null || encoding.isEmpty()) { + return false; + } + try { + Charset.forName(encoding); + return true; + } catch (final IllegalCharsetNameException | UnsupportedCharsetException e) { + return false; + } + } + + String getConfiguredPatternString() { + if (eventStartPattern != null) return eventStartPattern; + if (eventEndPattern != null) return eventEndPattern; + if (continuationLineStartPattern != null) return continuationLineStartPattern; + if (continuationLineEndPattern != null) return continuationLineEndPattern; + return null; + } + + static Builder builder() { + return new Builder(); + } + + static class Builder { + private final MultilineInputCodecConfig config = new MultilineInputCodecConfig(); + + Builder withEventStartPattern(final String pattern) { + config.eventStartPattern = pattern; + return this; + } + + Builder withEventEndPattern(final String pattern) { + config.eventEndPattern = pattern; + return this; + } + + Builder withContinuationLineStartPattern(final String pattern) { + config.continuationLineStartPattern = pattern; + return this; + } + + Builder withContinuationLineEndPattern(final String pattern) { + config.continuationLineEndPattern = pattern; + return this; + } + + MultilineInputCodecConfig build() { + return config; + } + } +} diff --git a/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineMode.java b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineMode.java new file mode 100644 index 0000000000..fafa8cd0b0 --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineMode.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.codec.multiline; + +/** + * Internal representation of the multiline grouping mode, determined from the configuration. + */ +enum MultilineMode { + + /** + * A new event starts at each line matching the pattern. + * Non-matching lines are continuations of the preceding event. + */ + EVENT_START, + + /** + * An event ends at each line matching the pattern (inclusive). + * The next line begins a new event. + */ + EVENT_END, + + /** + * Lines matching the pattern are continuations of the previous event. + * Non-matching lines start new events. + */ + CONTINUATION_START, + + /** + * Lines matching the pattern are prepended to the next event. + * Non-matching lines complete the event. + */ + CONTINUATION_END +} diff --git a/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineCodecsIT.java b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineCodecsIT.java new file mode 100644 index 0000000000..ad67ec24df --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineCodecsIT.java @@ -0,0 +1,145 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.codec.multiline; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.test.plugins.DataPrepperPluginTest; +import org.opensearch.dataprepper.test.plugins.PluginConfigurationFile; +import org.opensearch.dataprepper.test.plugins.junit.BaseDataPrepperPluginStandardTestSuite; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; + +@DataPrepperPluginTest(pluginName = "multiline", pluginType = InputCodec.class) +public class MultilineCodecsIT extends BaseDataPrepperPluginStandardTestSuite { + + private List> parseContent(final InputCodec codec, final String content) throws IOException { + final List> events = new ArrayList<>(); + codec.parse(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), events::add); + return events; + } + + @Test + void parse_java_stack_trace_with_event_start_pattern( + @PluginConfigurationFile("event-start-pattern.yaml") final InputCodec codec) throws IOException { + + final String input = + "2024-01-15 10:23:45.123 ERROR [main] com.example.UserService - Request failed\n" + + "java.lang.NullPointerException: null\n" + + "\tat com.example.UserService.getUser(UserService.java:42)\n" + + "\tat com.example.Controller.handle(Controller.java:28)\n" + + "Caused by: java.sql.SQLException: Connection refused\n" + + "\tat com.mysql.jdbc.Connection.connect(Connection.java:456)\n" + + "2024-01-15 10:23:45.456 INFO [main] com.example.UserService - Retrying\n"; + + final List> events = parseContent(codec, input); + + assertThat(events.size(), equalTo(2)); + final String event1 = events.get(0).getData().get("message", String.class); + assertThat(event1, containsString("NullPointerException")); + assertThat(event1, containsString("at com.example.UserService.getUser")); + assertThat(event1, containsString("Caused by: java.sql.SQLException")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo("2024-01-15 10:23:45.456 INFO [main] com.example.UserService - Retrying")); + } + + @Test + void parse_with_event_end_pattern( + @PluginConfigurationFile("event-end-pattern.yaml") final InputCodec codec) throws IOException { + + final String input = + "entry 1 line 1\n" + + "entry 1 line 2\n" + + "---\n" + + "entry 2 line 1\n" + + "---\n"; + + final List> events = parseContent(codec, input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("entry 1 line 1\nentry 1 line 2\n---")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo("entry 2 line 1\n---")); + } + + @Test + void parse_with_continuation_line_start_pattern( + @PluginConfigurationFile("continuation-line-start-pattern.yaml") final InputCodec codec) throws IOException { + + final String input = + "java.lang.RuntimeException: error\n" + + " at com.example.A.method(A.java:1)\n" + + " Caused by: java.io.IOException\n" + + " at com.example.C.read(C.java:3)\n" + + "Application recovered\n"; + + final List> events = parseContent(codec, input); + + assertThat(events.size(), equalTo(2)); + final String event1 = events.get(0).getData().get("message", String.class); + assertThat(event1, containsString("RuntimeException: error")); + assertThat(event1, containsString("at com.example.A.method")); + assertThat(event1, containsString("Caused by: java.io.IOException")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo("Application recovered")); + } + + @Test + void parse_with_omit_matched_section( + @PluginConfigurationFile("omit-matched-section.yaml") final InputCodec codec) throws IOException { + + final String input = + "2024-01-01 ERROR something bad\n" + + " stack trace\n" + + "2024-01-02 INFO recovered\n"; + + final List> events = parseContent(codec, input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("ERROR something bad\n stack trace")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo("INFO recovered")); + } + + @Test + void parse_with_continuation_line_end_pattern( + @PluginConfigurationFile("continuation-line-end-pattern.yaml") final InputCodec codec) throws IOException { + + final String input = + " context-line-1\n" + + " context-line-2\n" + + "MAIN EVENT A\n" + + " context-line-3\n" + + "MAIN EVENT B\n"; + + final List> events = parseContent(codec, input); + + assertThat(events.size(), equalTo(2)); + final String event1 = events.get(0).getData().get("message", String.class); + assertThat(event1, containsString("context-line-1")); + assertThat(event1, containsString("context-line-2")); + assertThat(event1, containsString("MAIN EVENT A")); + final String event2 = events.get(1).getData().get("message", String.class); + assertThat(event2, containsString("context-line-3")); + assertThat(event2, containsString("MAIN EVENT B")); + } +} diff --git a/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfigTest.java b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfigTest.java new file mode 100644 index 0000000000..74c5f97b9a --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfigTest.java @@ -0,0 +1,122 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.codec.multiline; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +class MultilineInputCodecConfigTest { + + @Test + void defaults_are_correct() { + final MultilineInputCodecConfig config = new MultilineInputCodecConfig(); + + assertThat(config.getEventStartPattern(), nullValue()); + assertThat(config.getEventEndPattern(), nullValue()); + assertThat(config.getContinuationLineStartPattern(), nullValue()); + assertThat(config.getContinuationLineEndPattern(), nullValue()); + assertThat(config.getOmitMatchedSection(), equalTo(false)); + assertThat(config.getMaxLines(), equalTo(MultilineInputCodecConfig.DEFAULT_MAX_LINES)); + assertThat(config.getMaxLength(), equalTo(MultilineInputCodecConfig.DEFAULT_MAX_LENGTH)); + assertThat(config.getLineSeparator(), equalTo(MultilineInputCodecConfig.DEFAULT_LINE_SEPARATOR)); + assertThat(config.getConfiguredPatternString(), nullValue()); + } + + @Test + void isExactlyOnePatternSpecified_returns_true_for_event_start_pattern() { + final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder() + .withEventStartPattern("^\\d{4}") + .build(); + assertThat(config.isExactlyOnePatternSpecified(), equalTo(true)); + } + + @Test + void isExactlyOnePatternSpecified_returns_true_for_event_end_pattern() { + final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder() + .withEventEndPattern("^---$") + .build(); + assertThat(config.isExactlyOnePatternSpecified(), equalTo(true)); + } + + @Test + void isExactlyOnePatternSpecified_returns_true_for_continuation_line_start_pattern() { + final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder() + .withContinuationLineStartPattern("^\\s") + .build(); + assertThat(config.isExactlyOnePatternSpecified(), equalTo(true)); + } + + @Test + void isExactlyOnePatternSpecified_returns_true_for_continuation_line_end_pattern() { + final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder() + .withContinuationLineEndPattern("^\\s") + .build(); + assertThat(config.isExactlyOnePatternSpecified(), equalTo(true)); + } + + @Test + void isExactlyOnePatternSpecified_returns_false_when_none_specified() { + final MultilineInputCodecConfig config = new MultilineInputCodecConfig(); + assertThat(config.isExactlyOnePatternSpecified(), equalTo(false)); + } + + @Test + void isExactlyOnePatternSpecified_returns_false_when_two_specified() { + final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder() + .withEventStartPattern("^\\d{4}") + .withEventEndPattern("^---$") + .build(); + assertThat(config.isExactlyOnePatternSpecified(), equalTo(false)); + } + + @Test + void isValidPattern_returns_true_for_valid_regex() { + final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder() + .withEventStartPattern("^\\d{4}-\\d{2}-\\d{2}") + .build(); + assertThat(config.isValidPattern(), equalTo(true)); + } + + @Test + void isValidPattern_returns_false_for_invalid_regex() { + final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder() + .withEventStartPattern("[invalid(") + .build(); + assertThat(config.isValidPattern(), equalTo(false)); + } + + @Test + void isValidPattern_returns_false_when_no_pattern_configured() { + final MultilineInputCodecConfig config = new MultilineInputCodecConfig(); + assertThat(config.isValidPattern(), equalTo(false)); + } + + @Test + void getConfiguredPatternString_returns_null_when_none_specified() { + final MultilineInputCodecConfig config = new MultilineInputCodecConfig(); + assertThat(config.getConfiguredPatternString(), nullValue()); + } + + @Test + void isValidEncoding_returns_true_for_default_utf8() { + final MultilineInputCodecConfig config = new MultilineInputCodecConfig(); + assertThat(config.isValidEncoding(), equalTo(true)); + } + + @Test + void isValidEncoding_returns_true_for_valid_charset() { + final MultilineInputCodecConfig config = new MultilineInputCodecConfig(); + assertThat(config.isValidEncoding(), equalTo(true)); + assertThat(config.getEncoding().name(), equalTo("UTF-8")); + } +} diff --git a/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecTest.java b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecTest.java new file mode 100644 index 0000000000..ea8dc07d4d --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecTest.java @@ -0,0 +1,474 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.codec.multiline; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.event.TestEventFactory; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class MultilineInputCodecTest { + + @Mock + private MultilineInputCodecConfig config; + + private final EventFactory eventFactory = TestEventFactory.getTestEventFactory(); + + private MultilineInputCodec createObjectUnderTest() { + return new MultilineInputCodec(config, eventFactory); + } + + private InputStream toInputStream(final String content) { + return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); + } + + private List> parseContent(final String content) throws IOException { + final List> events = new ArrayList<>(); + createObjectUnderTest().parse(toInputStream(content), events::add); + return events; + } + + @Test + void constructor_throws_if_config_is_null() { + assertThrows(NullPointerException.class, () -> new MultilineInputCodec(null, eventFactory)); + } + + @Test + void constructor_throws_if_eventFactory_is_null() { + assertThrows(NullPointerException.class, () -> new MultilineInputCodec(config, null)); + } + + @Test + void constructor_throws_if_no_pattern_configured() { + when(config.getCompiledPattern()).thenReturn(null); + + assertThrows(IllegalArgumentException.class, this::createObjectUnderTest); + } + + private void setupConfig(final String patternStr) { + when(config.getCompiledPattern()).thenReturn(Pattern.compile(patternStr)); + when(config.getMaxLines()).thenReturn(500); + when(config.getMaxLength()).thenReturn(10000); + when(config.getLineSeparator()).thenReturn("\n"); + when(config.getOmitMatchedSection()).thenReturn(false); + when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8); + } + + @Nested + class EventStartMode { + + @BeforeEach + void setUp() { + setupConfig("^\\d{4}-\\d{2}-\\d{2}"); + when(config.getEventStartPattern()).thenReturn("^\\d{4}-\\d{2}-\\d{2}"); + } + + @Test + void groups_stack_trace_with_timestamp_start() throws IOException { + final String input = "2024-01-01 ERROR NullPointerException\n" + + " at com.example.Service.method(Service.java:42)\n" + + " at com.example.Main.run(Main.java:10)\n" + + "2024-01-01 INFO Application recovered\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("2024-01-01 ERROR NullPointerException\n" + + " at com.example.Service.method(Service.java:42)\n" + + " at com.example.Main.run(Main.java:10)")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo("2024-01-01 INFO Application recovered")); + } + + @Test + void multiple_single_line_events() throws IOException { + final String input = "2024-01-01 INFO line one\n" + + "2024-01-02 INFO line two\n" + + "2024-01-03 INFO line three\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(3)); + assertThat(events.get(0).getData().get("message", String.class), equalTo("2024-01-01 INFO line one")); + assertThat(events.get(1).getData().get("message", String.class), equalTo("2024-01-02 INFO line two")); + assertThat(events.get(2).getData().get("message", String.class), equalTo("2024-01-03 INFO line three")); + } + + @Test + void continuation_lines_at_beginning_grouped_as_first_event() throws IOException { + final String input = " orphan line 1\n" + + " orphan line 2\n" + + "2024-01-01 INFO first entry\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo(" orphan line 1\n orphan line 2")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo("2024-01-01 INFO first entry")); + } + + @Test + void last_event_flushed_at_end_of_stream() throws IOException { + final String input = "2024-01-01 ERROR Exception\n" + + " at com.example.Foo.bar(Foo.java:1)\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(1)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("2024-01-01 ERROR Exception\n at com.example.Foo.bar(Foo.java:1)")); + } + + @Test + void empty_input_produces_no_events() throws IOException { + final List> events = parseContent(""); + assertThat(events.size(), equalTo(0)); + } + + @Test + void no_lines_match_produces_single_event() throws IOException { + final String input = " line 1\n line 2\n line 3\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(1)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo(" line 1\n line 2\n line 3")); + } + } + + @Nested + class EventEndMode { + + @BeforeEach + void setUp() { + setupConfig("^---$"); + when(config.getEventEndPattern()).thenReturn("^---$"); + } + + @Test + void groups_lines_until_separator() throws IOException { + final String input = "line 1\n" + + "line 2\n" + + "---\n" + + "line 3\n" + + "line 4\n" + + "---\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("line 1\nline 2\n---")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo("line 3\nline 4\n---")); + } + + @Test + void trailing_lines_without_end_marker_flushed() throws IOException { + final String input = "line 1\n" + + "---\n" + + "line 2\n" + + "line 3\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("line 1\n---")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo("line 2\nline 3")); + } + + @Test + void single_line_matching_end_pattern() throws IOException { + final String input = "---\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(1)); + assertThat(events.get(0).getData().get("message", String.class), equalTo("---")); + } + } + + @Nested + class ContinuationStartMode { + + @BeforeEach + void setUp() { + setupConfig("^\\s+(at |\\.\\.\\.|Caused by:)"); + when(config.getContinuationLineStartPattern()).thenReturn("^\\s+(at |\\.\\.\\.|Caused by:)"); + } + + @Test + void groups_stack_trace_lines_with_previous() throws IOException { + final String input = "java.lang.NullPointerException: null\n" + + " at com.example.Service.process(Service.java:42)\n" + + " at com.example.Main.run(Main.java:10)\n" + + "INFO: Recovery complete\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("java.lang.NullPointerException: null\n" + + " at com.example.Service.process(Service.java:42)\n" + + " at com.example.Main.run(Main.java:10)")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo("INFO: Recovery complete")); + } + + @Test + void caused_by_grouped_with_previous() throws IOException { + final String input = "java.lang.RuntimeException: error\n" + + " at com.example.A.method(A.java:1)\n" + + " Caused by: java.io.IOException\n" + + " at com.example.B.read(B.java:5)\n" + + "Next log entry\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("java.lang.RuntimeException: error\n" + + " at com.example.A.method(A.java:1)\n" + + " Caused by: java.io.IOException\n" + + " at com.example.B.read(B.java:5)")); + } + } + + @Nested + class ContinuationEndMode { + + @BeforeEach + void setUp() { + setupConfig("^\\s"); + } + + @Test + void continuation_lines_prepended_to_next_event() throws IOException { + final String input = " header line 1\n" + + " header line 2\n" + + "MAIN LOG ENTRY\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(1)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo(" header line 1\n header line 2\nMAIN LOG ENTRY")); + } + + @Test + void multiple_groups() throws IOException { + final String input = " context A\n" + + "EVENT A\n" + + " context B\n" + + "EVENT B\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo(" context A\nEVENT A")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo(" context B\nEVENT B")); + } + + @Test + void trailing_continuation_lines_flushed() throws IOException { + final String input = "EVENT A\n" + + " trailing 1\n" + + " trailing 2\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), equalTo("EVENT A")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo(" trailing 1\n trailing 2")); + } + + @Test + void no_continuation_lines_each_is_separate_event() throws IOException { + final String input = "EVENT A\nEVENT B\nEVENT C\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(3)); + assertThat(events.get(0).getData().get("message", String.class), equalTo("EVENT A")); + assertThat(events.get(1).getData().get("message", String.class), equalTo("EVENT B")); + assertThat(events.get(2).getData().get("message", String.class), equalTo("EVENT C")); + } + } + + @Nested + class OmitMatchedSection { + + @Test + void event_start_pattern_omits_matched_section() throws IOException { + when(config.getCompiledPattern()).thenReturn(Pattern.compile("^\\d{4}-\\d{2}-\\d{2}\\s+")); + when(config.getEventStartPattern()).thenReturn("^\\d{4}-\\d{2}-\\d{2}\\s+"); + when(config.getMaxLines()).thenReturn(500); + when(config.getMaxLength()).thenReturn(10000); + when(config.getLineSeparator()).thenReturn("\n"); + when(config.getOmitMatchedSection()).thenReturn(true); + when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8); + + final String input = "2024-01-01 ERROR something\n" + + " stack trace line\n" + + "2024-01-02 INFO recovered\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("ERROR something\n stack trace line")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo("INFO recovered")); + } + + @Test + void event_end_pattern_omits_matched_section() throws IOException { + when(config.getCompiledPattern()).thenReturn(Pattern.compile("^---$")); + when(config.getEventEndPattern()).thenReturn("^---$"); + when(config.getMaxLines()).thenReturn(500); + when(config.getMaxLength()).thenReturn(10000); + when(config.getLineSeparator()).thenReturn("\n"); + when(config.getOmitMatchedSection()).thenReturn(true); + when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8); + + final String input = "line 1\nline 2\n---\nline 3\n---\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(2)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("line 1\nline 2")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo("line 3")); + } + + @Test + void omit_false_preserves_matched_section() throws IOException { + when(config.getCompiledPattern()).thenReturn(Pattern.compile("^\\d{4}-\\d{2}-\\d{2}\\s+")); + when(config.getEventStartPattern()).thenReturn("^\\d{4}-\\d{2}-\\d{2}\\s+"); + when(config.getMaxLines()).thenReturn(500); + when(config.getMaxLength()).thenReturn(10000); + when(config.getLineSeparator()).thenReturn("\n"); + when(config.getOmitMatchedSection()).thenReturn(false); + when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8); + + final String input = "2024-01-01 ERROR something\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(1)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("2024-01-01 ERROR something")); + } + } + + @Nested + class MaxLinesLimit { + + @BeforeEach + void setUp() { + when(config.getCompiledPattern()).thenReturn(Pattern.compile("^\\d{4}")); + when(config.getEventStartPattern()).thenReturn("^\\d{4}"); + when(config.getMaxLines()).thenReturn(3); + when(config.getMaxLength()).thenReturn(10000); + when(config.getLineSeparator()).thenReturn("\n"); + when(config.getOmitMatchedSection()).thenReturn(false); + when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8); + } + + @Test + void flushes_event_when_max_lines_exceeded() throws IOException { + final String input = "2024 start\n line 2\n line 3\n line 4\n line 5\n2024 next\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(3)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("2024 start\n line 2\n line 3")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo(" line 4\n line 5")); + assertThat(events.get(2).getData().get("message", String.class), + equalTo("2024 next")); + } + } + + @Nested + class MaxLengthLimit { + + @BeforeEach + void setUp() { + when(config.getCompiledPattern()).thenReturn(Pattern.compile("^\\d{4}")); + when(config.getEventStartPattern()).thenReturn("^\\d{4}"); + when(config.getMaxLines()).thenReturn(500); + when(config.getMaxLength()).thenReturn(30); + when(config.getLineSeparator()).thenReturn("\n"); + when(config.getOmitMatchedSection()).thenReturn(false); + when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8); + } + + @Test + void flushes_event_when_max_length_exceeded() throws IOException { + final String input = "2024 start line here\n continuation is long\n2024 next entry\n"; + + final List> events = parseContent(input); + + assertThat(events.size(), equalTo(3)); + assertThat(events.get(0).getData().get("message", String.class), + equalTo("2024 start line here")); + assertThat(events.get(1).getData().get("message", String.class), + equalTo(" continuation is long")); + assertThat(events.get(2).getData().get("message", String.class), + equalTo("2024 next entry")); + } + } + + @Test + void event_metadata_is_log_type() throws IOException { + setupConfig("^\\d{4}"); + when(config.getEventStartPattern()).thenReturn("^\\d{4}"); + + final List> events = parseContent("2024-01-01 test\n"); + + assertThat(events.size(), equalTo(1)); + assertThat(events.get(0).getData(), notNullValue()); + assertThat(events.get(0).getData().getMetadata(), notNullValue()); + assertThat(events.get(0).getData().getMetadata().getEventType(), equalTo("LOG")); + } +} diff --git a/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-end-pattern.yaml b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-end-pattern.yaml new file mode 100644 index 0000000000..2beb48c08a --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-end-pattern.yaml @@ -0,0 +1,15 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +test-pipeline: + source: + unused: + processor: + - multiline: + continuation_line_end_pattern: "^\\s" + sink: + - unused: diff --git a/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-start-pattern.yaml b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-start-pattern.yaml new file mode 100644 index 0000000000..7fbb62d7cc --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-start-pattern.yaml @@ -0,0 +1,15 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +test-pipeline: + source: + unused: + processor: + - multiline: + continuation_line_start_pattern: "^\\s+(at |\\.\\.\\.|Caused by:)" + sink: + - unused: diff --git a/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-end-pattern.yaml b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-end-pattern.yaml new file mode 100644 index 0000000000..06b9577b18 --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-end-pattern.yaml @@ -0,0 +1,15 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +test-pipeline: + source: + unused: + processor: + - multiline: + event_end_pattern: "^---$" + sink: + - unused: diff --git a/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-start-pattern.yaml b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-start-pattern.yaml new file mode 100644 index 0000000000..c95b3b7be9 --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-start-pattern.yaml @@ -0,0 +1,15 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +test-pipeline: + source: + unused: + processor: + - multiline: + event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}" + sink: + - unused: diff --git a/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/omit-matched-section.yaml b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/omit-matched-section.yaml new file mode 100644 index 0000000000..ec7b990b13 --- /dev/null +++ b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/omit-matched-section.yaml @@ -0,0 +1,16 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +test-pipeline: + source: + unused: + processor: + - multiline: + event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}\\s+" + omit_matched_section: true + sink: + - unused: diff --git a/settings.gradle b/settings.gradle index f6f07cc1b0..3409e170eb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -169,6 +169,7 @@ include 'release:maven' include 'e2e-test:peerforwarder' include 'data-prepper-plugins:failures-common' include 'data-prepper-plugins:newline-codecs' +include 'data-prepper-plugins:multiline-codecs' include 'data-prepper-plugins:avro-codecs' include 'data-prepper-plugins:kafka-plugins' include 'data-prepper-plugins:user-agent-processor'