Skip to content

Commit 895a8e4

Browse files
Micha Kienerjbarrez
authored andcommitted
adding option to have an inbound event filter in the default event inbound pipeline
1 parent 16e1de6 commit 895a8e4

13 files changed

Lines changed: 178 additions & 4 deletions

File tree

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/* Licensed under the Apache License, Version 2.0 (the "License");
2+
* you may not use this file except in compliance with the License.
3+
* You may obtain a copy of the License at
4+
*
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package org.flowable.eventregistry.api;
14+
15+
/**
16+
* This interface must be implemented by a custom filtering bean to hook into the default inbound event processing
17+
* pipeline in order to have a very effective way to filter out events which should not be processed any further and
18+
* thus preventing expensive processing time like DB lookups and the full consumer invocation.
19+
*
20+
* @param <T> the type of the expected payload (e.g. a JsonNode)
21+
*
22+
* @author Micha Kiener
23+
*/
24+
@FunctionalInterface
25+
public interface InboundEventFilter<T> {
26+
27+
/**
28+
* Returns true, if the event should be further processed or false, if the event can be ignored and will not be processed
29+
* any further and the pipeline will stop afterwards.
30+
*
31+
* @param payload the payload of the event
32+
* @return true, if the event should continue to be processed, false, if the pipeline will ignore the event and stop any
33+
* further processing
34+
*/
35+
boolean filter(T payload);
36+
37+
default boolean filter(FlowableEventInfo<T> event) {
38+
return filter(event.getPayload());
39+
}
40+
}

modules/flowable-event-registry-api/src/main/java/org/flowable/eventregistry/api/model/InboundChannelModelBuilder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,12 @@ interface InboundEventProcessingPipelineBuilder {
214214
*/
215215
InboundEventKeyXmlDetectorBuilder xmlDeserializer();
216216

217+
/**
218+
* Uses a delegate expression to filter the events before further processing.
219+
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
220+
*/
221+
InboundEventProcessingPipelineBuilder delegateExpressionEventFilter(String delegateExpression);
222+
217223
/**
218224
* Uses a delegate expression to deserialize the event.
219225
*/

modules/flowable-event-registry-json-converter/src/test/java/org/flowable/eventregistry/converter/channel/JmsInboundChannelJsonConverterTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ protected void validateModel(ChannelModel channelModel) {
5353
assertThat(model.getDestination()).isEqualTo("customer");
5454
assertThat(model.getDeserializerType()).isEqualTo("json");
5555

56+
assertThat(model.getEventFilterDelegateExpression()).isEqualTo("testEventFilterExpression");
57+
5658
ChannelEventKeyDetection eventKeyDetection = model.getChannelEventKeyDetection();
5759
assertThat(eventKeyDetection).isNotNull();
5860
assertThat(eventKeyDetection.getFixedValue()).isNull();

modules/flowable-event-registry-json-converter/src/test/java/org/flowable/eventregistry/converter/channel/KafkaInboundChannelJsonConverterTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ protected void validateModel(ChannelModel channelModel) {
5252

5353
assertThat(model.getDeserializerType()).isEqualTo("json");
5454

55+
assertThat(model.getEventFilterDelegateExpression()).isEqualTo("testEventFilterExpression");
56+
5557
ChannelEventKeyDetection eventKeyDetection = model.getChannelEventKeyDetection();
5658
assertThat(eventKeyDetection).isNotNull();
5759
assertThat(eventKeyDetection.getFixedValue()).isNull();

modules/flowable-event-registry-json-converter/src/test/java/org/flowable/eventregistry/converter/channel/RabbitInboundChannelJsonConverterTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ protected void validateModel(ChannelModel channelModel) {
5252

5353
assertThat(model.getDeserializerType()).isEqualTo("json");
5454

55+
assertThat(model.getEventFilterDelegateExpression()).isEqualTo("testEventFilterExpression");
56+
5557
ChannelEventKeyDetection eventKeyDetection = model.getChannelEventKeyDetection();
5658
assertThat(eventKeyDetection).isNotNull();
5759
assertThat(eventKeyDetection.getFixedValue()).isNull();

modules/flowable-event-registry-json-converter/src/test/resources/org/flowable/eventregistry/converter/channel/simpleJmsInboundChannel.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
"subscription": "testSubscription",
1111
"concurrency": "3",
1212
"deserializerType": "json",
13+
"eventFilterDelegateExpression": "testEventFilterExpression",
1314
"channelEventKeyDetection": {
1415
"jsonField": "eventKey"
1516
}

modules/flowable-event-registry-json-converter/src/test/resources/org/flowable/eventregistry/converter/channel/simpleKafkaInboundChannel.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
}
2222
],
2323

24+
"eventFilterDelegateExpression": "testEventFilterExpression",
25+
2426
"deserializerType": "json",
2527
"channelEventKeyDetection": {
2628
"jsonField": "eventKey"

modules/flowable-event-registry-json-converter/src/test/resources/org/flowable/eventregistry/converter/channel/simpleRabbitInboundChannel.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
"executor": "rabbitTaskExecutor",
1919
"ackMode": "NONE",
2020

21+
"eventFilterDelegateExpression": "testEventFilterExpression",
22+
2123
"deserializerType": "json",
2224
"channelEventKeyDetection": {
2325
"jsonField": "eventKey"

modules/flowable-event-registry-model/src/main/java/org/flowable/eventregistry/model/InboundChannelModel.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public class InboundChannelModel extends ChannelModel {
2323

2424
protected String contextExtractorDelegateExpression;
2525
protected String deserializerDelegateExpression;
26+
protected String eventFilterDelegateExpression;
2627
protected String payloadExtractorDelegateExpression;
2728
protected String headerExtractorDelegateExpression;
2829
protected String eventTransformerDelegateExpression;
@@ -48,6 +49,14 @@ public void setDeserializerType(String deserializerType) {
4849
this.deserializerType = deserializerType;
4950
}
5051

52+
public String getEventFilterDelegateExpression() {
53+
return eventFilterDelegateExpression;
54+
}
55+
56+
public void setEventFilterDelegateExpression(String eventFilterDelegateExpression) {
57+
this.eventFilterDelegateExpression = eventFilterDelegateExpression;
58+
}
59+
5160
public String getContextExtractorDelegateExpression() {
5261
return contextExtractorDelegateExpression;
5362
}

modules/flowable-event-registry/src/main/java/org/flowable/eventregistry/impl/model/InboundChannelDefinitionBuilderImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,12 @@ public InboundEventKeyXmlDetectorBuilder xmlDeserializer() {
340340
return new InboundEventKeyXmlDetectorBuilderImpl(xmlPipelineBuilder);
341341
}
342342

343+
@Override
344+
public InboundEventProcessingPipelineBuilder delegateExpressionEventFilter(String delegateExpression) {
345+
channelModel.setEventFilterDelegateExpression(delegateExpression);
346+
return this;
347+
}
348+
343349
@Override
344350
public InboundEventKeyDetectorBuilder delegateExpressionDeserializer(String delegateExpression) {
345351
channelModel.setDeserializerType("expression");

0 commit comments

Comments
 (0)