Skip to content

Add multiline input codec for grouping multi-line log events#6911

Merged
graytaylor0 merged 3 commits into
opensearch-project:mainfrom
yavmanis:feature/multiline_codec_support
Jun 18, 2026
Merged

Add multiline input codec for grouping multi-line log events#6911
graytaylor0 merged 3 commits into
opensearch-project:mainfrom
yavmanis:feature/multiline_codec_support

Conversation

@yavmanis

@yavmanis yavmanis commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Description

This PR adds a new multiline input codec plugin that groups consecutive lines from an input stream into single events based on a configurable regex pattern.

Issues Resolved

Resolves #3284

Problem

Many log formats produce events that span multiple lines — Java stack traces, Python tracebacks, multi-line JSON/XML embedded in syslog, and SQL query logs. The existing newline codec treats each line as a separate event, which breaks these multi-line entries into incomplete fragments that are difficult to search, correlate, and analyze in OpenSearch.

Solution

The multiline codec uses a regex pattern to identify event boundaries and groups continuation lines with their parent event. It provides four mutually exclusive, self-descriptive pattern fields — users directly name the boundary instead of reasoning through boolean inversions.

Configuration

Exactly one of the four pattern fields must be specified:

Option Required Type Default Description
event_start_pattern One of four String (regex) - A new event begins at each matching line
event_end_pattern One of four String (regex) - An event ends at each matching line (inclusive)
continuation_line_start_pattern One of four String (regex) - Matching lines are continuations of previous event
continuation_line_end_pattern One of four String (regex) - Matching lines are prepended to next event
omit_matched_section No Boolean false Strip the matched portion from the output
max_lines No Integer 500 Max lines per event (safety limit). A single line exceeding this is still emitted without truncation
max_length No Integer 10000 Max characters per event (safety limit). A single line exceeding this is still emitted without truncation
line_separator No String \n Separator used when joining lines
encoding No String UTF-8 Character encoding for reading the input stream

Example: Java Stack Traces

pipeline:
  source:
    s3:
      codec:
        multiline:
          event_start_pattern: "^\\d{4}-\\d{2}-\\d{2}"

Input:

2024-01-01 12:00:00 ERROR NullPointerException
  at com.example.Service.method(Service.java:42)
  at com.example.Main.run(Main.java:10)
2024-01-01 12:00:01 INFO Application recovered

Output: 2 events (each event's message field contains the grouped lines joined by \n)

{"message":"2024-01-01 12:00:00 ERROR NullPointerException\n  at com.example.Service.method(Service.java:42)\n  at com.example.Main.run(Main.java:10)"}
{"message":"2024-01-01 12:00:01 INFO Application recovered"}

The first 3 lines are grouped into a single event because only the first line matches the event_start_pattern. The \n in the JSON output is the line_separator joining the grouped lines within the message field.

Use Cases Tested

  • Java/Kotlin stack traces with Caused by chains
  • Python tracebacks
  • Multi-line XML/SOAP payloads in logs
  • Cisco ISE syslog with \ line continuations
  • Delimiter-separated entries (event_end_pattern)
  • Continuation lines prepended to next event (continuation_line_end_pattern)
  • omit_matched_section stripping timestamps from output
  • Safety limits (max_lines, max_length) preventing unbounded memory growth
  • Edge cases: empty input, single line, orphan continuations at start/end of stream, all lines match, no lines match

Testing

  • Unit tests (MultilineInputCodecTest): Covers all 4 modes, edge cases, safety limits, and error conditions
  • Integration tests (MultilineCodecsIT): End-to-end tests with realistic log formats (Java, Python, XML, syslog, delimiters)
  • Config tests (MultilineInputCodecConfigTest): Validation logic for mutually exclusive patterns and regex compilation
  • Manual pipeline testing: Verified with Data Prepper binary using file source and file sink across multiple log formats
  • Code coverage: 100% instruction coverage, 93% branch coverage

Files Changed

File Change
settings.gradle Added include 'data-prepper-plugins:multiline-codecs'
data-prepper-plugins/multiline-codecs/build.gradle New - build configuration
data-prepper-plugins/multiline-codecs/README.md New - documentation
.../multiline/MultilineInputCodec.java New - codec implementation
.../multiline/MultilineInputCodecConfig.java New - configuration class with builder
.../multiline/MultilineMode.java New - internal enum for mode resolution
.../multiline/MultilineInputCodecTest.java New - unit tests
.../multiline/MultilineCodecsIT.java New - integration tests
.../multiline/MultilineInputCodecConfigTest.java New - config validation tests

Check List

  • New functionality includes testing
  • New functionality has been documented in README
  • Commits are signed off with a real name per the DCO
  • All tests pass (./gradlew :data-prepper-plugins:multiline-codecs:build)
  • No changes to existing files beyond settings.gradle

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions

github-actions Bot commented Jun 8, 2026

Copy link
Copy Markdown

✅ License Header Check Passed

All newly added files have proper license headers. Great work! 🎉

@yavmanis yavmanis force-pushed the feature/multiline_codec_support branch from e680d2a to 0754b3e Compare June 9, 2026 06:31
@yavmanis yavmanis marked this pull request as ready for review June 10, 2026 06:07

@MatthewHird MatthewHird left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General implementation is fine (just some minor changes requested), but I think we should consider changing the user facing configuration fields to be more user friendly (see comments on README file)

Comment on lines +21 to +23
| `match` | Yes | String (regex) | - | A regular expression pattern used to identify line boundaries |
| `negate` | No | Boolean | `false` | When `false`, lines matching the pattern are continuation lines. When `true`, lines NOT matching the pattern are continuation lines |
| `what` | No | String | `previous` | Whether continuation lines belong to the `previous` or `next` event |

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of combining match, negate, and what, can we just have 4 mutually exclusive pattern fields (meaning exactly 1 of the fields must be used) that cover the 4 scenarios? It would be more descriptive and understandable for the user.

event_line_start_pattern: same as negate=true + what=previous
event_line_end_pattern: same as negate=true + what=next
continuation_line_start_pattern: same as negate=false + what=previous
continuation_line_end_pattern: same as negate=false + what=next

Additionally, you could add an optional omit_matched_section (boolean; default to false) field that when true, will omit the pattern matched section from each entry.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strong +1 to moving away from match/negate/what. The negate + what combo is basically a 2x2 truth table users have to reason through, and it's the one part of this model that most current log tooling has moved away from. The trend now is to name the boundary directly instead of using a boolean inversion plus a relative previous/next.

Building on the mutually-exclusive fields idea: I wonder if we actually need four fields, or if two would cover it:

  • event_start_pattern: a new event begins at each matching line
  • event_end_pattern: an event ends at each matching line (inclusive)

The reason two might be enough is that the "continuation" cases are usually just the boundary cases expressed inversely. This PR's own README kind of shows it. The stack trace example is documented both as match: "^\s+(at |Caused by:)" (continuation framing) and as match: "^\d{4}-..." + negate: true (boundary framing), for the same input. Naming the boundary directly tends to be the more intuitive phrasing and drops the booleans entirely.

The only thing you strictly lose with two fields is the case where the continuation set isn't the complement of the boundary set, which is rare and usually expressible with a negative lookahead in the boundary regex anyway. If we think first-class continuation patterns are a must-have, then the four-field version makes sense. Otherwise two keeps it genuinely simpler rather than just better labeled.

omit_matched_section is a nice addition. I'd scope it to the event boundary fields though, since stripping the match from every continuation line gets a bit murky, whereas omitting a single boundary marker per entry is well defined.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree only having event_start_pattern and event_end_pattern simplifies things by the "pattern" args always specifying the event separator markers (never the continuation markers).

My only argument are against it is it just moves the negate from being an arg to being in the regex, which in some ways can be harder understand at a glance.

My other thought is if we can specify continuations and we include omit_matched_section, which I think is useful  for general use cases

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only argument are against it is it just moves the negate from being an arg to being in the regex, which in some ways can be harder understand at a glance.

Yes, I agree with this difficulty.

Being able to define just the event_X_pattern would be a good experience for most users. We could have the continuation_line_X_pattern as an option for edge cases.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the implementation approach for these config params as suggested

Objects.requireNonNull(inputStream, "inputStream must not be null");
Objects.requireNonNull(eventConsumer, "eventConsumer must not be null");

try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should specify an encoding (e.g. new InputStreamReader(inputStream, StandardCharsets.UTF_8)). It should default to utf-8, but we could also allow the user to specify an encoding field as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the optional encoding field as suggested

dependencies {
implementation project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation libs.parquet.common

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually used in this change?

Comment thread data-prepper-plugins/multiline-codecs/README.md
Comment on lines +109 to +121
if (!isContinuation && buffer.length() > 0) {
emitEvent(buffer.toString(), eventConsumer);
buffer.setLength(0);
lineCount = 0;
}

if (shouldFlush(buffer, lineCount, line)) {
if (buffer.length() > 0) {
emitEvent(buffer.toString(), eventConsumer);
buffer.setLength(0);
lineCount = 0;
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should just be a single check:

            if (!isContinuation || shouldFlush(buffer, lineCount, line)) {
                if (buffer.length() > 0) {
                    emitEvent(buffer.toString(), eventConsumer);
                    buffer.setLength(0);
                    lineCount = 0;
                }
            }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

Comment on lines +126 to +132
buffer.append(line);
lineCount++;
}

if (buffer.length() > 0) {
emitEvent(buffer.toString(), eventConsumer);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a single line is longer than the max_length, it will still be emitted as a single event (it won't be truncated).

This is okay, but we should document it in code and in the readme.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented the same in the README and Code

Comment on lines +85 to +93
private void setField(final Object object, final String fieldName, final Object value) throws Exception {
final Field field = object.getClass().getDeclaredField(fieldName);
try {
field.setAccessible(true);
field.set(object, value);
} finally {
field.setAccessible(false);
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using reflection to break into the class to set the fields, just add a constructor or a builder to the class so you can set the fields.

I don't think these tests are even useful since all they are doing is checking the getters work.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

Comment on lines +140 to +145
try {
Pattern.compile(match);
return true;
} catch (final PatternSyntaxException e) {
return false;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validation is duplicated, as it is both here on the Config, and the same validation is done in the MultilineInputCodec constructor.

Also you are going through the effort of compiling the pattern here, then throw away the result to compile it again later.

You should compile it and store the Pattern as a (@nonnull) field. This way you don't need validate it again since you know you have a valid pattern already

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. You can expose Pattern getMatch() instead of String getMatch() here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated as suggested

Comment on lines +92 to +94
public Boolean getNegate() {
return negate;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For booleans in this Config in general, ideally should return an unboxed boolean instead of boxed Boolean. If the value of negate is null, it should return false.

public class MultilineInputCodec implements InputCodec {

private static final Logger LOG = LoggerFactory.getLogger(MultilineInputCodec.class);
static final String MESSAGE_FIELD_NAME = "message";

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be private?

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @yavmanis for this contribution!

import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class MultilineCodecsIT {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For integration tests like these, use DataPrepperPluginTest along with real configurations.

Here is an example:

@DataPrepperPluginTest(pluginName = "parse_json", pluginType = Processor.class)
class ParseJsonProcessorIT extends BaseDataPrepperPluginStandardTestSuite {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current PluginInstanceParameterResolver in the plugin-test-framework only supports loading plugins from the processor section (loadPluginModel checks pipelineModel.getProcessors()). It doesn't support InputCodec types loaded from the source codec section. This is consistent with how existing codec ITs (JsonCodecsIT, CsvCodecsIT) are implemented.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yavmanis , It does look that way, but you can put the codec in the processors: section. This will work for codecs as well.

public class MultilineCodecsIT {

@Mock
private MultilineInputCodecConfig config;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will not need a mocked config with the DataPrepperPluginTest.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PluginInstanceParameterResolver doesn't support InputCodec types. It only supports loading plugins from the processor section hence DataPrepperPluginTest cannot be used here for IT test

* compatible open source license.
*/

plugins {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need this section. It is inherited.

testImplementation project(':data-prepper-test:test-event')
}

test {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need this section. It is inherited.

Comment on lines +140 to +145
try {
Pattern.compile(match);
return true;
} catch (final PatternSyntaxException e) {
return false;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. You can expose Pattern getMatch() instead of String getMatch() here.

private Boolean negate = false;

@NotNull(message = "what must not be null")
@JsonProperty("what")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having a hard time parsing this English. Parse "what previous" or "what next"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It directly comes from the config keys we have chosen, per above discussion we need to change this contract.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer needed since the configuration has been updated as per the new approach

@yavmanis

Copy link
Copy Markdown
Contributor Author

@dlvenable @bagmarnikhil @MatthewHird
I have addressed the review comments and made the necessary changes about the config as suggested. Can you please take a look?

public String getMatch() {
return match;
@JsonProperty("encoding")
private String encoding = StandardCharsets.UTF_8.name();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of storing a string, can we do what we did with the pattern?

Add @AssertTrue to validate the encoding string is a valid Charset and store the Charset value as a field (private Charset encodingCharset;).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated as suggested and also added AssertTrue to validate

yavmanis added 2 commits June 16, 2026 10:55
Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
…sign

Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
@yavmanis yavmanis force-pushed the feature/multiline_codec_support branch from 722e199 to eb89a29 Compare June 16, 2026 12:48

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @yavmanis for the improvements! I have a few more comments and responses.

import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class MultilineCodecsIT {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yavmanis , It does look that way, but you can put the codec in the processors: section. This will work for codecs as well.

return false;
}
try {
compiledPattern = Pattern.compile(patternString);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should compile this in the getter method rather than add side-effects to the isValidPattern method.

}

@Test
void constructor_throws_if_pattern_is_invalid() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same test as constructor_throws_if_no_pattern_configured. Update by passing an invalid regex string in for getCompiledPattern.

try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encoding))) {
switch (mode) {
case EVENT_START:
parseEventStartMode(reader, eventConsumer);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is too much duplication here. I got this recommendation from Claude, though I haven't verified it:

private void parse(final BufferedReader reader, final Consumer<Record<Event>> consumer,
                     final boolean boundaryOnMatch, final boolean flushAfter) throws IOException {
      final StringBuilder buffer = new StringBuilder();
      int lineCount = 0;
      String line;
      while ((line = reader.readLine()) != null) {
          final boolean matches = pattern.matcher(line).find();          // drives omit
          final boolean isBoundary = (boundaryOnMatch == matches);       // drives flush

          if ((!flushAfter && isBoundary) || shouldFlush(buffer, lineCount, line)) {
              flushIfNonEmpty(buffer, consumer);
              lineCount = 0;
          }
          if (buffer.length() > 0) buffer.append(lineSeparator);
          buffer.append(processLine(line, matches));
          lineCount++;

          if (flushAfter && isBoundary) {
              flushIfNonEmpty(buffer, consumer);
              lineCount = 0;
          }
      }
      flushIfNonEmpty(buffer, consumer);
  }

Call with:

case EVENT_START:        parse(reader, consumer, true,  false); break;
  case CONTINUATION_START: parse(reader, consumer, false, false); break;
  case EVENT_END:          parse(reader, consumer, true,  true);  break;
  case CONTINUATION_END:   parse(reader, consumer, false, true);  break;

Regardless of the exact solution, we should consolidate this code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

assertThat(events.get(0).getData().get("message", String.class),
equalTo("line 1\nline 2\n"));
assertThat(events.get(1).getData().get("message", String.class),
equalTo("line 3\n"));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that we want to retain this newline. It is inconsistent with the other configurations. We should probably not have the trailing newline.

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm removing my "request changes" hold

@yavmanis

Copy link
Copy Markdown
Contributor Author

@dlvenable I have addressed the review comments. Can you please take a look again?

Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
@yavmanis yavmanis force-pushed the feature/multiline_codec_support branch from a343bd4 to 71da7e0 Compare June 17, 2026 10:33

@graytaylor0 graytaylor0 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! The change looks good to me, please just follow up with the documentation PR.

@@ -0,0 +1,121 @@
# Multiline Codecs

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@graytaylor0 graytaylor0 dismissed dlvenable’s stale review June 18, 2026 17:10

Requested changes were addressed

@graytaylor0 graytaylor0 merged commit 5aa6928 into opensearch-project:main Jun 18, 2026
110 of 115 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support multiline logs like error stack trace, logs with db queries

5 participants