Add multiline input codec for grouping multi-line log events#6911
Conversation
✅ License Header Check PassedAll newly added files have proper license headers. Great work! 🎉 |
e680d2a to
0754b3e
Compare
MatthewHird
left a comment
There was a problem hiding this comment.
General implementation is fine (just some minor changes requested), but I think we should consider changing the user facing configuration fields to be more user friendly (see comments on README file)
| | `match` | Yes | String (regex) | - | A regular expression pattern used to identify line boundaries | | ||
| | `negate` | No | Boolean | `false` | When `false`, lines matching the pattern are continuation lines. When `true`, lines NOT matching the pattern are continuation lines | | ||
| | `what` | No | String | `previous` | Whether continuation lines belong to the `previous` or `next` event | |
There was a problem hiding this comment.
Instead of combining match, negate, and what, can we just have 4 mutually exclusive pattern fields (meaning exactly 1 of the fields must be used) that cover the 4 scenarios? It would be more descriptive and understandable for the user.
event_line_start_pattern: same as negate=true + what=previous
event_line_end_pattern: same as negate=true + what=next
continuation_line_start_pattern: same as negate=false + what=previous
continuation_line_end_pattern: same as negate=false + what=next
Additionally, you could add an optional omit_matched_section (boolean; default to false) field that when true, will omit the pattern matched section from each entry.
There was a problem hiding this comment.
Strong +1 to moving away from match/negate/what. The negate + what combo is basically a 2x2 truth table users have to reason through, and it's the one part of this model that most current log tooling has moved away from. The trend now is to name the boundary directly instead of using a boolean inversion plus a relative previous/next.
Building on the mutually-exclusive fields idea: I wonder if we actually need four fields, or if two would cover it:
event_start_pattern: a new event begins at each matching lineevent_end_pattern: an event ends at each matching line (inclusive)
The reason two might be enough is that the "continuation" cases are usually just the boundary cases expressed inversely. This PR's own README kind of shows it. The stack trace example is documented both as match: "^\s+(at |Caused by:)" (continuation framing) and as match: "^\d{4}-..." + negate: true (boundary framing), for the same input. Naming the boundary directly tends to be the more intuitive phrasing and drops the booleans entirely.
The only thing you strictly lose with two fields is the case where the continuation set isn't the complement of the boundary set, which is rare and usually expressible with a negative lookahead in the boundary regex anyway. If we think first-class continuation patterns are a must-have, then the four-field version makes sense. Otherwise two keeps it genuinely simpler rather than just better labeled.
omit_matched_section is a nice addition. I'd scope it to the event boundary fields though, since stripping the match from every continuation line gets a bit murky, whereas omitting a single boundary marker per entry is well defined.
There was a problem hiding this comment.
I agree only having event_start_pattern and event_end_pattern simplifies things by the "pattern" args always specifying the event separator markers (never the continuation markers).
My only argument are against it is it just moves the negate from being an arg to being in the regex, which in some ways can be harder understand at a glance.
My other thought is if we can specify continuations and we include omit_matched_section, which I think is useful for general use cases
There was a problem hiding this comment.
My only argument are against it is it just moves the negate from being an arg to being in the regex, which in some ways can be harder understand at a glance.
Yes, I agree with this difficulty.
Being able to define just the event_X_pattern would be a good experience for most users. We could have the continuation_line_X_pattern as an option for edge cases.
There was a problem hiding this comment.
Updated the implementation approach for these config params as suggested
| Objects.requireNonNull(inputStream, "inputStream must not be null"); | ||
| Objects.requireNonNull(eventConsumer, "eventConsumer must not be null"); | ||
|
|
||
| try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { |
There was a problem hiding this comment.
We should specify an encoding (e.g. new InputStreamReader(inputStream, StandardCharsets.UTF_8)). It should default to utf-8, but we could also allow the user to specify an encoding field as well.
There was a problem hiding this comment.
Added the optional encoding field as suggested
| dependencies { | ||
| implementation project(':data-prepper-api') | ||
| implementation 'com.fasterxml.jackson.core:jackson-annotations' | ||
| implementation libs.parquet.common |
There was a problem hiding this comment.
Is this actually used in this change?
| if (!isContinuation && buffer.length() > 0) { | ||
| emitEvent(buffer.toString(), eventConsumer); | ||
| buffer.setLength(0); | ||
| lineCount = 0; | ||
| } | ||
|
|
||
| if (shouldFlush(buffer, lineCount, line)) { | ||
| if (buffer.length() > 0) { | ||
| emitEvent(buffer.toString(), eventConsumer); | ||
| buffer.setLength(0); | ||
| lineCount = 0; | ||
| } | ||
| } |
There was a problem hiding this comment.
This should just be a single check:
if (!isContinuation || shouldFlush(buffer, lineCount, line)) {
if (buffer.length() > 0) {
emitEvent(buffer.toString(), eventConsumer);
buffer.setLength(0);
lineCount = 0;
}
}
| buffer.append(line); | ||
| lineCount++; | ||
| } | ||
|
|
||
| if (buffer.length() > 0) { | ||
| emitEvent(buffer.toString(), eventConsumer); | ||
| } |
There was a problem hiding this comment.
If a single line is longer than the max_length, it will still be emitted as a single event (it won't be truncated).
This is okay, but we should document it in code and in the readme.
There was a problem hiding this comment.
Documented the same in the README and Code
| private void setField(final Object object, final String fieldName, final Object value) throws Exception { | ||
| final Field field = object.getClass().getDeclaredField(fieldName); | ||
| try { | ||
| field.setAccessible(true); | ||
| field.set(object, value); | ||
| } finally { | ||
| field.setAccessible(false); | ||
| } | ||
| } |
There was a problem hiding this comment.
Instead of using reflection to break into the class to set the fields, just add a constructor or a builder to the class so you can set the fields.
I don't think these tests are even useful since all they are doing is checking the getters work.
| try { | ||
| Pattern.compile(match); | ||
| return true; | ||
| } catch (final PatternSyntaxException e) { | ||
| return false; | ||
| } |
There was a problem hiding this comment.
Validation is duplicated, as it is both here on the Config, and the same validation is done in the MultilineInputCodec constructor.
Also you are going through the effort of compiling the pattern here, then throw away the result to compile it again later.
You should compile it and store the Pattern as a (@nonnull) field. This way you don't need validate it again since you know you have a valid pattern already
There was a problem hiding this comment.
Agreed. You can expose Pattern getMatch() instead of String getMatch() here.
There was a problem hiding this comment.
Updated as suggested
| public Boolean getNegate() { | ||
| return negate; | ||
| } |
There was a problem hiding this comment.
For booleans in this Config in general, ideally should return an unboxed boolean instead of boxed Boolean. If the value of negate is null, it should return false.
| public class MultilineInputCodec implements InputCodec { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(MultilineInputCodec.class); | ||
| static final String MESSAGE_FIELD_NAME = "message"; |
| import static org.mockito.Mockito.verify; | ||
|
|
||
| @ExtendWith(MockitoExtension.class) | ||
| public class MultilineCodecsIT { |
There was a problem hiding this comment.
For integration tests like these, use DataPrepperPluginTest along with real configurations.
Here is an example:
There was a problem hiding this comment.
The current PluginInstanceParameterResolver in the plugin-test-framework only supports loading plugins from the processor section (loadPluginModel checks pipelineModel.getProcessors()). It doesn't support InputCodec types loaded from the source codec section. This is consistent with how existing codec ITs (JsonCodecsIT, CsvCodecsIT) are implemented.
There was a problem hiding this comment.
@yavmanis , It does look that way, but you can put the codec in the processors: section. This will work for codecs as well.
| public class MultilineCodecsIT { | ||
|
|
||
| @Mock | ||
| private MultilineInputCodecConfig config; |
There was a problem hiding this comment.
You will not need a mocked config with the DataPrepperPluginTest.
There was a problem hiding this comment.
PluginInstanceParameterResolver doesn't support InputCodec types. It only supports loading plugins from the processor section hence DataPrepperPluginTest cannot be used here for IT test
| * compatible open source license. | ||
| */ | ||
|
|
||
| plugins { |
There was a problem hiding this comment.
You don't need this section. It is inherited.
| testImplementation project(':data-prepper-test:test-event') | ||
| } | ||
|
|
||
| test { |
There was a problem hiding this comment.
You don't need this section. It is inherited.
| try { | ||
| Pattern.compile(match); | ||
| return true; | ||
| } catch (final PatternSyntaxException e) { | ||
| return false; | ||
| } |
There was a problem hiding this comment.
Agreed. You can expose Pattern getMatch() instead of String getMatch() here.
| private Boolean negate = false; | ||
|
|
||
| @NotNull(message = "what must not be null") | ||
| @JsonProperty("what") |
There was a problem hiding this comment.
I'm having a hard time parsing this English. Parse "what previous" or "what next"
There was a problem hiding this comment.
It directly comes from the config keys we have chosen, per above discussion we need to change this contract.
There was a problem hiding this comment.
No longer needed since the configuration has been updated as per the new approach
|
@dlvenable @bagmarnikhil @MatthewHird |
| public String getMatch() { | ||
| return match; | ||
| @JsonProperty("encoding") | ||
| private String encoding = StandardCharsets.UTF_8.name(); |
There was a problem hiding this comment.
Instead of storing a string, can we do what we did with the pattern?
Add @AssertTrue to validate the encoding string is a valid Charset and store the Charset value as a field (private Charset encodingCharset;).
There was a problem hiding this comment.
Updated as suggested and also added AssertTrue to validate
Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
…sign Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
722e199 to
eb89a29
Compare
| import static org.mockito.Mockito.verify; | ||
|
|
||
| @ExtendWith(MockitoExtension.class) | ||
| public class MultilineCodecsIT { |
There was a problem hiding this comment.
@yavmanis , It does look that way, but you can put the codec in the processors: section. This will work for codecs as well.
| return false; | ||
| } | ||
| try { | ||
| compiledPattern = Pattern.compile(patternString); |
There was a problem hiding this comment.
We should compile this in the getter method rather than add side-effects to the isValidPattern method.
| } | ||
|
|
||
| @Test | ||
| void constructor_throws_if_pattern_is_invalid() { |
There was a problem hiding this comment.
This is the same test as constructor_throws_if_no_pattern_configured. Update by passing an invalid regex string in for getCompiledPattern.
| try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encoding))) { | ||
| switch (mode) { | ||
| case EVENT_START: | ||
| parseEventStartMode(reader, eventConsumer); |
There was a problem hiding this comment.
There is too much duplication here. I got this recommendation from Claude, though I haven't verified it:
private void parse(final BufferedReader reader, final Consumer<Record<Event>> consumer,
final boolean boundaryOnMatch, final boolean flushAfter) throws IOException {
final StringBuilder buffer = new StringBuilder();
int lineCount = 0;
String line;
while ((line = reader.readLine()) != null) {
final boolean matches = pattern.matcher(line).find(); // drives omit
final boolean isBoundary = (boundaryOnMatch == matches); // drives flush
if ((!flushAfter && isBoundary) || shouldFlush(buffer, lineCount, line)) {
flushIfNonEmpty(buffer, consumer);
lineCount = 0;
}
if (buffer.length() > 0) buffer.append(lineSeparator);
buffer.append(processLine(line, matches));
lineCount++;
if (flushAfter && isBoundary) {
flushIfNonEmpty(buffer, consumer);
lineCount = 0;
}
}
flushIfNonEmpty(buffer, consumer);
}
Call with:
case EVENT_START: parse(reader, consumer, true, false); break;
case CONTINUATION_START: parse(reader, consumer, false, false); break;
case EVENT_END: parse(reader, consumer, true, true); break;
case CONTINUATION_END: parse(reader, consumer, false, true); break;
Regardless of the exact solution, we should consolidate this code.
| assertThat(events.get(0).getData().get("message", String.class), | ||
| equalTo("line 1\nline 2\n")); | ||
| assertThat(events.get(1).getData().get("message", String.class), | ||
| equalTo("line 3\n")); |
There was a problem hiding this comment.
I'm not sure that we want to retain this newline. It is inconsistent with the other configurations. We should probably not have the trailing newline.
dlvenable
left a comment
There was a problem hiding this comment.
I'm removing my "request changes" hold
|
@dlvenable I have addressed the review comments. Can you please take a look again? |
Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
a343bd4 to
71da7e0
Compare
graytaylor0
left a comment
There was a problem hiding this comment.
Thanks! The change looks good to me, please just follow up with the documentation PR.
| @@ -0,0 +1,121 @@ | |||
| # Multiline Codecs | |||
There was a problem hiding this comment.
As a follow up please update this documentation as well with a PR (https://github.com/opensearch-project/documentation-website/tree/main/_data-prepper/pipelines/configuration)
5aa6928
into
opensearch-project:main
Description
This PR adds a new
multilineinput codec plugin that groups consecutive lines from an input stream into single events based on a configurable regex pattern.Issues Resolved
Resolves #3284
Problem
Many log formats produce events that span multiple lines — Java stack traces, Python tracebacks, multi-line JSON/XML embedded in syslog, and SQL query logs. The existing
newlinecodec treats each line as a separate event, which breaks these multi-line entries into incomplete fragments that are difficult to search, correlate, and analyze in OpenSearch.Solution
The
multilinecodec uses a regex pattern to identify event boundaries and groups continuation lines with their parent event. It provides four mutually exclusive, self-descriptive pattern fields — users directly name the boundary instead of reasoning through boolean inversions.Configuration
Exactly one of the four pattern fields must be specified:
event_start_patternevent_end_patterncontinuation_line_start_patterncontinuation_line_end_patternomit_matched_sectionfalsemax_lines500max_length10000line_separator\nencodingUTF-8Example: Java Stack Traces
Input:
Output: 2 events (each event's
messagefield contains the grouped lines joined by\n){"message":"2024-01-01 12:00:00 ERROR NullPointerException\n at com.example.Service.method(Service.java:42)\n at com.example.Main.run(Main.java:10)"} {"message":"2024-01-01 12:00:01 INFO Application recovered"}The first 3 lines are grouped into a single event because only the first line matches the
event_start_pattern. The\nin the JSON output is theline_separatorjoining the grouped lines within themessagefield.Use Cases Tested
Caused bychains\line continuationsevent_end_pattern)continuation_line_end_pattern)omit_matched_sectionstripping timestamps from outputmax_lines,max_length) preventing unbounded memory growthTesting
MultilineInputCodecTest): Covers all 4 modes, edge cases, safety limits, and error conditionsMultilineCodecsIT): End-to-end tests with realistic log formats (Java, Python, XML, syslog, delimiters)MultilineInputCodecConfigTest): Validation logic for mutually exclusive patterns and regex compilationfilesource andfilesink across multiple log formatsFiles Changed
settings.gradleinclude 'data-prepper-plugins:multiline-codecs'data-prepper-plugins/multiline-codecs/build.gradledata-prepper-plugins/multiline-codecs/README.md.../multiline/MultilineInputCodec.java.../multiline/MultilineInputCodecConfig.java.../multiline/MultilineMode.java.../multiline/MultilineInputCodecTest.java.../multiline/MultilineCodecsIT.java.../multiline/MultilineInputCodecConfigTest.javaCheck List
./gradlew :data-prepper-plugins:multiline-codecs:build)settings.gradleBy submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.