Skip to content

Commit 09ea032

Browse files
committed
Add when conditions to commonly used processors
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 94974df commit 09ea032

33 files changed

Lines changed: 587 additions & 141 deletions

File tree

data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import io.krakens.grok.api.Match;
1212
import io.micrometer.core.instrument.Counter;
1313
import io.micrometer.core.instrument.Timer;
14+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
1415
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
16+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
1517
import org.opensearch.dataprepper.model.annotations.SingleThread;
1618
import org.opensearch.dataprepper.model.configuration.PluginSetting;
1719
import org.opensearch.dataprepper.model.event.Event;
@@ -39,6 +41,7 @@
3941
import java.util.LinkedHashMap;
4042
import java.util.List;
4143
import java.util.Map;
44+
import java.util.Objects;
4245
import java.util.Set;
4346
import java.util.concurrent.ExecutionException;
4447
import java.util.concurrent.ExecutorService;
@@ -79,17 +82,21 @@ public class GrokProcessor extends AbstractProcessor<Record<Event>, Record<Event
7982
private final Set<String> keysToOverwrite;
8083
private final ExecutorService executorService;
8184

82-
public GrokProcessor(final PluginSetting pluginSetting) {
83-
this(pluginSetting, GrokCompiler.newInstance(), Executors.newSingleThreadExecutor());
85+
private final ExpressionEvaluator<Boolean> expressionEvaluator;
86+
87+
@DataPrepperPluginConstructor
88+
public GrokProcessor(final PluginSetting pluginSetting, final ExpressionEvaluator<Boolean> expressionEvaluator) {
89+
this(pluginSetting, GrokCompiler.newInstance(), Executors.newSingleThreadExecutor(), expressionEvaluator);
8490
}
8591

86-
GrokProcessor(final PluginSetting pluginSetting, final GrokCompiler grokCompiler, final ExecutorService executorService) {
92+
GrokProcessor(final PluginSetting pluginSetting, final GrokCompiler grokCompiler, final ExecutorService executorService, final ExpressionEvaluator<Boolean> expressionEvaluator) {
8793
super(pluginSetting);
8894
this.grokProcessorConfig = GrokProcessorConfig.buildConfig(pluginSetting);
8995
this.keysToOverwrite = new HashSet<>(grokProcessorConfig.getkeysToOverwrite());
9096
this.grokCompiler = grokCompiler;
9197
this.fieldToGrok = new LinkedHashMap<>();
9298
this.executorService = executorService;
99+
this.expressionEvaluator = expressionEvaluator;
93100

94101
grokProcessingMatchCounter = pluginMetrics.counter(GROK_PROCESSING_MATCH);
95102
grokProcessingMismatchCounter = pluginMetrics.counter(GROK_PROCESSING_MISMATCH);
@@ -114,6 +121,10 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
114121
try {
115122
final Event event = record.getData();
116123

124+
if (Objects.nonNull(grokProcessorConfig.getGrokWhen()) && !expressionEvaluator.evaluate(grokProcessorConfig.getGrokWhen(), event)) {
125+
continue;
126+
}
127+
117128
if (grokProcessorConfig.getTimeoutMillis() == 0) {
118129
grokProcessingTime.record(() -> matchAndMerge(event));
119130
} else {

data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfig.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class GrokProcessorConfig {
2121
static final String PATTERNS_FILES_GLOB = "patterns_files_glob";
2222
static final String TIMEOUT_MILLIS = "timeout_millis";
2323
static final String TARGET_KEY = "target_key";
24+
static final String GROK_WHEN = "grok_when";
2425

2526
static final boolean DEFAULT_BREAK_ON_MATCH = true;
2627
static final boolean DEFAULT_KEEP_EMPTY_CAPTURES = false;
@@ -39,6 +40,7 @@ public class GrokProcessorConfig {
3940
private final Map<String, String> patternDefinitions;
4041
private final int timeoutMillis;
4142
private final String targetKey;
43+
private final String grokWhen;
4244

4345
private GrokProcessorConfig(final boolean breakOnMatch,
4446
final boolean keepEmptyCaptures,
@@ -49,7 +51,8 @@ private GrokProcessorConfig(final boolean breakOnMatch,
4951
final String patternsFilesGlob,
5052
final Map<String, String> patternDefinitions,
5153
final int timeoutMillis,
52-
final String targetKey) {
54+
final String targetKey,
55+
final String grokWhen) {
5356

5457
this.breakOnMatch = breakOnMatch;
5558
this.keepEmptyCaptures = keepEmptyCaptures;
@@ -61,6 +64,7 @@ private GrokProcessorConfig(final boolean breakOnMatch,
6164
this.patternDefinitions = patternDefinitions;
6265
this.timeoutMillis = timeoutMillis;
6366
this.targetKey = targetKey;
67+
this.grokWhen = grokWhen;
6468
}
6569

6670
public static GrokProcessorConfig buildConfig(final PluginSetting pluginSetting) {
@@ -73,7 +77,8 @@ public static GrokProcessorConfig buildConfig(final PluginSetting pluginSetting)
7377
pluginSetting.getStringOrDefault(PATTERNS_FILES_GLOB, DEFAULT_PATTERNS_FILES_GLOB),
7478
pluginSetting.getTypedMap(PATTERN_DEFINITIONS, String.class, String.class),
7579
pluginSetting.getIntegerOrDefault(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS),
76-
pluginSetting.getStringOrDefault(TARGET_KEY, DEFAULT_TARGET_KEY));
80+
pluginSetting.getStringOrDefault(TARGET_KEY, DEFAULT_TARGET_KEY),
81+
pluginSetting.getStringOrDefault(GROK_WHEN, null));
7782
}
7883

7984
public boolean isBreakOnMatch() {
@@ -115,4 +120,6 @@ public int getTimeoutMillis() {
115120
public String getTargetKey() {
116121
return targetKey;
117122
}
123+
124+
public String getGrokWhen() { return grokWhen; }
118125
}

data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfigTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public void testDefault() {
7373
assertThat(grokProcessorConfig.getTargetKey(), equalTo(DEFAULT_TARGET_KEY));
7474
assertThat(grokProcessorConfig.isNamedCapturesOnly(), equalTo(DEFAULT_NAMED_CAPTURES_ONLY));
7575
assertThat(grokProcessorConfig.getTimeoutMillis(), equalTo(DEFAULT_TIMEOUT_MILLIS));
76+
assertThat(grokProcessorConfig.getGrokWhen(), equalTo(null));
7677
}
7778

7879
@Test

data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorIT.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.junit.jupiter.params.ParameterizedTest;
1515
import org.junit.jupiter.params.provider.Arguments;
1616
import org.junit.jupiter.params.provider.MethodSource;
17+
import org.mockito.Mock;
18+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
1719
import org.opensearch.dataprepper.model.configuration.PluginSetting;
1820
import org.opensearch.dataprepper.model.event.Event;
1921
import org.opensearch.dataprepper.model.record.Record;
@@ -40,6 +42,9 @@ public class GrokProcessorIT {
4042
private final String PLUGIN_NAME = "grok";
4143
private String messageInput;
4244

45+
@Mock
46+
private ExpressionEvaluator<Boolean> expressionEvaluator;
47+
4348
@BeforeEach
4449
public void setup() {
4550

@@ -53,7 +58,8 @@ public void setup() {
5358
GrokProcessorConfig.DEFAULT_PATTERNS_FILES_GLOB,
5459
Collections.emptyMap(),
5560
GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS,
56-
GrokProcessorConfig.DEFAULT_TARGET_KEY);
61+
GrokProcessorConfig.DEFAULT_TARGET_KEY,
62+
null);
5763

5864
pluginSetting.setPipelineName("grokPipeline");
5965

@@ -77,7 +83,8 @@ private PluginSetting completePluginSettingForGrokProcessor(final boolean breakO
7783
final String patternsFilesGlob,
7884
final Map<String, String> patternDefinitions,
7985
final int timeoutMillis,
80-
final String targetKey) {
86+
final String targetKey,
87+
final String grokWhen) {
8188
final Map<String, Object> settings = new HashMap<>();
8289
settings.put(GrokProcessorConfig.BREAK_ON_MATCH, breakOnMatch);
8390
settings.put(GrokProcessorConfig.NAMED_CAPTURES_ONLY, namedCapturesOnly);
@@ -89,6 +96,7 @@ private PluginSetting completePluginSettingForGrokProcessor(final boolean breakO
8996
settings.put(GrokProcessorConfig.PATTERNS_FILES_GLOB, patternsFilesGlob);
9097
settings.put(GrokProcessorConfig.TIMEOUT_MILLIS, timeoutMillis);
9198
settings.put(GrokProcessorConfig.TARGET_KEY, targetKey);
99+
settings.put(GrokProcessorConfig.GROK_WHEN, grokWhen);
92100

93101
return new PluginSetting(PLUGIN_NAME, settings);
94102
}
@@ -101,7 +109,7 @@ public void testMatchNoCapturesWithExistingAndNonExistingKey() throws JsonProces
101109
matchConfig.put("bad_key", Collections.singletonList(nonMatchingPattern));
102110

103111
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
104-
grokProcessor = new GrokProcessor(pluginSetting);
112+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
105113

106114
final Map<String, Object> testData = new HashMap();
107115
testData.put("message", messageInput);
@@ -121,7 +129,7 @@ public void testSingleMatchSinglePatternWithDefaults() throws JsonProcessingExce
121129
matchConfig.put("message", Collections.singletonList("%{COMMONAPACHELOG}"));
122130

123131
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
124-
grokProcessor = new GrokProcessor(pluginSetting);
132+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
125133

126134
final Map<String, Object> testData = new HashMap();
127135
testData.put("message", messageInput);
@@ -159,7 +167,7 @@ public void testSingleMatchMultiplePatternWithBreakOnMatchFalse() throws JsonPro
159167

160168
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
161169
pluginSetting.getSettings().put(GrokProcessorConfig.BREAK_ON_MATCH, false);
162-
grokProcessor = new GrokProcessor(pluginSetting);
170+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
163171

164172
final Map<String, Object> testData = new HashMap();
165173
testData.put("message", messageInput);
@@ -194,7 +202,7 @@ public void testSingleMatchTypeConversionWithDefaults() throws JsonProcessingExc
194202
matchConfig.put("message", Collections.singletonList("\"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response:int} (?:%{NUMBER:bytes:float}|-)"));
195203

196204
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
197-
grokProcessor = new GrokProcessor(pluginSetting);
205+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
198206

199207
final Map<String, Object> testData = new HashMap();
200208
testData.put("message", messageInput);
@@ -226,7 +234,7 @@ public void testMultipleMatchWithBreakOnMatchFalse() throws JsonProcessingExcept
226234

227235
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
228236
pluginSetting.getSettings().put(GrokProcessorConfig.BREAK_ON_MATCH, false);
229-
grokProcessor = new GrokProcessor(pluginSetting);
237+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
230238

231239
final Map<String, Object> testData = new HashMap();
232240
testData.put("message", messageInput);
@@ -264,7 +272,7 @@ public void testMatchWithKeepEmptyCapturesTrue() throws JsonProcessingException
264272

265273
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
266274
pluginSetting.getSettings().put(GrokProcessorConfig.KEEP_EMPTY_CAPTURES, true);
267-
grokProcessor = new GrokProcessor(pluginSetting);
275+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
268276

269277
final Map<String, Object> testData = new HashMap();
270278
testData.put("message", messageInput);
@@ -300,7 +308,7 @@ public void testMatchWithNamedCapturesOnlyFalse() throws JsonProcessingException
300308

301309
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
302310
pluginSetting.getSettings().put(GrokProcessorConfig.NAMED_CAPTURES_ONLY, false);
303-
grokProcessor = new GrokProcessor(pluginSetting);
311+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
304312

305313
final Map<String, Object> testData = new HashMap();
306314
testData.put("message", "This is my greedy data before matching 192.0.2.1 123456");
@@ -332,7 +340,7 @@ public void testPatternDefinitions() throws JsonProcessingException {
332340

333341
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
334342
pluginSetting.getSettings().put(GrokProcessorConfig.PATTERN_DEFINITIONS, patternDefinitions);
335-
grokProcessor = new GrokProcessor(pluginSetting);
343+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
336344

337345
final Map<String, Object> testData = new HashMap();
338346
testData.put("message", "This is my greedy data before matching with my phone number 123-456-789");
@@ -375,7 +383,7 @@ public void testPatternsDirWithDefaultPatternsFilesGlob() throws JsonProcessingE
375383

376384
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
377385
pluginSetting.getSettings().put(GrokProcessorConfig.PATTERNS_DIRECTORIES, patternsDirectories);
378-
grokProcessor = new GrokProcessor(pluginSetting);
386+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
379387

380388
final Record<Event> resultRecord = buildRecordWithEvent(resultData);
381389

@@ -408,7 +416,7 @@ public void testPatternsDirWithCustomPatternsFilesGlob() throws JsonProcessingEx
408416
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
409417
pluginSetting.getSettings().put(GrokProcessorConfig.PATTERNS_DIRECTORIES, patternsDirectories);
410418
pluginSetting.getSettings().put(GrokProcessorConfig.PATTERNS_FILES_GLOB, "*1.txt");
411-
grokProcessor = new GrokProcessor(pluginSetting);
419+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
412420

413421
final Record<Event> resultRecord = buildRecordWithEvent(resultData);
414422

@@ -423,7 +431,7 @@ public void testPatternsDirWithCustomPatternsFilesGlob() throws JsonProcessingEx
423431

424432
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfigWithPatterns2Pattern);
425433

426-
Throwable throwable = assertThrows(IllegalArgumentException.class, () -> new GrokProcessor((pluginSetting)));
434+
Throwable throwable = assertThrows(IllegalArgumentException.class, () -> new GrokProcessor(pluginSetting, expressionEvaluator));
427435
assertThat("No definition for key 'CUSTOMBIRTHDAYPATTERN' found, aborting", equalTo(throwable.getMessage()));
428436
}
429437

@@ -433,7 +441,7 @@ public void testMatchWithNamedCapturesSyntax() throws JsonProcessingException {
433441
matchConfig.put("message", Collections.singletonList("%{GREEDYDATA:greedy_data} (?<mynumber>\\d\\d\\d-\\d\\d\\d-\\d\\d\\d)"));
434442

435443
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
436-
grokProcessor = new GrokProcessor(pluginSetting);
444+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
437445

438446
final Map<String, Object> testData = new HashMap();
439447
testData.put("message", "This is my greedy data before matching with my phone number 123-456-789");
@@ -457,14 +465,14 @@ public void testMatchWithNamedCapturesSyntax() throws JsonProcessingException {
457465
@Test
458466
public void testCompileNonRegisteredPatternThrowsIllegalArgumentException() {
459467

460-
grokProcessor = new GrokProcessor(pluginSetting);
468+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
461469

462470
final Map<String, List<String>> matchConfig = new HashMap<>();
463471
matchConfig.put("message", Collections.singletonList("%{NONEXISTENTPATTERN}"));
464472

465473
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
466474

467-
assertThrows(IllegalArgumentException.class, () -> new GrokProcessor(pluginSetting));
475+
assertThrows(IllegalArgumentException.class, () -> new GrokProcessor(pluginSetting, expressionEvaluator));
468476
}
469477

470478
@ParameterizedTest
@@ -474,7 +482,7 @@ void testDataPrepperBuiltInGrokPatterns(final String matchPattern, final String
474482
matchConfig.put("message", Collections.singletonList(matchPattern));
475483

476484
pluginSetting.getSettings().put(GrokProcessorConfig.MATCH, matchConfig);
477-
grokProcessor = new GrokProcessor(pluginSetting);
485+
grokProcessor = new GrokProcessor(pluginSetting, expressionEvaluator);
478486

479487
final Map<String, Object> testData = new HashMap();
480488
testData.put("message", logInput);

0 commit comments

Comments
 (0)