Skip to content

Commit a343bd4

Browse files
committed
Address review comments on new design implemenatation
Signed-off-by: Manisha Yadav <yavmanis@amazon.com>
1 parent eb89a29 commit a343bd4

10 files changed

Lines changed: 137 additions & 368 deletions

File tree

data-prepper-plugins/multiline-codecs/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ dependencies {
1212
implementation 'com.fasterxml.jackson.core:jackson-annotations'
1313
testImplementation project(':data-prepper-plugins:common')
1414
testImplementation project(':data-prepper-test:test-event')
15+
testImplementation project(':data-prepper-test:plugin-test-framework')
1516
}

data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodec.java

Lines changed: 30 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public class MultilineInputCodec implements InputCodec {
5454
private static final String MESSAGE_FIELD_NAME = "message";
5555

5656
private final Pattern pattern;
57-
private final MultilineMode mode;
57+
private final boolean boundaryOnMatch;
58+
private final boolean flushAfter;
5859
private final boolean omitMatchedSection;
5960
private final int maxLines;
6061
private final int maxLength;
@@ -72,7 +73,9 @@ public MultilineInputCodec(final MultilineInputCodecConfig config, final EventFa
7273
throw new IllegalArgumentException("A valid pattern must be configured");
7374
}
7475

75-
this.mode = resolveMode(config);
76+
final MultilineMode mode = resolveMode(config);
77+
this.boundaryOnMatch = (mode == MultilineMode.EVENT_START || mode == MultilineMode.EVENT_END);
78+
this.flushAfter = (mode == MultilineMode.EVENT_END || mode == MultilineMode.CONTINUATION_END);
7679
this.omitMatchedSection = config.getOmitMatchedSection();
7780
this.maxLines = config.getMaxLines();
7881
this.maxLength = config.getMaxLength();
@@ -97,190 +100,59 @@ public void parse(final InputStream inputStream, final Consumer<Record<Event>> e
97100
Objects.requireNonNull(inputStream, "inputStream must not be null");
98101
Objects.requireNonNull(eventConsumer, "eventConsumer must not be null");
99102

100-
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encoding))) {
101-
switch (mode) {
102-
case EVENT_START:
103-
parseEventStartMode(reader, eventConsumer);
104-
break;
105-
case EVENT_END:
106-
parseEventEndMode(reader, eventConsumer);
107-
break;
108-
case CONTINUATION_START:
109-
parseContinuationStartMode(reader, eventConsumer);
110-
break;
111-
case CONTINUATION_END:
112-
parseContinuationEndMode(reader, eventConsumer);
113-
break;
114-
default:
115-
throw new IllegalStateException("Unknown multiline mode: " + mode);
116-
}
117-
}
103+
final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encoding));
104+
parseLines(reader, eventConsumer);
118105
}
119106

120-
/**
121-
* EVENT_START mode: A new event begins at each line matching the pattern.
122-
* Non-matching lines are continuations of the preceding event.
123-
*/
124-
private void parseEventStartMode(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
107+
private void parseLines(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
125108
final StringBuilder buffer = new StringBuilder();
126109
int lineCount = 0;
127110
String line;
128111

129112
while ((line = reader.readLine()) != null) {
130113
final boolean matches = pattern.matcher(line).find();
114+
final boolean isBoundary = (boundaryOnMatch == matches);
131115

132-
if (matches || shouldFlush(buffer, lineCount, line)) {
133-
if (buffer.length() > 0) {
134-
emitEvent(buffer.toString(), eventConsumer);
135-
buffer.setLength(0);
136-
lineCount = 0;
137-
}
138-
}
139-
140-
if (buffer.length() > 0) {
141-
buffer.append(lineSeparator);
142-
}
143-
buffer.append(processLine(line, matches));
144-
lineCount++;
145-
}
146-
147-
if (buffer.length() > 0) {
148-
emitEvent(buffer.toString(), eventConsumer);
149-
}
150-
}
151-
152-
/**
153-
* EVENT_END mode: An event ends at each line matching the pattern (inclusive).
154-
* The matching line is included in the current event, then a new event begins.
155-
*/
156-
private void parseEventEndMode(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
157-
final StringBuilder buffer = new StringBuilder();
158-
int lineCount = 0;
159-
String line;
160-
161-
while ((line = reader.readLine()) != null) {
162-
final boolean matches = pattern.matcher(line).find();
163-
164-
if (shouldFlush(buffer, lineCount, line)) {
165-
if (buffer.length() > 0) {
166-
emitEvent(buffer.toString(), eventConsumer);
167-
buffer.setLength(0);
168-
lineCount = 0;
169-
}
116+
if ((!flushAfter && isBoundary) || shouldFlush(buffer, lineCount, line)) {
117+
flushIfNonEmpty(buffer, eventConsumer);
118+
lineCount = 0;
170119
}
171120

172-
if (buffer.length() > 0) {
173-
buffer.append(lineSeparator);
174-
}
175-
buffer.append(processLine(line, matches));
121+
appendLineToBuffer(buffer, processLine(line, matches));
176122
lineCount++;
177123

178-
if (matches) {
179-
emitEvent(buffer.toString(), eventConsumer);
180-
buffer.setLength(0);
124+
if (flushAfter && isBoundary) {
125+
flushIfNonEmpty(buffer, eventConsumer);
181126
lineCount = 0;
182127
}
183128
}
184129

185-
if (buffer.length() > 0) {
186-
emitEvent(buffer.toString(), eventConsumer);
187-
}
130+
flushIfNonEmpty(buffer, eventConsumer);
188131
}
189132

190-
/**
191-
* CONTINUATION_START mode: Lines matching the pattern are continuations of the previous event.
192-
* Non-matching lines start new events.
193-
*/
194-
private void parseContinuationStartMode(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
195-
final StringBuilder buffer = new StringBuilder();
196-
int lineCount = 0;
197-
String line;
198-
199-
while ((line = reader.readLine()) != null) {
200-
final boolean matches = pattern.matcher(line).find();
201-
202-
if (!matches || shouldFlush(buffer, lineCount, line)) {
203-
if (buffer.length() > 0) {
204-
emitEvent(buffer.toString(), eventConsumer);
205-
buffer.setLength(0);
206-
lineCount = 0;
207-
}
208-
}
209-
210-
if (buffer.length() > 0) {
211-
buffer.append(lineSeparator);
212-
}
213-
buffer.append(processLine(line, matches));
214-
lineCount++;
215-
}
216-
217-
if (buffer.length() > 0) {
218-
emitEvent(buffer.toString(), eventConsumer);
133+
private String processLine(final String line, final boolean matches) {
134+
if (!omitMatchedSection || !matches) {
135+
return line;
219136
}
137+
final Matcher matcher = pattern.matcher(line);
138+
return matcher.replaceFirst("");
220139
}
221140

222-
/**
223-
* CONTINUATION_END mode: Lines matching the pattern are prepended to the next event.
224-
* Non-matching lines complete the current event.
225-
*/
226-
private void parseContinuationEndMode(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
227-
final StringBuilder buffer = new StringBuilder();
228-
int lineCount = 0;
229-
boolean bufferHasNonContinuation = false;
230-
String line;
231-
232-
while ((line = reader.readLine()) != null) {
233-
final boolean matches = pattern.matcher(line).find();
234-
235-
if (!matches) {
236-
if (bufferHasNonContinuation) {
237-
emitEvent(buffer.toString(), eventConsumer);
238-
buffer.setLength(0);
239-
lineCount = 0;
240-
bufferHasNonContinuation = false;
241-
}
242-
if (buffer.length() > 0) {
243-
buffer.append(lineSeparator);
244-
}
245-
buffer.append(processLine(line, false));
246-
lineCount++;
247-
bufferHasNonContinuation = true;
248-
continue;
249-
}
250-
251-
if (bufferHasNonContinuation) {
252-
emitEvent(buffer.toString(), eventConsumer);
253-
buffer.setLength(0);
254-
lineCount = 0;
255-
bufferHasNonContinuation = false;
256-
}
257-
258-
if (shouldFlush(buffer, lineCount, line)) {
259-
if (buffer.length() > 0) {
260-
emitEvent(buffer.toString(), eventConsumer);
261-
buffer.setLength(0);
262-
lineCount = 0;
263-
}
264-
}
265-
266-
if (buffer.length() > 0) {
267-
buffer.append(lineSeparator);
268-
}
269-
buffer.append(processLine(line, matches));
270-
lineCount++;
141+
private void appendLineToBuffer(final StringBuilder buffer, final String processedLine) {
142+
if (processedLine.isEmpty()) {
143+
return;
271144
}
272-
273145
if (buffer.length() > 0) {
274-
emitEvent(buffer.toString(), eventConsumer);
146+
buffer.append(lineSeparator);
275147
}
148+
buffer.append(processedLine);
276149
}
277150

278-
private String processLine(final String line, final boolean matches) {
279-
if (!omitMatchedSection || !matches) {
280-
return line;
151+
private void flushIfNonEmpty(final StringBuilder buffer, final Consumer<Record<Event>> eventConsumer) {
152+
if (buffer.length() > 0) {
153+
emitEvent(buffer.toString(), eventConsumer);
154+
buffer.setLength(0);
281155
}
282-
final Matcher matcher = pattern.matcher(line);
283-
return matcher.replaceFirst("");
284156
}
285157

286158
/**

data-prepper-plugins/multiline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/multiline/MultilineInputCodecConfig.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ public class MultilineInputCodecConfig {
7777
private String encoding = StandardCharsets.UTF_8.name();
7878

7979
private Pattern compiledPattern;
80-
private Charset encodingCharset;
8180

8281
public String getEventStartPattern() {
8382
return eventStartPattern;
@@ -112,22 +111,23 @@ public String getLineSeparator() {
112111
}
113112

114113
/**
115-
* Returns the validated Charset. The encoding is validated once during
116-
* bean validation and stored to avoid repeated parsing.
114+
* Returns the validated Charset, compiled on first access.
117115
*
118-
* @return The validated Charset.
116+
* @return The Charset.
119117
*/
120118
public Charset getEncoding() {
121-
return encodingCharset;
119+
return Charset.forName(encoding);
122120
}
123121

124122
/**
125-
* Returns the compiled regex pattern. The pattern is compiled once during validation
126-
* and reused to avoid duplicate compilation.
123+
* Returns the compiled regex pattern, compiled on first access.
127124
*
128125
* @return The compiled Pattern.
129126
*/
130127
public Pattern getCompiledPattern() {
128+
if (compiledPattern == null) {
129+
compiledPattern = Pattern.compile(getConfiguredPatternString());
130+
}
131131
return compiledPattern;
132132
}
133133

@@ -149,7 +149,7 @@ boolean isValidPattern() {
149149
return false;
150150
}
151151
try {
152-
compiledPattern = Pattern.compile(patternString);
152+
Pattern.compile(patternString);
153153
return true;
154154
} catch (final PatternSyntaxException e) {
155155
return false;
@@ -162,7 +162,7 @@ boolean isValidEncoding() {
162162
return false;
163163
}
164164
try {
165-
encodingCharset = Charset.forName(encoding);
165+
Charset.forName(encoding);
166166
return true;
167167
} catch (final IllegalCharsetNameException | UnsupportedCharsetException e) {
168168
return false;

0 commit comments

Comments
 (0)