Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions data-prepper-plugins/multiline-codecs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Multiline Codecs

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


This plugin provides a multiline input codec for Data Prepper that groups consecutive lines from an input stream into single events based on a configurable regex pattern.

## Usages

The multiline input codec can be configured with source plugins (e.g. S3 source, file source) in the pipeline file.

### Use Cases

- **Java/Kotlin stack traces**: Exception messages followed by `at ...` lines
- **Python tracebacks**: `Traceback` blocks spanning multiple lines
- **Timestamp-prefixed logs**: Logs where each entry starts with a timestamp and continuation lines don't
- **Multi-line JSON/XML in logs**: Structured data embedded across multiple lines within log entries
- **Custom log formats**: Any format where a recognizable pattern marks the start or end of a new event

## Configuration Options

Exactly one of the four pattern fields must be specified:

| Option | Required | Type | Default | Description |
|---|---|---|---|---|
| `event_start_pattern` | One of four | String (regex) | - | A new event begins at each line matching this pattern |
| `event_end_pattern` | One of four | String (regex) | - | An event ends at each line matching this pattern (inclusive) |
| `continuation_line_start_pattern` | One of four | String (regex) | - | Lines matching this pattern are continuations of the previous event |
| `continuation_line_end_pattern` | One of four | String (regex) | - | Lines matching this pattern are prepended to the next event |
| `omit_matched_section` | No | Boolean | `false` | When true, the matched portion of the line is omitted from the output |
| `max_lines` | No | Integer | `500` | Maximum number of lines that can be combined into a single event |
| `max_length` | No | Integer | `10000` | Maximum character length of a combined multiline event. Note: a single line exceeding this limit will still be emitted as a complete event without truncation |
| `line_separator` | No | String | `\n` | Separator string used when joining lines into a single event message. Note: `BufferedReader.readLine()` strips original line endings, so the codec normalizes joined lines using this separator. Set to `""` for no separator |
| `encoding` | No | String | `UTF-8` | Character encoding to use when reading the input stream |

Comment thread
dlvenable marked this conversation as resolved.
## How It Works

The codec reads lines from the input stream and uses the configured pattern to determine event boundaries:

1. **`event_start_pattern`** (most common): Each line matching the pattern starts a new event. All subsequent non-matching lines are appended to it.

2. **`event_end_pattern`**: Lines are accumulated until a line matches the pattern. The matching line is included in the current event, and the next line starts a new event.

3. **`continuation_line_start_pattern`**: Lines matching the pattern are continuations of the previous event. Non-matching lines start new events.

4. **`continuation_line_end_pattern`**: Lines matching the pattern are prepended to the next non-matching line's event.

## Examples

### Java Stack Traces

Each log entry starts with a timestamp. Lines without a timestamp (stack frames) are part of the previous entry.

```yaml
pipeline:
source:
s3:
codec:
multiline:
event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}"
```

Input:
```
2024-01-01 12:00:00 ERROR NullPointerException
at com.example.Service.method(Service.java:42)
at com.example.Main.run(Main.java:10)
2024-01-01 12:00:01 INFO Application recovered
```

Result: 2 events (stack trace grouped with its ERROR line)

### Delimiter-Separated Entries

Log entries are separated by a `---` line.

```yaml
pipeline:
source:
s3:
codec:
multiline:
event_end_pattern: "^---$"
```

### Stack Traces (continuation pattern)

Lines starting with whitespace followed by `at ` or `Caused by:` are continuations.

```yaml
pipeline:
source:
s3:
codec:
multiline:
continuation_line_start_pattern: "^\\s+(at |\\.\\.\\.|Caused by:)"
```

### Omitting Timestamps from Output

Strip the timestamp from each event's first line:

```yaml
pipeline:
source:
s3:
codec:
multiline:
event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\s+"
omit_matched_section: true
```

## Developer Guide

This plugin is compatible with Java 11. See below:

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

The following command runs the unit and integration tests:

```
./gradlew :data-prepper-plugins:multiline-codecs:test
```
16 changes: 16 additions & 0 deletions data-prepper-plugins/multiline-codecs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

dependencies {
implementation project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-annotations'
testImplementation project(':data-prepper-plugins:common')
testImplementation project(':data-prepper-test:test-event')
testImplementation project(':data-prepper-test:plugin-test-framework')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.codec.multiline;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.LogEventBuilder;
import org.opensearch.dataprepper.model.log.Log;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* An implementation of {@link InputCodec} which groups multiple lines from an input stream
* into single events based on a configurable regex pattern.
*
* <p>This is useful for ingesting logs where a single logical event spans multiple lines,
* such as Java stack traces, Python tracebacks, or any log format where entries begin with
* a recognizable pattern (e.g., a timestamp).</p>
*
* <p>The codec supports four mutually exclusive pattern modes:</p>
* <ul>
* <li>{@code event_start_pattern}: A new event begins at each matching line.</li>
* <li>{@code event_end_pattern}: An event ends at each matching line (inclusive).</li>
* <li>{@code continuation_line_start_pattern}: Matching lines are continuations of the previous event.</li>
* <li>{@code continuation_line_end_pattern}: Matching lines are prepended to the next event.</li>
* </ul>
*/
@DataPrepperPlugin(name = "multiline", pluginType = InputCodec.class, pluginConfigurationType = MultilineInputCodecConfig.class)
public class MultilineInputCodec implements InputCodec {

private static final Logger LOG = LoggerFactory.getLogger(MultilineInputCodec.class);
private static final String MESSAGE_FIELD_NAME = "message";

private final Pattern pattern;
private final boolean boundaryOnMatch;
private final boolean flushAfter;
private final boolean omitMatchedSection;
private final int maxLines;
private final int maxLength;
private final String lineSeparator;
private final Charset encoding;
private final EventFactory eventFactory;

@DataPrepperPluginConstructor
public MultilineInputCodec(final MultilineInputCodecConfig config, final EventFactory eventFactory) {
Objects.requireNonNull(config, "config must not be null");
this.eventFactory = Objects.requireNonNull(eventFactory, "eventFactory must not be null");

this.pattern = config.getCompiledPattern();
if (this.pattern == null) {
throw new IllegalArgumentException("A valid pattern must be configured");
}

final MultilineMode mode = resolveMode(config);
this.boundaryOnMatch = (mode == MultilineMode.EVENT_START || mode == MultilineMode.EVENT_END);
this.flushAfter = (mode == MultilineMode.EVENT_END || mode == MultilineMode.CONTINUATION_END);
this.omitMatchedSection = config.getOmitMatchedSection();
this.maxLines = config.getMaxLines();
this.maxLength = config.getMaxLength();
this.lineSeparator = config.getLineSeparator();
this.encoding = config.getEncoding();
}

private static MultilineMode resolveMode(final MultilineInputCodecConfig config) {
if (config.getEventStartPattern() != null) {
return MultilineMode.EVENT_START;
} else if (config.getEventEndPattern() != null) {
return MultilineMode.EVENT_END;
} else if (config.getContinuationLineStartPattern() != null) {
return MultilineMode.CONTINUATION_START;
} else {
return MultilineMode.CONTINUATION_END;
}
}

@Override
public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputStream, "inputStream must not be null");
Objects.requireNonNull(eventConsumer, "eventConsumer must not be null");

final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encoding));
parseLines(reader, eventConsumer);
}

private void parseLines(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
final StringBuilder buffer = new StringBuilder();
int lineCount = 0;
String line;

while ((line = reader.readLine()) != null) {
final boolean matches = pattern.matcher(line).find();
final boolean isBoundary = (boundaryOnMatch == matches);

if ((!flushAfter && isBoundary) || shouldFlush(buffer, lineCount, line)) {
flushIfNonEmpty(buffer, eventConsumer);
lineCount = 0;
}

appendLineToBuffer(buffer, processLine(line, matches));
lineCount++;

if (flushAfter && isBoundary) {
flushIfNonEmpty(buffer, eventConsumer);
lineCount = 0;
}
}

flushIfNonEmpty(buffer, eventConsumer);
}

private String processLine(final String line, final boolean matches) {
if (!omitMatchedSection || !matches) {
return line;
}
final Matcher matcher = pattern.matcher(line);
return matcher.replaceFirst("");
}

private void appendLineToBuffer(final StringBuilder buffer, final String processedLine) {
if (processedLine.isEmpty()) {
return;
}
if (buffer.length() > 0) {
buffer.append(lineSeparator);
}
buffer.append(processedLine);
}

private void flushIfNonEmpty(final StringBuilder buffer, final Consumer<Record<Event>> eventConsumer) {
if (buffer.length() > 0) {
emitEvent(buffer.toString(), eventConsumer);
buffer.setLength(0);
}
}

/**
* Determines if the buffer should be flushed before appending the next line.
* Note: if a single line exceeds max_length on its own, it will still be emitted
* as a complete event without truncation.
*/
private boolean shouldFlush(final StringBuilder buffer, final int lineCount, final String nextLine) {
if (lineCount >= maxLines) {
LOG.debug("Flushing multiline event due to max_lines limit of {}", maxLines);
return true;
}
if (buffer.length() + lineSeparator.length() + nextLine.length() > maxLength) {
LOG.debug("Flushing multiline event due to max_length limit of {}", maxLength);
return true;
}
return false;
}

private void emitEvent(final String message, final Consumer<Record<Event>> eventConsumer) {
final Log event = eventFactory.eventBuilder(LogEventBuilder.class)
.withData(Collections.singletonMap(MESSAGE_FIELD_NAME, message))
.build();
eventConsumer.accept(new Record<>(event));
}
}
Loading
Loading