-
Notifications
You must be signed in to change notification settings - Fork 331
Add multiline input codec for grouping multi-line log events #6911
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
graytaylor0
merged 3 commits into
opensearch-project:main
from
yavmanis:feature/multiline_codec_support
Jun 18, 2026
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| # Multiline Codecs | ||
|
|
||
| This plugin provides a multiline input codec for Data Prepper that groups consecutive lines from an input stream into single events based on a configurable regex pattern. | ||
|
|
||
| ## Usages | ||
|
|
||
| The multiline input codec can be configured with source plugins (e.g. S3 source, file source) in the pipeline file. | ||
|
|
||
| ### Use Cases | ||
|
|
||
| - **Java/Kotlin stack traces**: Exception messages followed by `at ...` lines | ||
| - **Python tracebacks**: `Traceback` blocks spanning multiple lines | ||
| - **Timestamp-prefixed logs**: Logs where each entry starts with a timestamp and continuation lines don't | ||
| - **Multi-line JSON/XML in logs**: Structured data embedded across multiple lines within log entries | ||
| - **Custom log formats**: Any format where a recognizable pattern marks the start or end of a new event | ||
|
|
||
| ## Configuration Options | ||
|
|
||
| Exactly one of the four pattern fields must be specified: | ||
|
|
||
| | Option | Required | Type | Default | Description | | ||
| |---|---|---|---|---| | ||
| | `event_start_pattern` | One of four | String (regex) | - | A new event begins at each line matching this pattern | | ||
| | `event_end_pattern` | One of four | String (regex) | - | An event ends at each line matching this pattern (inclusive) | | ||
| | `continuation_line_start_pattern` | One of four | String (regex) | - | Lines matching this pattern are continuations of the previous event | | ||
| | `continuation_line_end_pattern` | One of four | String (regex) | - | Lines matching this pattern are prepended to the next event | | ||
| | `omit_matched_section` | No | Boolean | `false` | When true, the matched portion of the line is omitted from the output | | ||
| | `max_lines` | No | Integer | `500` | Maximum number of lines that can be combined into a single event | | ||
| | `max_length` | No | Integer | `10000` | Maximum character length of a combined multiline event. Note: a single line exceeding this limit will still be emitted as a complete event without truncation | | ||
| | `line_separator` | No | String | `\n` | Separator string used when joining lines into a single event message. Note: `BufferedReader.readLine()` strips original line endings, so the codec normalizes joined lines using this separator. Set to `""` for no separator | | ||
| | `encoding` | No | String | `UTF-8` | Character encoding to use when reading the input stream | | ||
|
|
||
|
dlvenable marked this conversation as resolved.
|
||
| ## How It Works | ||
|
|
||
| The codec reads lines from the input stream and uses the configured pattern to determine event boundaries: | ||
|
|
||
| 1. **`event_start_pattern`** (most common): Each line matching the pattern starts a new event. All subsequent non-matching lines are appended to it. | ||
|
|
||
| 2. **`event_end_pattern`**: Lines are accumulated until a line matches the pattern. The matching line is included in the current event, and the next line starts a new event. | ||
|
|
||
| 3. **`continuation_line_start_pattern`**: Lines matching the pattern are continuations of the previous event. Non-matching lines start new events. | ||
|
|
||
| 4. **`continuation_line_end_pattern`**: Lines matching the pattern are prepended to the next non-matching line's event. | ||
|
|
||
| ## Examples | ||
|
|
||
| ### Java Stack Traces | ||
|
|
||
| Each log entry starts with a timestamp. Lines without a timestamp (stack frames) are part of the previous entry. | ||
|
|
||
| ```yaml | ||
| pipeline: | ||
| source: | ||
| s3: | ||
| codec: | ||
| multiline: | ||
| event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}" | ||
| ``` | ||
|
|
||
| Input: | ||
| ``` | ||
| 2024-01-01 12:00:00 ERROR NullPointerException | ||
| at com.example.Service.method(Service.java:42) | ||
| at com.example.Main.run(Main.java:10) | ||
| 2024-01-01 12:00:01 INFO Application recovered | ||
| ``` | ||
|
|
||
| Result: 2 events (stack trace grouped with its ERROR line) | ||
|
|
||
| ### Delimiter-Separated Entries | ||
|
|
||
| Log entries are separated by a `---` line. | ||
|
|
||
| ```yaml | ||
| pipeline: | ||
| source: | ||
| s3: | ||
| codec: | ||
| multiline: | ||
| event_end_pattern: "^---$" | ||
| ``` | ||
|
|
||
| ### Stack Traces (continuation pattern) | ||
|
|
||
| Lines starting with whitespace followed by `at ` or `Caused by:` are continuations. | ||
|
|
||
| ```yaml | ||
| pipeline: | ||
| source: | ||
| s3: | ||
| codec: | ||
| multiline: | ||
| continuation_line_start_pattern: "^\\s+(at |\\.\\.\\.|Caused by:)" | ||
| ``` | ||
|
|
||
| ### Omitting Timestamps from Output | ||
|
|
||
| Strip the timestamp from each event's first line: | ||
|
|
||
| ```yaml | ||
| pipeline: | ||
| source: | ||
| s3: | ||
| codec: | ||
| multiline: | ||
| event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\s+" | ||
| omit_matched_section: true | ||
| ``` | ||
|
|
||
| ## Developer Guide | ||
|
|
||
| This plugin is compatible with Java 11. See below: | ||
|
|
||
| - [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) | ||
| - [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) | ||
|
|
||
| The following command runs the unit and integration tests: | ||
|
|
||
| ``` | ||
| ./gradlew :data-prepper-plugins:multiline-codecs:test | ||
| ``` | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| dependencies { | ||
| implementation project(':data-prepper-api') | ||
| implementation 'com.fasterxml.jackson.core:jackson-annotations' | ||
| testImplementation project(':data-prepper-plugins:common') | ||
| testImplementation project(':data-prepper-test:test-event') | ||
| testImplementation project(':data-prepper-test:plugin-test-framework') | ||
| } |
181 changes: 181 additions & 0 deletions
181
...src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodec.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,181 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.codec.multiline; | ||
|
|
||
| import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | ||
| import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; | ||
| import org.opensearch.dataprepper.model.codec.InputCodec; | ||
| import org.opensearch.dataprepper.model.event.Event; | ||
| import org.opensearch.dataprepper.model.event.EventFactory; | ||
| import org.opensearch.dataprepper.model.event.LogEventBuilder; | ||
| import org.opensearch.dataprepper.model.log.Log; | ||
| import org.opensearch.dataprepper.model.record.Record; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.io.BufferedReader; | ||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.io.InputStreamReader; | ||
| import java.nio.charset.Charset; | ||
| import java.util.Collections; | ||
| import java.util.Objects; | ||
| import java.util.function.Consumer; | ||
| import java.util.regex.Matcher; | ||
| import java.util.regex.Pattern; | ||
|
|
||
| /** | ||
| * An implementation of {@link InputCodec} which groups multiple lines from an input stream | ||
| * into single events based on a configurable regex pattern. | ||
| * | ||
| * <p>This is useful for ingesting logs where a single logical event spans multiple lines, | ||
| * such as Java stack traces, Python tracebacks, or any log format where entries begin with | ||
| * a recognizable pattern (e.g., a timestamp).</p> | ||
| * | ||
| * <p>The codec supports four mutually exclusive pattern modes:</p> | ||
| * <ul> | ||
| * <li>{@code event_start_pattern}: A new event begins at each matching line.</li> | ||
| * <li>{@code event_end_pattern}: An event ends at each matching line (inclusive).</li> | ||
| * <li>{@code continuation_line_start_pattern}: Matching lines are continuations of the previous event.</li> | ||
| * <li>{@code continuation_line_end_pattern}: Matching lines are prepended to the next event.</li> | ||
| * </ul> | ||
| */ | ||
| @DataPrepperPlugin(name = "multiline", pluginType = InputCodec.class, pluginConfigurationType = MultilineInputCodecConfig.class) | ||
| public class MultilineInputCodec implements InputCodec { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(MultilineInputCodec.class); | ||
| private static final String MESSAGE_FIELD_NAME = "message"; | ||
|
|
||
| private final Pattern pattern; | ||
| private final boolean boundaryOnMatch; | ||
| private final boolean flushAfter; | ||
| private final boolean omitMatchedSection; | ||
| private final int maxLines; | ||
| private final int maxLength; | ||
| private final String lineSeparator; | ||
| private final Charset encoding; | ||
| private final EventFactory eventFactory; | ||
|
|
||
| @DataPrepperPluginConstructor | ||
| public MultilineInputCodec(final MultilineInputCodecConfig config, final EventFactory eventFactory) { | ||
| Objects.requireNonNull(config, "config must not be null"); | ||
| this.eventFactory = Objects.requireNonNull(eventFactory, "eventFactory must not be null"); | ||
|
|
||
| this.pattern = config.getCompiledPattern(); | ||
| if (this.pattern == null) { | ||
| throw new IllegalArgumentException("A valid pattern must be configured"); | ||
| } | ||
|
|
||
| final MultilineMode mode = resolveMode(config); | ||
| this.boundaryOnMatch = (mode == MultilineMode.EVENT_START || mode == MultilineMode.EVENT_END); | ||
| this.flushAfter = (mode == MultilineMode.EVENT_END || mode == MultilineMode.CONTINUATION_END); | ||
| this.omitMatchedSection = config.getOmitMatchedSection(); | ||
| this.maxLines = config.getMaxLines(); | ||
| this.maxLength = config.getMaxLength(); | ||
| this.lineSeparator = config.getLineSeparator(); | ||
| this.encoding = config.getEncoding(); | ||
| } | ||
|
|
||
| private static MultilineMode resolveMode(final MultilineInputCodecConfig config) { | ||
| if (config.getEventStartPattern() != null) { | ||
| return MultilineMode.EVENT_START; | ||
| } else if (config.getEventEndPattern() != null) { | ||
| return MultilineMode.EVENT_END; | ||
| } else if (config.getContinuationLineStartPattern() != null) { | ||
| return MultilineMode.CONTINUATION_START; | ||
| } else { | ||
| return MultilineMode.CONTINUATION_END; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException { | ||
| Objects.requireNonNull(inputStream, "inputStream must not be null"); | ||
| Objects.requireNonNull(eventConsumer, "eventConsumer must not be null"); | ||
|
|
||
| final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encoding)); | ||
| parseLines(reader, eventConsumer); | ||
| } | ||
|
|
||
| private void parseLines(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException { | ||
| final StringBuilder buffer = new StringBuilder(); | ||
| int lineCount = 0; | ||
| String line; | ||
|
|
||
| while ((line = reader.readLine()) != null) { | ||
| final boolean matches = pattern.matcher(line).find(); | ||
| final boolean isBoundary = (boundaryOnMatch == matches); | ||
|
|
||
| if ((!flushAfter && isBoundary) || shouldFlush(buffer, lineCount, line)) { | ||
| flushIfNonEmpty(buffer, eventConsumer); | ||
| lineCount = 0; | ||
| } | ||
|
|
||
| appendLineToBuffer(buffer, processLine(line, matches)); | ||
| lineCount++; | ||
|
|
||
| if (flushAfter && isBoundary) { | ||
| flushIfNonEmpty(buffer, eventConsumer); | ||
| lineCount = 0; | ||
| } | ||
| } | ||
|
|
||
| flushIfNonEmpty(buffer, eventConsumer); | ||
| } | ||
|
|
||
| private String processLine(final String line, final boolean matches) { | ||
| if (!omitMatchedSection || !matches) { | ||
| return line; | ||
| } | ||
| final Matcher matcher = pattern.matcher(line); | ||
| return matcher.replaceFirst(""); | ||
| } | ||
|
|
||
| private void appendLineToBuffer(final StringBuilder buffer, final String processedLine) { | ||
| if (processedLine.isEmpty()) { | ||
| return; | ||
| } | ||
| if (buffer.length() > 0) { | ||
| buffer.append(lineSeparator); | ||
| } | ||
| buffer.append(processedLine); | ||
| } | ||
|
|
||
| private void flushIfNonEmpty(final StringBuilder buffer, final Consumer<Record<Event>> eventConsumer) { | ||
| if (buffer.length() > 0) { | ||
| emitEvent(buffer.toString(), eventConsumer); | ||
| buffer.setLength(0); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Determines if the buffer should be flushed before appending the next line. | ||
| * Note: if a single line exceeds max_length on its own, it will still be emitted | ||
| * as a complete event without truncation. | ||
| */ | ||
| private boolean shouldFlush(final StringBuilder buffer, final int lineCount, final String nextLine) { | ||
| if (lineCount >= maxLines) { | ||
| LOG.debug("Flushing multiline event due to max_lines limit of {}", maxLines); | ||
| return true; | ||
| } | ||
| if (buffer.length() + lineSeparator.length() + nextLine.length() > maxLength) { | ||
| LOG.debug("Flushing multiline event due to max_length limit of {}", maxLength); | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private void emitEvent(final String message, final Consumer<Record<Event>> eventConsumer) { | ||
| final Log event = eventFactory.eventBuilder(LogEventBuilder.class) | ||
| .withData(Collections.singletonMap(MESSAGE_FIELD_NAME, message)) | ||
| .build(); | ||
| eventConsumer.accept(new Record<>(event)); | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a follow up please update this documentation as well with a PR (https://github.com/opensearch-project/documentation-website/tree/main/_data-prepper/pipelines/configuration)