Skip to content

Commit 8660895

Browse files
martin-grofcikfiliphr
authored andcommitted
Support job_retries_decremented event for CMMN
Fixes #4062
1 parent 6f15d2c commit 8660895

2 files changed

Lines changed: 127 additions & 1 deletion

File tree

modules/flowable-cmmn-engine/src/main/java/org/flowable/cmmn/engine/impl/cmd/JobRetryCmd.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,24 @@
1818
import java.util.Date;
1919
import java.util.GregorianCalendar;
2020

21+
import org.flowable.cmmn.engine.CmmnEngineConfiguration;
2122
import org.flowable.cmmn.engine.impl.util.CommandContextUtil;
23+
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
24+
import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher;
2225
import org.flowable.common.engine.impl.interceptor.Command;
2326
import org.flowable.common.engine.impl.interceptor.CommandContext;
2427
import org.flowable.common.engine.impl.util.ExceptionUtil;
2528
import org.flowable.job.api.FlowableUnrecoverableJobException;
2629
import org.flowable.job.service.JobService;
2730
import org.flowable.job.service.TimerJobService;
31+
import org.flowable.job.service.event.impl.FlowableJobEventBuilder;
2832
import org.flowable.job.service.impl.persistence.entity.AbstractRuntimeJobEntity;
2933
import org.flowable.job.service.impl.persistence.entity.JobEntity;
3034

3135
/**
3236
* @author Saeid Mirzaei
3337
* @author Joram Barrez
38+
* @author martin.grofcik
3439
*/
3540
public class JobRetryCmd implements Command<Object> {
3641

@@ -52,7 +57,7 @@ public Object execute(CommandContext commandContext) {
5257
return null;
5358
}
5459

55-
AbstractRuntimeJobEntity newJobEntity = null;
60+
AbstractRuntimeJobEntity newJobEntity;
5661
if (job.getRetries() <= 1 || isUnrecoverableException()) {
5762
newJobEntity = jobService.moveJobToDeadLetterJob(job);
5863
} else {
@@ -73,6 +78,16 @@ public Object execute(CommandContext commandContext) {
7378
newJobEntity.setExceptionStacktrace(getExceptionStacktrace());
7479
}
7580

81+
// Dispatch both an update and a retry-decrement event
82+
CmmnEngineConfiguration configuration = CommandContextUtil.getCmmnEngineConfiguration(commandContext);
83+
FlowableEventDispatcher eventDispatcher = configuration.getEventDispatcher();
84+
if (eventDispatcher != null && eventDispatcher.isEnabled()) {
85+
eventDispatcher.dispatchEvent(FlowableJobEventBuilder.createEntityEvent(FlowableEngineEventType.ENTITY_UPDATED, newJobEntity),
86+
configuration.getEngineCfgKey());
87+
eventDispatcher.dispatchEvent(FlowableJobEventBuilder.createEntityEvent(FlowableEngineEventType.JOB_RETRIES_DECREMENTED, newJobEntity),
88+
configuration.getEngineCfgKey());
89+
}
90+
7691
return null;
7792
}
7893

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/* Licensed under the Apache License, Version 2.0 (the "License");
2+
* you may not use this file except in compliance with the License.
3+
* You may obtain a copy of the License at
4+
*
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package org.flowable.cmmn.test.event;
14+
15+
import static org.assertj.core.api.Assertions.assertThat;
16+
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.ENTITY_UPDATED;
17+
import static org.flowable.common.engine.api.delegate.event.FlowableEngineEventType.JOB_RETRIES_DECREMENTED;
18+
19+
import java.util.ArrayList;
20+
import java.util.Collection;
21+
import java.util.List;
22+
import java.util.Set;
23+
24+
import org.flowable.cmmn.api.runtime.CaseInstance;
25+
import org.flowable.cmmn.engine.test.CmmnDeployment;
26+
import org.flowable.cmmn.test.FlowableCmmnTestCase;
27+
import org.flowable.common.engine.api.delegate.event.AbstractFlowableEventListener;
28+
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
29+
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
30+
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
31+
import org.flowable.job.api.Job;
32+
import org.flowable.task.api.Task;
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
37+
/**
38+
* @author martin.grofcik
39+
*/
40+
public class JobRetriesDecrementedEventTest extends FlowableCmmnTestCase {
41+
42+
TestEventListener listener = new TestEventListener();
43+
44+
@BeforeEach
45+
void addListener() {
46+
listener.events.clear();
47+
cmmnEngineConfiguration.getEventDispatcher().addEventListener(listener);
48+
}
49+
50+
@AfterEach
51+
void removeListener() {
52+
cmmnEngineConfiguration.getEventDispatcher().removeEventListener(listener);
53+
}
54+
55+
@Test
56+
@CmmnDeployment(resources = "org/flowable/cmmn/test/async/AsyncTaskTest.testAsyncServiceTask.cmmn")
57+
void noEventForSuccess() {
58+
CaseInstance caseInstance = cmmnRuntimeService.createCaseInstanceBuilder().caseDefinitionKey("testAsyncServiceTask").start();
59+
Task task = cmmnTaskService.createTaskQuery()
60+
.caseInstanceId(caseInstance.getId())
61+
.singleResult();
62+
assertThat(task.getName()).isEqualTo("Task before service task");
63+
cmmnTaskService.complete(task.getId());
64+
65+
waitForJobExecutorToProcessAllAsyncJobs();
66+
67+
assertThat(listener.events).isEmpty();
68+
}
69+
70+
@Test
71+
@CmmnDeployment(resources = "org/flowable/cmmn/test/async/AsyncTaskTest.testAsyncServiceTaskWithFailure.cmmn")
72+
void jobRetriesDecrementedOnFailure() {
73+
CaseInstance caseInstance = cmmnRuntimeService.createCaseInstanceBuilder().caseDefinitionKey("testAsyncServiceTask").start();
74+
Task task = cmmnTaskService.createTaskQuery()
75+
.caseInstanceId(caseInstance.getId())
76+
.singleResult();
77+
assertThat(task.getName()).isEqualTo("Task before service task");
78+
cmmnTaskService.complete(task.getId());
79+
80+
waitForJobExecutorToProcessAllAsyncJobs();
81+
82+
assertThat(listener.events)
83+
.filteredOn( e -> caseInstance.getId().equals(getScopeId(e)))
84+
.extracting(FlowableEvent::getType)
85+
.containsOnly(JOB_RETRIES_DECREMENTED, ENTITY_UPDATED);
86+
}
87+
88+
private String getScopeId(FlowableEvent e) {
89+
return ((Job) ((FlowableEngineEntityEvent) e).getEntity()).getScopeId();
90+
}
91+
92+
private static class TestEventListener extends AbstractFlowableEventListener {
93+
private static final Collection<? extends FlowableEventType> SUPPORTED_TYPES = Set.of(JOB_RETRIES_DECREMENTED, ENTITY_UPDATED);
94+
List<FlowableEvent> events = new ArrayList<>();
95+
96+
@Override
97+
public void onEvent(FlowableEvent event) {
98+
events.add(event);
99+
}
100+
101+
@Override
102+
public boolean isFailOnException() {
103+
return false;
104+
}
105+
106+
@Override
107+
public Collection<? extends FlowableEventType> getTypes() {
108+
return SUPPORTED_TYPES;
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)