diff --git a/data-prepper-plugins/multiline-codecs/README.md b/data-prepper-plugins/multiline-codecs/README.md
new file mode 100644
index 0000000000..bae5add698
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/README.md
@@ -0,0 +1,121 @@
+# Multiline Codecs
+
+This plugin provides a multiline input codec for Data Prepper that groups consecutive lines from an input stream into single events based on a configurable regex pattern.
+
+## Usages
+
+The multiline input codec can be configured with source plugins (e.g. S3 source, file source) in the pipeline file.
+
+### Use Cases
+
+- **Java/Kotlin stack traces**: Exception messages followed by `at ...` lines
+- **Python tracebacks**: `Traceback` blocks spanning multiple lines
+- **Timestamp-prefixed logs**: Logs where each entry starts with a timestamp and continuation lines don't
+- **Multi-line JSON/XML in logs**: Structured data embedded across multiple lines within log entries
+- **Custom log formats**: Any format where a recognizable pattern marks the start or end of a new event
+
+## Configuration Options
+
+Exactly one of the four pattern fields must be specified:
+
+| Option | Required | Type | Default | Description |
+|---|---|---|---|---|
+| `event_start_pattern` | One of four | String (regex) | - | A new event begins at each line matching this pattern |
+| `event_end_pattern` | One of four | String (regex) | - | An event ends at each line matching this pattern (inclusive) |
+| `continuation_line_start_pattern` | One of four | String (regex) | - | Lines matching this pattern are continuations of the previous event |
+| `continuation_line_end_pattern` | One of four | String (regex) | - | Lines matching this pattern are prepended to the next event |
+| `omit_matched_section` | No | Boolean | `false` | When true, the matched portion of the line is omitted from the output |
+| `max_lines` | No | Integer | `500` | Maximum number of lines that can be combined into a single event |
+| `max_length` | No | Integer | `10000` | Maximum character length of a combined multiline event. Note: a single line exceeding this limit will still be emitted as a complete event without truncation |
+| `line_separator` | No | String | `\n` | Separator string used when joining lines into a single event message. Note: `BufferedReader.readLine()` strips original line endings, so the codec normalizes joined lines using this separator. Set to `""` for no separator |
+| `encoding` | No | String | `UTF-8` | Character encoding to use when reading the input stream |
+
+## How It Works
+
+The codec reads lines from the input stream and uses the configured pattern to determine event boundaries:
+
+1. **`event_start_pattern`** (most common): Each line matching the pattern starts a new event. All subsequent non-matching lines are appended to it.
+
+2. **`event_end_pattern`**: Lines are accumulated until a line matches the pattern. The matching line is included in the current event, and the next line starts a new event.
+
+3. **`continuation_line_start_pattern`**: Lines matching the pattern are continuations of the previous event. Non-matching lines start new events.
+
+4. **`continuation_line_end_pattern`**: Lines matching the pattern are prepended to the next non-matching line's event.
+
+## Examples
+
+### Java Stack Traces
+
+Each log entry starts with a timestamp. Lines without a timestamp (stack frames) are part of the previous entry.
+
+```yaml
+pipeline:
+ source:
+ s3:
+ codec:
+ multiline:
+ event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}"
+```
+
+Input:
+```
+2024-01-01 12:00:00 ERROR NullPointerException
+ at com.example.Service.method(Service.java:42)
+ at com.example.Main.run(Main.java:10)
+2024-01-01 12:00:01 INFO Application recovered
+```
+
+Result: 2 events (stack trace grouped with its ERROR line)
+
+### Delimiter-Separated Entries
+
+Log entries are separated by a `---` line.
+
+```yaml
+pipeline:
+ source:
+ s3:
+ codec:
+ multiline:
+ event_end_pattern: "^---$"
+```
+
+### Stack Traces (continuation pattern)
+
+Lines starting with whitespace followed by `at ` or `Caused by:` are continuations.
+
+```yaml
+pipeline:
+ source:
+ s3:
+ codec:
+ multiline:
+ continuation_line_start_pattern: "^\\s+(at |\\.\\.\\.|Caused by:)"
+```
+
+### Omitting Timestamps from Output
+
+Strip the timestamp from each event's first line:
+
+```yaml
+pipeline:
+ source:
+ s3:
+ codec:
+ multiline:
+ event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\s+"
+ omit_matched_section: true
+```
+
+## Developer Guide
+
+This plugin is compatible with Java 11. See below:
+
+- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
+- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
+
+The following command runs the unit and integration tests:
+
+```
+./gradlew :data-prepper-plugins:multiline-codecs:test
+```
diff --git a/data-prepper-plugins/multiline-codecs/build.gradle b/data-prepper-plugins/multiline-codecs/build.gradle
new file mode 100644
index 0000000000..65a8a97804
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/build.gradle
@@ -0,0 +1,16 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+dependencies {
+ implementation project(':data-prepper-api')
+ implementation 'com.fasterxml.jackson.core:jackson-annotations'
+ testImplementation project(':data-prepper-plugins:common')
+ testImplementation project(':data-prepper-test:test-event')
+ testImplementation project(':data-prepper-test:plugin-test-framework')
+}
diff --git a/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodec.java b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodec.java
new file mode 100644
index 0000000000..b345a2e1dd
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodec.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.codec.multiline;
+
+import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
+import org.opensearch.dataprepper.model.codec.InputCodec;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventFactory;
+import org.opensearch.dataprepper.model.event.LogEventBuilder;
+import org.opensearch.dataprepper.model.log.Log;
+import org.opensearch.dataprepper.model.record.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of {@link InputCodec} which groups multiple lines from an input stream
+ * into single events based on a configurable regex pattern.
+ *
+ *
This is useful for ingesting logs where a single logical event spans multiple lines,
+ * such as Java stack traces, Python tracebacks, or any log format where entries begin with
+ * a recognizable pattern (e.g., a timestamp).
+ *
+ * The codec supports four mutually exclusive pattern modes:
+ *
+ * - {@code event_start_pattern}: A new event begins at each matching line.
+ * - {@code event_end_pattern}: An event ends at each matching line (inclusive).
+ * - {@code continuation_line_start_pattern}: Matching lines are continuations of the previous event.
+ * - {@code continuation_line_end_pattern}: Matching lines are prepended to the next event.
+ *
+ */
+@DataPrepperPlugin(name = "multiline", pluginType = InputCodec.class, pluginConfigurationType = MultilineInputCodecConfig.class)
+public class MultilineInputCodec implements InputCodec {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MultilineInputCodec.class);
+ private static final String MESSAGE_FIELD_NAME = "message";
+
+ private final Pattern pattern;
+ private final boolean boundaryOnMatch;
+ private final boolean flushAfter;
+ private final boolean omitMatchedSection;
+ private final int maxLines;
+ private final int maxLength;
+ private final String lineSeparator;
+ private final Charset encoding;
+ private final EventFactory eventFactory;
+
+ @DataPrepperPluginConstructor
+ public MultilineInputCodec(final MultilineInputCodecConfig config, final EventFactory eventFactory) {
+ Objects.requireNonNull(config, "config must not be null");
+ this.eventFactory = Objects.requireNonNull(eventFactory, "eventFactory must not be null");
+
+ this.pattern = config.getCompiledPattern();
+ if (this.pattern == null) {
+ throw new IllegalArgumentException("A valid pattern must be configured");
+ }
+
+ final MultilineMode mode = resolveMode(config);
+ this.boundaryOnMatch = (mode == MultilineMode.EVENT_START || mode == MultilineMode.EVENT_END);
+ this.flushAfter = (mode == MultilineMode.EVENT_END || mode == MultilineMode.CONTINUATION_END);
+ this.omitMatchedSection = config.getOmitMatchedSection();
+ this.maxLines = config.getMaxLines();
+ this.maxLength = config.getMaxLength();
+ this.lineSeparator = config.getLineSeparator();
+ this.encoding = config.getEncoding();
+ }
+
+ private static MultilineMode resolveMode(final MultilineInputCodecConfig config) {
+ if (config.getEventStartPattern() != null) {
+ return MultilineMode.EVENT_START;
+ } else if (config.getEventEndPattern() != null) {
+ return MultilineMode.EVENT_END;
+ } else if (config.getContinuationLineStartPattern() != null) {
+ return MultilineMode.CONTINUATION_START;
+ } else {
+ return MultilineMode.CONTINUATION_END;
+ }
+ }
+
+ @Override
+ public void parse(final InputStream inputStream, final Consumer> eventConsumer) throws IOException {
+ Objects.requireNonNull(inputStream, "inputStream must not be null");
+ Objects.requireNonNull(eventConsumer, "eventConsumer must not be null");
+
+ final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encoding));
+ parseLines(reader, eventConsumer);
+ }
+
+ private void parseLines(final BufferedReader reader, final Consumer> eventConsumer) throws IOException {
+ final StringBuilder buffer = new StringBuilder();
+ int lineCount = 0;
+ String line;
+
+ while ((line = reader.readLine()) != null) {
+ final boolean matches = pattern.matcher(line).find();
+ final boolean isBoundary = (boundaryOnMatch == matches);
+
+ if ((!flushAfter && isBoundary) || shouldFlush(buffer, lineCount, line)) {
+ flushIfNonEmpty(buffer, eventConsumer);
+ lineCount = 0;
+ }
+
+ appendLineToBuffer(buffer, processLine(line, matches));
+ lineCount++;
+
+ if (flushAfter && isBoundary) {
+ flushIfNonEmpty(buffer, eventConsumer);
+ lineCount = 0;
+ }
+ }
+
+ flushIfNonEmpty(buffer, eventConsumer);
+ }
+
+ private String processLine(final String line, final boolean matches) {
+ if (!omitMatchedSection || !matches) {
+ return line;
+ }
+ final Matcher matcher = pattern.matcher(line);
+ return matcher.replaceFirst("");
+ }
+
+ private void appendLineToBuffer(final StringBuilder buffer, final String processedLine) {
+ if (processedLine.isEmpty()) {
+ return;
+ }
+ if (buffer.length() > 0) {
+ buffer.append(lineSeparator);
+ }
+ buffer.append(processedLine);
+ }
+
+ private void flushIfNonEmpty(final StringBuilder buffer, final Consumer> eventConsumer) {
+ if (buffer.length() > 0) {
+ emitEvent(buffer.toString(), eventConsumer);
+ buffer.setLength(0);
+ }
+ }
+
+ /**
+ * Determines if the buffer should be flushed before appending the next line.
+ * Note: if a single line exceeds max_length on its own, it will still be emitted
+ * as a complete event without truncation.
+ */
+ private boolean shouldFlush(final StringBuilder buffer, final int lineCount, final String nextLine) {
+ if (lineCount >= maxLines) {
+ LOG.debug("Flushing multiline event due to max_lines limit of {}", maxLines);
+ return true;
+ }
+ if (buffer.length() + lineSeparator.length() + nextLine.length() > maxLength) {
+ LOG.debug("Flushing multiline event due to max_length limit of {}", maxLength);
+ return true;
+ }
+ return false;
+ }
+
+ private void emitEvent(final String message, final Consumer> eventConsumer) {
+ final Log event = eventFactory.eventBuilder(LogEventBuilder.class)
+ .withData(Collections.singletonMap(MESSAGE_FIELD_NAME, message))
+ .build();
+ eventConsumer.accept(new Record<>(event));
+ }
+}
diff --git a/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfig.java b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfig.java
new file mode 100644
index 0000000000..4bea8356e7
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfig.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.codec.multiline;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.constraints.AssertTrue;
+import jakarta.validation.constraints.Min;
+import jakarta.validation.constraints.NotNull;
+
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.StandardCharsets;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+/**
+ * Configuration class for the multiline input codec.
+ *
+ * The multiline codec groups consecutive lines from an input stream into a single event
+ * based on a regex pattern. Exactly one of the four pattern fields must be specified:
+ *
+ * - {@code event_start_pattern}: A new event begins at each line matching this pattern.
+ * - {@code event_end_pattern}: An event ends at each line matching this pattern (inclusive).
+ * - {@code continuation_line_start_pattern}: Lines matching this pattern are continuations of the previous event.
+ * - {@code continuation_line_end_pattern}: Lines matching this pattern are prepended to the next event.
+ *
+ *
+ * Example configuration for Java stack traces:
+ *
+ * codec:
+ * multiline:
+ * event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}"
+ *
+ */
+public class MultilineInputCodecConfig {
+
+ static final int DEFAULT_MAX_LINES = 500;
+ static final int DEFAULT_MAX_LENGTH = 10000;
+ static final String DEFAULT_LINE_SEPARATOR = "\n";
+
+ @JsonProperty("event_start_pattern")
+ private String eventStartPattern;
+
+ @JsonProperty("event_end_pattern")
+ private String eventEndPattern;
+
+ @JsonProperty("continuation_line_start_pattern")
+ private String continuationLineStartPattern;
+
+ @JsonProperty("continuation_line_end_pattern")
+ private String continuationLineEndPattern;
+
+ @JsonProperty("omit_matched_section")
+ private boolean omitMatchedSection = false;
+
+ @Min(value = 1, message = "max_lines must be at least 1")
+ @JsonProperty("max_lines")
+ private int maxLines = DEFAULT_MAX_LINES;
+
+ @Min(value = 1, message = "max_length must be at least 1")
+ @JsonProperty("max_length")
+ private int maxLength = DEFAULT_MAX_LENGTH;
+
+ @NotNull(message = "line_separator must not be null")
+ @JsonProperty("line_separator")
+ private String lineSeparator = DEFAULT_LINE_SEPARATOR;
+
+ @JsonProperty("encoding")
+ private String encoding = StandardCharsets.UTF_8.name();
+
+ private Pattern compiledPattern;
+
+ public String getEventStartPattern() {
+ return eventStartPattern;
+ }
+
+ public String getEventEndPattern() {
+ return eventEndPattern;
+ }
+
+ public String getContinuationLineStartPattern() {
+ return continuationLineStartPattern;
+ }
+
+ public String getContinuationLineEndPattern() {
+ return continuationLineEndPattern;
+ }
+
+ public boolean getOmitMatchedSection() {
+ return omitMatchedSection;
+ }
+
+ public int getMaxLines() {
+ return maxLines;
+ }
+
+ public int getMaxLength() {
+ return maxLength;
+ }
+
+ public String getLineSeparator() {
+ return lineSeparator;
+ }
+
+ /**
+ * Returns the validated Charset, compiled on first access.
+ *
+ * @return The Charset.
+ */
+ public Charset getEncoding() {
+ return Charset.forName(encoding);
+ }
+
+ /**
+ * Returns the compiled regex pattern, compiled on first access.
+ *
+ * @return The compiled Pattern.
+ */
+ public Pattern getCompiledPattern() {
+ if (compiledPattern == null) {
+ compiledPattern = Pattern.compile(getConfiguredPatternString());
+ }
+ return compiledPattern;
+ }
+
+ @AssertTrue(message = "Exactly one pattern field must be specified: event_start_pattern, event_end_pattern, " +
+ "continuation_line_start_pattern, or continuation_line_end_pattern")
+ boolean isExactlyOnePatternSpecified() {
+ int count = 0;
+ if (eventStartPattern != null) count++;
+ if (eventEndPattern != null) count++;
+ if (continuationLineStartPattern != null) count++;
+ if (continuationLineEndPattern != null) count++;
+ return count == 1;
+ }
+
+ @AssertTrue(message = "The specified pattern must be a valid regular expression")
+ boolean isValidPattern() {
+ final String patternString = getConfiguredPatternString();
+ if (patternString == null || patternString.isEmpty()) {
+ return false;
+ }
+ try {
+ Pattern.compile(patternString);
+ return true;
+ } catch (final PatternSyntaxException e) {
+ return false;
+ }
+ }
+
+ @AssertTrue(message = "The specified encoding must be a valid charset")
+ boolean isValidEncoding() {
+ if (encoding == null || encoding.isEmpty()) {
+ return false;
+ }
+ try {
+ Charset.forName(encoding);
+ return true;
+ } catch (final IllegalCharsetNameException | UnsupportedCharsetException e) {
+ return false;
+ }
+ }
+
+ String getConfiguredPatternString() {
+ if (eventStartPattern != null) return eventStartPattern;
+ if (eventEndPattern != null) return eventEndPattern;
+ if (continuationLineStartPattern != null) return continuationLineStartPattern;
+ if (continuationLineEndPattern != null) return continuationLineEndPattern;
+ return null;
+ }
+
+ static Builder builder() {
+ return new Builder();
+ }
+
+ static class Builder {
+ private final MultilineInputCodecConfig config = new MultilineInputCodecConfig();
+
+ Builder withEventStartPattern(final String pattern) {
+ config.eventStartPattern = pattern;
+ return this;
+ }
+
+ Builder withEventEndPattern(final String pattern) {
+ config.eventEndPattern = pattern;
+ return this;
+ }
+
+ Builder withContinuationLineStartPattern(final String pattern) {
+ config.continuationLineStartPattern = pattern;
+ return this;
+ }
+
+ Builder withContinuationLineEndPattern(final String pattern) {
+ config.continuationLineEndPattern = pattern;
+ return this;
+ }
+
+ MultilineInputCodecConfig build() {
+ return config;
+ }
+ }
+}
diff --git a/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineMode.java b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineMode.java
new file mode 100644
index 0000000000..fafa8cd0b0
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineMode.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.codec.multiline;
+
+/**
+ * Internal representation of the multiline grouping mode, determined from the configuration.
+ */
+enum MultilineMode {
+
+ /**
+ * A new event starts at each line matching the pattern.
+ * Non-matching lines are continuations of the preceding event.
+ */
+ EVENT_START,
+
+ /**
+ * An event ends at each line matching the pattern (inclusive).
+ * The next line begins a new event.
+ */
+ EVENT_END,
+
+ /**
+ * Lines matching the pattern are continuations of the previous event.
+ * Non-matching lines start new events.
+ */
+ CONTINUATION_START,
+
+ /**
+ * Lines matching the pattern are prepended to the next event.
+ * Non-matching lines complete the event.
+ */
+ CONTINUATION_END
+}
diff --git a/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineCodecsIT.java b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineCodecsIT.java
new file mode 100644
index 0000000000..ad67ec24df
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineCodecsIT.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.codec.multiline;
+
+import org.junit.jupiter.api.Test;
+import org.opensearch.dataprepper.model.codec.InputCodec;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.test.plugins.DataPrepperPluginTest;
+import org.opensearch.dataprepper.test.plugins.PluginConfigurationFile;
+import org.opensearch.dataprepper.test.plugins.junit.BaseDataPrepperPluginStandardTestSuite;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+
+@DataPrepperPluginTest(pluginName = "multiline", pluginType = InputCodec.class)
+public class MultilineCodecsIT extends BaseDataPrepperPluginStandardTestSuite {
+
+ private List> parseContent(final InputCodec codec, final String content) throws IOException {
+ final List> events = new ArrayList<>();
+ codec.parse(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), events::add);
+ return events;
+ }
+
+ @Test
+ void parse_java_stack_trace_with_event_start_pattern(
+ @PluginConfigurationFile("event-start-pattern.yaml") final InputCodec codec) throws IOException {
+
+ final String input =
+ "2024-01-15 10:23:45.123 ERROR [main] com.example.UserService - Request failed\n" +
+ "java.lang.NullPointerException: null\n" +
+ "\tat com.example.UserService.getUser(UserService.java:42)\n" +
+ "\tat com.example.Controller.handle(Controller.java:28)\n" +
+ "Caused by: java.sql.SQLException: Connection refused\n" +
+ "\tat com.mysql.jdbc.Connection.connect(Connection.java:456)\n" +
+ "2024-01-15 10:23:45.456 INFO [main] com.example.UserService - Retrying\n";
+
+ final List> events = parseContent(codec, input);
+
+ assertThat(events.size(), equalTo(2));
+ final String event1 = events.get(0).getData().get("message", String.class);
+ assertThat(event1, containsString("NullPointerException"));
+ assertThat(event1, containsString("at com.example.UserService.getUser"));
+ assertThat(event1, containsString("Caused by: java.sql.SQLException"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo("2024-01-15 10:23:45.456 INFO [main] com.example.UserService - Retrying"));
+ }
+
+ @Test
+ void parse_with_event_end_pattern(
+ @PluginConfigurationFile("event-end-pattern.yaml") final InputCodec codec) throws IOException {
+
+ final String input =
+ "entry 1 line 1\n" +
+ "entry 1 line 2\n" +
+ "---\n" +
+ "entry 2 line 1\n" +
+ "---\n";
+
+ final List> events = parseContent(codec, input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("entry 1 line 1\nentry 1 line 2\n---"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo("entry 2 line 1\n---"));
+ }
+
+ @Test
+ void parse_with_continuation_line_start_pattern(
+ @PluginConfigurationFile("continuation-line-start-pattern.yaml") final InputCodec codec) throws IOException {
+
+ final String input =
+ "java.lang.RuntimeException: error\n" +
+ " at com.example.A.method(A.java:1)\n" +
+ " Caused by: java.io.IOException\n" +
+ " at com.example.C.read(C.java:3)\n" +
+ "Application recovered\n";
+
+ final List> events = parseContent(codec, input);
+
+ assertThat(events.size(), equalTo(2));
+ final String event1 = events.get(0).getData().get("message", String.class);
+ assertThat(event1, containsString("RuntimeException: error"));
+ assertThat(event1, containsString("at com.example.A.method"));
+ assertThat(event1, containsString("Caused by: java.io.IOException"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo("Application recovered"));
+ }
+
+ @Test
+ void parse_with_omit_matched_section(
+ @PluginConfigurationFile("omit-matched-section.yaml") final InputCodec codec) throws IOException {
+
+ final String input =
+ "2024-01-01 ERROR something bad\n" +
+ " stack trace\n" +
+ "2024-01-02 INFO recovered\n";
+
+ final List> events = parseContent(codec, input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("ERROR something bad\n stack trace"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo("INFO recovered"));
+ }
+
+ @Test
+ void parse_with_continuation_line_end_pattern(
+ @PluginConfigurationFile("continuation-line-end-pattern.yaml") final InputCodec codec) throws IOException {
+
+ final String input =
+ " context-line-1\n" +
+ " context-line-2\n" +
+ "MAIN EVENT A\n" +
+ " context-line-3\n" +
+ "MAIN EVENT B\n";
+
+ final List> events = parseContent(codec, input);
+
+ assertThat(events.size(), equalTo(2));
+ final String event1 = events.get(0).getData().get("message", String.class);
+ assertThat(event1, containsString("context-line-1"));
+ assertThat(event1, containsString("context-line-2"));
+ assertThat(event1, containsString("MAIN EVENT A"));
+ final String event2 = events.get(1).getData().get("message", String.class);
+ assertThat(event2, containsString("context-line-3"));
+ assertThat(event2, containsString("MAIN EVENT B"));
+ }
+}
diff --git a/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfigTest.java b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfigTest.java
new file mode 100644
index 0000000000..74c5f97b9a
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfigTest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.codec.multiline;
+
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class MultilineInputCodecConfigTest {
+
+ @Test
+ void defaults_are_correct() {
+ final MultilineInputCodecConfig config = new MultilineInputCodecConfig();
+
+ assertThat(config.getEventStartPattern(), nullValue());
+ assertThat(config.getEventEndPattern(), nullValue());
+ assertThat(config.getContinuationLineStartPattern(), nullValue());
+ assertThat(config.getContinuationLineEndPattern(), nullValue());
+ assertThat(config.getOmitMatchedSection(), equalTo(false));
+ assertThat(config.getMaxLines(), equalTo(MultilineInputCodecConfig.DEFAULT_MAX_LINES));
+ assertThat(config.getMaxLength(), equalTo(MultilineInputCodecConfig.DEFAULT_MAX_LENGTH));
+ assertThat(config.getLineSeparator(), equalTo(MultilineInputCodecConfig.DEFAULT_LINE_SEPARATOR));
+ assertThat(config.getConfiguredPatternString(), nullValue());
+ }
+
+ @Test
+ void isExactlyOnePatternSpecified_returns_true_for_event_start_pattern() {
+ final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder()
+ .withEventStartPattern("^\\d{4}")
+ .build();
+ assertThat(config.isExactlyOnePatternSpecified(), equalTo(true));
+ }
+
+ @Test
+ void isExactlyOnePatternSpecified_returns_true_for_event_end_pattern() {
+ final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder()
+ .withEventEndPattern("^---$")
+ .build();
+ assertThat(config.isExactlyOnePatternSpecified(), equalTo(true));
+ }
+
+ @Test
+ void isExactlyOnePatternSpecified_returns_true_for_continuation_line_start_pattern() {
+ final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder()
+ .withContinuationLineStartPattern("^\\s")
+ .build();
+ assertThat(config.isExactlyOnePatternSpecified(), equalTo(true));
+ }
+
+ @Test
+ void isExactlyOnePatternSpecified_returns_true_for_continuation_line_end_pattern() {
+ final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder()
+ .withContinuationLineEndPattern("^\\s")
+ .build();
+ assertThat(config.isExactlyOnePatternSpecified(), equalTo(true));
+ }
+
+ @Test
+ void isExactlyOnePatternSpecified_returns_false_when_none_specified() {
+ final MultilineInputCodecConfig config = new MultilineInputCodecConfig();
+ assertThat(config.isExactlyOnePatternSpecified(), equalTo(false));
+ }
+
+ @Test
+ void isExactlyOnePatternSpecified_returns_false_when_two_specified() {
+ final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder()
+ .withEventStartPattern("^\\d{4}")
+ .withEventEndPattern("^---$")
+ .build();
+ assertThat(config.isExactlyOnePatternSpecified(), equalTo(false));
+ }
+
+ @Test
+ void isValidPattern_returns_true_for_valid_regex() {
+ final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder()
+ .withEventStartPattern("^\\d{4}-\\d{2}-\\d{2}")
+ .build();
+ assertThat(config.isValidPattern(), equalTo(true));
+ }
+
+ @Test
+ void isValidPattern_returns_false_for_invalid_regex() {
+ final MultilineInputCodecConfig config = MultilineInputCodecConfig.builder()
+ .withEventStartPattern("[invalid(")
+ .build();
+ assertThat(config.isValidPattern(), equalTo(false));
+ }
+
+ @Test
+ void isValidPattern_returns_false_when_no_pattern_configured() {
+ final MultilineInputCodecConfig config = new MultilineInputCodecConfig();
+ assertThat(config.isValidPattern(), equalTo(false));
+ }
+
+ @Test
+ void getConfiguredPatternString_returns_null_when_none_specified() {
+ final MultilineInputCodecConfig config = new MultilineInputCodecConfig();
+ assertThat(config.getConfiguredPatternString(), nullValue());
+ }
+
+ @Test
+ void isValidEncoding_returns_true_for_default_utf8() {
+ final MultilineInputCodecConfig config = new MultilineInputCodecConfig();
+ assertThat(config.isValidEncoding(), equalTo(true));
+ }
+
+ @Test
+ void isValidEncoding_returns_true_for_valid_charset() {
+ final MultilineInputCodecConfig config = new MultilineInputCodecConfig();
+ assertThat(config.isValidEncoding(), equalTo(true));
+ assertThat(config.getEncoding().name(), equalTo("UTF-8"));
+ }
+}
diff --git a/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecTest.java b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecTest.java
new file mode 100644
index 0000000000..ea8dc07d4d
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecTest.java
@@ -0,0 +1,474 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.codec.multiline;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.event.TestEventFactory;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventFactory;
+import org.opensearch.dataprepper.model.record.Record;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class MultilineInputCodecTest {
+
+ @Mock
+ private MultilineInputCodecConfig config;
+
+ private final EventFactory eventFactory = TestEventFactory.getTestEventFactory();
+
+ private MultilineInputCodec createObjectUnderTest() {
+ return new MultilineInputCodec(config, eventFactory);
+ }
+
+ private InputStream toInputStream(final String content) {
+ return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
+ }
+
+ private List> parseContent(final String content) throws IOException {
+ final List> events = new ArrayList<>();
+ createObjectUnderTest().parse(toInputStream(content), events::add);
+ return events;
+ }
+
+ @Test
+ void constructor_throws_if_config_is_null() {
+ assertThrows(NullPointerException.class, () -> new MultilineInputCodec(null, eventFactory));
+ }
+
+ @Test
+ void constructor_throws_if_eventFactory_is_null() {
+ assertThrows(NullPointerException.class, () -> new MultilineInputCodec(config, null));
+ }
+
+ @Test
+ void constructor_throws_if_no_pattern_configured() {
+ when(config.getCompiledPattern()).thenReturn(null);
+
+ assertThrows(IllegalArgumentException.class, this::createObjectUnderTest);
+ }
+
+ private void setupConfig(final String patternStr) {
+ when(config.getCompiledPattern()).thenReturn(Pattern.compile(patternStr));
+ when(config.getMaxLines()).thenReturn(500);
+ when(config.getMaxLength()).thenReturn(10000);
+ when(config.getLineSeparator()).thenReturn("\n");
+ when(config.getOmitMatchedSection()).thenReturn(false);
+ when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8);
+ }
+
+ @Nested
+ class EventStartMode {
+
+ @BeforeEach
+ void setUp() {
+ setupConfig("^\\d{4}-\\d{2}-\\d{2}");
+ when(config.getEventStartPattern()).thenReturn("^\\d{4}-\\d{2}-\\d{2}");
+ }
+
+ @Test
+ void groups_stack_trace_with_timestamp_start() throws IOException {
+ final String input = "2024-01-01 ERROR NullPointerException\n" +
+ " at com.example.Service.method(Service.java:42)\n" +
+ " at com.example.Main.run(Main.java:10)\n" +
+ "2024-01-01 INFO Application recovered\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("2024-01-01 ERROR NullPointerException\n" +
+ " at com.example.Service.method(Service.java:42)\n" +
+ " at com.example.Main.run(Main.java:10)"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo("2024-01-01 INFO Application recovered"));
+ }
+
+ @Test
+ void multiple_single_line_events() throws IOException {
+ final String input = "2024-01-01 INFO line one\n" +
+ "2024-01-02 INFO line two\n" +
+ "2024-01-03 INFO line three\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(3));
+ assertThat(events.get(0).getData().get("message", String.class), equalTo("2024-01-01 INFO line one"));
+ assertThat(events.get(1).getData().get("message", String.class), equalTo("2024-01-02 INFO line two"));
+ assertThat(events.get(2).getData().get("message", String.class), equalTo("2024-01-03 INFO line three"));
+ }
+
+ @Test
+ void continuation_lines_at_beginning_grouped_as_first_event() throws IOException {
+ final String input = " orphan line 1\n" +
+ " orphan line 2\n" +
+ "2024-01-01 INFO first entry\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo(" orphan line 1\n orphan line 2"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo("2024-01-01 INFO first entry"));
+ }
+
+ @Test
+ void last_event_flushed_at_end_of_stream() throws IOException {
+ final String input = "2024-01-01 ERROR Exception\n" +
+ " at com.example.Foo.bar(Foo.java:1)\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(1));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("2024-01-01 ERROR Exception\n at com.example.Foo.bar(Foo.java:1)"));
+ }
+
+ @Test
+ void empty_input_produces_no_events() throws IOException {
+ final List> events = parseContent("");
+ assertThat(events.size(), equalTo(0));
+ }
+
+ @Test
+ void no_lines_match_produces_single_event() throws IOException {
+ final String input = " line 1\n line 2\n line 3\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(1));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo(" line 1\n line 2\n line 3"));
+ }
+ }
+
+ @Nested
+ class EventEndMode {
+
+ @BeforeEach
+ void setUp() {
+ setupConfig("^---$");
+ when(config.getEventEndPattern()).thenReturn("^---$");
+ }
+
+ @Test
+ void groups_lines_until_separator() throws IOException {
+ final String input = "line 1\n" +
+ "line 2\n" +
+ "---\n" +
+ "line 3\n" +
+ "line 4\n" +
+ "---\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("line 1\nline 2\n---"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo("line 3\nline 4\n---"));
+ }
+
+ @Test
+ void trailing_lines_without_end_marker_flushed() throws IOException {
+ final String input = "line 1\n" +
+ "---\n" +
+ "line 2\n" +
+ "line 3\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("line 1\n---"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo("line 2\nline 3"));
+ }
+
+ @Test
+ void single_line_matching_end_pattern() throws IOException {
+ final String input = "---\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(1));
+ assertThat(events.get(0).getData().get("message", String.class), equalTo("---"));
+ }
+ }
+
+ @Nested
+ class ContinuationStartMode {
+
+ @BeforeEach
+ void setUp() {
+ setupConfig("^\\s+(at |\\.\\.\\.|Caused by:)");
+ when(config.getContinuationLineStartPattern()).thenReturn("^\\s+(at |\\.\\.\\.|Caused by:)");
+ }
+
+ @Test
+ void groups_stack_trace_lines_with_previous() throws IOException {
+ final String input = "java.lang.NullPointerException: null\n" +
+ " at com.example.Service.process(Service.java:42)\n" +
+ " at com.example.Main.run(Main.java:10)\n" +
+ "INFO: Recovery complete\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("java.lang.NullPointerException: null\n" +
+ " at com.example.Service.process(Service.java:42)\n" +
+ " at com.example.Main.run(Main.java:10)"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo("INFO: Recovery complete"));
+ }
+
+ @Test
+ void caused_by_grouped_with_previous() throws IOException {
+ final String input = "java.lang.RuntimeException: error\n" +
+ " at com.example.A.method(A.java:1)\n" +
+ " Caused by: java.io.IOException\n" +
+ " at com.example.B.read(B.java:5)\n" +
+ "Next log entry\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("java.lang.RuntimeException: error\n" +
+ " at com.example.A.method(A.java:1)\n" +
+ " Caused by: java.io.IOException\n" +
+ " at com.example.B.read(B.java:5)"));
+ }
+ }
+
+ @Nested
+ class ContinuationEndMode {
+
+ @BeforeEach
+ void setUp() {
+ setupConfig("^\\s");
+ }
+
+ @Test
+ void continuation_lines_prepended_to_next_event() throws IOException {
+ final String input = " header line 1\n" +
+ " header line 2\n" +
+ "MAIN LOG ENTRY\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(1));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo(" header line 1\n header line 2\nMAIN LOG ENTRY"));
+ }
+
+ @Test
+ void multiple_groups() throws IOException {
+ final String input = " context A\n" +
+ "EVENT A\n" +
+ " context B\n" +
+ "EVENT B\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo(" context A\nEVENT A"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo(" context B\nEVENT B"));
+ }
+
+ @Test
+ void trailing_continuation_lines_flushed() throws IOException {
+ final String input = "EVENT A\n" +
+ " trailing 1\n" +
+ " trailing 2\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class), equalTo("EVENT A"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo(" trailing 1\n trailing 2"));
+ }
+
+ @Test
+ void no_continuation_lines_each_is_separate_event() throws IOException {
+ final String input = "EVENT A\nEVENT B\nEVENT C\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(3));
+ assertThat(events.get(0).getData().get("message", String.class), equalTo("EVENT A"));
+ assertThat(events.get(1).getData().get("message", String.class), equalTo("EVENT B"));
+ assertThat(events.get(2).getData().get("message", String.class), equalTo("EVENT C"));
+ }
+ }
+
+ @Nested
+ class OmitMatchedSection {
+
+ @Test
+ void event_start_pattern_omits_matched_section() throws IOException {
+ when(config.getCompiledPattern()).thenReturn(Pattern.compile("^\\d{4}-\\d{2}-\\d{2}\\s+"));
+ when(config.getEventStartPattern()).thenReturn("^\\d{4}-\\d{2}-\\d{2}\\s+");
+ when(config.getMaxLines()).thenReturn(500);
+ when(config.getMaxLength()).thenReturn(10000);
+ when(config.getLineSeparator()).thenReturn("\n");
+ when(config.getOmitMatchedSection()).thenReturn(true);
+ when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8);
+
+ final String input = "2024-01-01 ERROR something\n" +
+ " stack trace line\n" +
+ "2024-01-02 INFO recovered\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("ERROR something\n stack trace line"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo("INFO recovered"));
+ }
+
+ @Test
+ void event_end_pattern_omits_matched_section() throws IOException {
+ when(config.getCompiledPattern()).thenReturn(Pattern.compile("^---$"));
+ when(config.getEventEndPattern()).thenReturn("^---$");
+ when(config.getMaxLines()).thenReturn(500);
+ when(config.getMaxLength()).thenReturn(10000);
+ when(config.getLineSeparator()).thenReturn("\n");
+ when(config.getOmitMatchedSection()).thenReturn(true);
+ when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8);
+
+ final String input = "line 1\nline 2\n---\nline 3\n---\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(2));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("line 1\nline 2"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo("line 3"));
+ }
+
+ @Test
+ void omit_false_preserves_matched_section() throws IOException {
+ when(config.getCompiledPattern()).thenReturn(Pattern.compile("^\\d{4}-\\d{2}-\\d{2}\\s+"));
+ when(config.getEventStartPattern()).thenReturn("^\\d{4}-\\d{2}-\\d{2}\\s+");
+ when(config.getMaxLines()).thenReturn(500);
+ when(config.getMaxLength()).thenReturn(10000);
+ when(config.getLineSeparator()).thenReturn("\n");
+ when(config.getOmitMatchedSection()).thenReturn(false);
+ when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8);
+
+ final String input = "2024-01-01 ERROR something\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(1));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("2024-01-01 ERROR something"));
+ }
+ }
+
+ @Nested
+ class MaxLinesLimit {
+
+ @BeforeEach
+ void setUp() {
+ when(config.getCompiledPattern()).thenReturn(Pattern.compile("^\\d{4}"));
+ when(config.getEventStartPattern()).thenReturn("^\\d{4}");
+ when(config.getMaxLines()).thenReturn(3);
+ when(config.getMaxLength()).thenReturn(10000);
+ when(config.getLineSeparator()).thenReturn("\n");
+ when(config.getOmitMatchedSection()).thenReturn(false);
+ when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8);
+ }
+
+ @Test
+ void flushes_event_when_max_lines_exceeded() throws IOException {
+ final String input = "2024 start\n line 2\n line 3\n line 4\n line 5\n2024 next\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(3));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("2024 start\n line 2\n line 3"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo(" line 4\n line 5"));
+ assertThat(events.get(2).getData().get("message", String.class),
+ equalTo("2024 next"));
+ }
+ }
+
+ @Nested
+ class MaxLengthLimit {
+
+ @BeforeEach
+ void setUp() {
+ when(config.getCompiledPattern()).thenReturn(Pattern.compile("^\\d{4}"));
+ when(config.getEventStartPattern()).thenReturn("^\\d{4}");
+ when(config.getMaxLines()).thenReturn(500);
+ when(config.getMaxLength()).thenReturn(30);
+ when(config.getLineSeparator()).thenReturn("\n");
+ when(config.getOmitMatchedSection()).thenReturn(false);
+ when(config.getEncoding()).thenReturn(StandardCharsets.UTF_8);
+ }
+
+ @Test
+ void flushes_event_when_max_length_exceeded() throws IOException {
+ final String input = "2024 start line here\n continuation is long\n2024 next entry\n";
+
+ final List> events = parseContent(input);
+
+ assertThat(events.size(), equalTo(3));
+ assertThat(events.get(0).getData().get("message", String.class),
+ equalTo("2024 start line here"));
+ assertThat(events.get(1).getData().get("message", String.class),
+ equalTo(" continuation is long"));
+ assertThat(events.get(2).getData().get("message", String.class),
+ equalTo("2024 next entry"));
+ }
+ }
+
+ @Test
+ void event_metadata_is_log_type() throws IOException {
+ setupConfig("^\\d{4}");
+ when(config.getEventStartPattern()).thenReturn("^\\d{4}");
+
+ final List> events = parseContent("2024-01-01 test\n");
+
+ assertThat(events.size(), equalTo(1));
+ assertThat(events.get(0).getData(), notNullValue());
+ assertThat(events.get(0).getData().getMetadata(), notNullValue());
+ assertThat(events.get(0).getData().getMetadata().getEventType(), equalTo("LOG"));
+ }
+}
diff --git a/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-end-pattern.yaml b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-end-pattern.yaml
new file mode 100644
index 0000000000..2beb48c08a
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-end-pattern.yaml
@@ -0,0 +1,15 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+
+test-pipeline:
+ source:
+ unused:
+ processor:
+ - multiline:
+ continuation_line_end_pattern: "^\\s"
+ sink:
+ - unused:
diff --git a/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-start-pattern.yaml b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-start-pattern.yaml
new file mode 100644
index 0000000000..7fbb62d7cc
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/continuation-line-start-pattern.yaml
@@ -0,0 +1,15 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+
+test-pipeline:
+ source:
+ unused:
+ processor:
+ - multiline:
+ continuation_line_start_pattern: "^\\s+(at |\\.\\.\\.|Caused by:)"
+ sink:
+ - unused:
diff --git a/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-end-pattern.yaml b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-end-pattern.yaml
new file mode 100644
index 0000000000..06b9577b18
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-end-pattern.yaml
@@ -0,0 +1,15 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+
+test-pipeline:
+ source:
+ unused:
+ processor:
+ - multiline:
+ event_end_pattern: "^---$"
+ sink:
+ - unused:
diff --git a/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-start-pattern.yaml b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-start-pattern.yaml
new file mode 100644
index 0000000000..c95b3b7be9
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/event-start-pattern.yaml
@@ -0,0 +1,15 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+
+test-pipeline:
+ source:
+ unused:
+ processor:
+ - multiline:
+ event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}"
+ sink:
+ - unused:
diff --git a/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/omit-matched-section.yaml b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/omit-matched-section.yaml
new file mode 100644
index 0000000000..ec7b990b13
--- /dev/null
+++ b/data-prepper-plugins/multiline-codecs/src/test/resources/org/opensearch/dataprepper/plugins/codec/multiline/omit-matched-section.yaml
@@ -0,0 +1,16 @@
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+
+test-pipeline:
+ source:
+ unused:
+ processor:
+ - multiline:
+ event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}\\s+"
+ omit_matched_section: true
+ sink:
+ - unused:
diff --git a/settings.gradle b/settings.gradle
index f6f07cc1b0..3409e170eb 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -169,6 +169,7 @@ include 'release:maven'
include 'e2e-test:peerforwarder'
include 'data-prepper-plugins:failures-common'
include 'data-prepper-plugins:newline-codecs'
+include 'data-prepper-plugins:multiline-codecs'
include 'data-prepper-plugins:avro-codecs'
include 'data-prepper-plugins:kafka-plugins'
include 'data-prepper-plugins:user-agent-processor'