Skip to content

Commit c842272

Browse files
committed
Rewrite Camel Behavior to be more multi instance resilient
Use a synchronized block to determine the default camelContextObj. Do not use class field for the TargetType
1 parent 6027843 commit c842272

5 files changed

Lines changed: 70 additions & 60 deletions

File tree

modules/flowable-camel/src/main/java/org/flowable/camel/CamelBehavior.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -74,28 +74,29 @@ public abstract class CamelBehavior extends AbstractBpmnActivityBehavior impleme
7474
protected CamelContext camelContextObj;
7575
protected List<MapExceptionEntry> mapExceptions;
7676

77-
protected abstract void setPropertTargetVariable(FlowableEndpoint endpoint);
78-
7977
public enum TargetType {
8078
BODY_AS_MAP, BODY, PROPERTIES
8179
}
8280

83-
protected TargetType toTargetType;
81+
protected TargetType determineToTargetType(FlowableEndpoint endpoint) {
82+
if (endpoint.isCopyVariablesToBodyAsMap()) {
83+
return TargetType.BODY_AS_MAP;
84+
}
8485

85-
protected void updateTargetVariables(FlowableEndpoint endpoint) {
86-
toTargetType = null;
87-
if (endpoint.isCopyVariablesToBodyAsMap())
88-
toTargetType = TargetType.BODY_AS_MAP;
89-
else if (endpoint.isCopyCamelBodyToBody())
90-
toTargetType = TargetType.BODY;
91-
else if (endpoint.isCopyVariablesToProperties())
92-
toTargetType = TargetType.PROPERTIES;
86+
if (endpoint.isCopyCamelBodyToBody()) {
87+
return TargetType.BODY;
88+
}
89+
90+
if (endpoint.isCopyVariablesToProperties()) {
91+
return TargetType.PROPERTIES;
92+
}
9393

94-
if (toTargetType == null)
95-
setPropertTargetVariable(endpoint);
94+
return getDefaultToTargetType();
9695
}
9796

98-
protected void copyVariables(Map<String, Object> variables, Exchange exchange, FlowableEndpoint endpoint) {
97+
protected abstract TargetType getDefaultToTargetType();
98+
99+
protected void copyVariables(Map<String, Object> variables, Exchange exchange, FlowableEndpoint endpoint, TargetType toTargetType) {
99100
switch (toTargetType) {
100101
case BODY_AS_MAP:
101102
copyVariablesToBodyAsMap(variables, exchange);
@@ -112,8 +113,6 @@ protected void copyVariables(Map<String, Object> variables, Exchange exchange, F
112113

113114
@Override
114115
public void execute(DelegateExecution execution) {
115-
setAppropriateCamelContext(execution);
116-
117116
boolean isV5Execution = false;
118117
if ((Context.getCommandContext() != null && Flowable5Util.isFlowable5ProcessDefinitionId(Context.getCommandContext(), execution.getProcessDefinitionId())) ||
119118
(Context.getCommandContext() == null && Flowable5Util.getFlowable5CompatibilityHandler() != null)) {
@@ -143,11 +142,14 @@ public void execute(DelegateExecution execution) {
143142

144143
protected FlowableEndpoint createEndpoint(DelegateExecution execution, boolean isV5Execution) {
145144
String uri = "flowable://" + getProcessDefinitionKey(execution, isV5Execution) + ":" + execution.getCurrentActivityId();
146-
return getEndpoint(uri);
145+
CamelContext camelContext = getCamelContext(execution, isV5Execution);
146+
return getEndpoint(camelContext, uri);
147147
}
148148

149-
protected FlowableEndpoint getEndpoint(String key) {
150-
for (Endpoint e : camelContextObj.getEndpoints()) {
149+
protected abstract CamelContext getCamelContext(DelegateExecution execution, boolean isV5Execution);
150+
151+
protected FlowableEndpoint getEndpoint(CamelContext camelContext, String key) {
152+
for (Endpoint e : camelContext.getEndpoints()) {
151153
if (e.getEndpointKey().equals(key) && (e instanceof FlowableEndpoint)) {
152154
return (FlowableEndpoint) e;
153155
}
@@ -160,8 +162,8 @@ protected Exchange createExchange(DelegateExecution activityExecution, FlowableE
160162
ex.setProperty(FlowableProducer.PROCESS_ID_PROPERTY, activityExecution.getProcessInstanceId());
161163
ex.setProperty(FlowableProducer.EXECUTION_ID_PROPERTY, activityExecution.getId());
162164
Map<String, Object> variables = activityExecution.getVariables();
163-
updateTargetVariables(endpoint);
164-
copyVariables(variables, ex, endpoint);
165+
TargetType toTargetType = determineToTargetType(endpoint);
166+
copyVariables(variables, ex, endpoint, toTargetType);
165167
return ex;
166168
}
167169

@@ -232,8 +234,6 @@ protected boolean isASync(DelegateExecution execution) {
232234
return async;
233235
}
234236

235-
protected abstract void setAppropriateCamelContext(DelegateExecution execution);
236-
237237
protected String getStringFromField(Expression expression, DelegateExecution execution) {
238238
if (expression != null) {
239239
Object value = expression.getValue(execution);

modules/flowable-camel/src/main/java/org/flowable/camel/SpringCamelBehavior.java

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.apache.camel.spring.SpringCamelContext;
1717
import org.apache.commons.lang3.StringUtils;
1818
import org.flowable.common.engine.api.FlowableException;
19-
import org.flowable.common.engine.impl.context.Context;
2019
import org.flowable.engine.ProcessEngineConfiguration;
2120
import org.flowable.engine.compatibility.Flowable5CompatibilityHandler;
2221
import org.flowable.engine.delegate.DelegateExecution;
@@ -32,43 +31,58 @@ public abstract class SpringCamelBehavior extends CamelBehavior {
3231

3332
private static final long serialVersionUID = 1L;
3433

35-
@Override
36-
protected void setAppropriateCamelContext(DelegateExecution execution) {
34+
protected final Object contextLock = new Object();
35+
36+
@Override
37+
protected CamelContext getCamelContext(DelegateExecution execution, boolean isV5Execution) {
3738
// Get the appropriate String representation of the CamelContext object
3839
// from ActivityExecution (if available).
3940
String camelContextValue = getStringFromField(camelContext, execution);
41+
if (StringUtils.isEmpty(camelContextValue)) {
42+
if (camelContextObj != null) {
43+
// No processing required. No custom CamelContext & the default is already set.
44+
return camelContextObj;
45+
}
4046

41-
// If the String representation of the CamelContext object from ActivityExecution is empty, use the default.
42-
if (StringUtils.isEmpty(camelContextValue) && camelContextObj != null) {
43-
// No processing required. No custom CamelContext & the default is already set.
47+
// Use a lock to resolve the default camel context.
48+
synchronized (contextLock) {
49+
// Check again, in case another thread set the member variable in the meantime.
50+
if (camelContextObj != null) {
51+
return camelContextObj;
52+
}
53+
camelContextObj = resolveCamelContext(camelContextValue, isV5Execution);
54+
}
4455

56+
return camelContextObj;
4557
} else {
46-
// Get the ProcessEngineConfiguration object.
47-
ProcessEngineConfiguration engineConfiguration = org.flowable.engine.impl.context.Context.getProcessEngineConfiguration();
48-
if ((Context.getCommandContext() != null && Flowable5Util.isFlowable5ProcessDefinitionId(Context.getCommandContext(), execution.getProcessDefinitionId())) ||
49-
(Context.getCommandContext() == null && Flowable5Util.getFlowable5CompatibilityHandler() != null)) {
58+
return resolveCamelContext(camelContextValue, isV5Execution);
59+
}
60+
}
5061

51-
Flowable5CompatibilityHandler compatibilityHandler = Flowable5Util.getFlowable5CompatibilityHandler();
52-
camelContextObj = (CamelContext) compatibilityHandler.getCamelContextObject(camelContextValue);
62+
protected CamelContext resolveCamelContext(String camelContextValue, boolean isV5Execution) {
63+
ProcessEngineConfiguration engineConfiguration = org.flowable.engine.impl.context.Context.getProcessEngineConfiguration();
64+
if (isV5Execution) {
5365

54-
} else {
55-
// Convert it to a SpringProcessEngineConfiguration. If this doesn't work, throw a RuntimeException.
56-
try {
57-
SpringProcessEngineConfiguration springConfiguration = (SpringProcessEngineConfiguration) engineConfiguration;
58-
if (StringUtils.isEmpty(camelContextValue) && camelContextObj == null) {
59-
camelContextValue = springConfiguration.getDefaultCamelContext();
60-
}
66+
Flowable5CompatibilityHandler compatibilityHandler = Flowable5Util.getFlowable5CompatibilityHandler();
67+
return (CamelContext) compatibilityHandler.getCamelContextObject(camelContextValue);
6168

62-
// Get the CamelContext object and set the super's member variable.
63-
Object ctx = springConfiguration.getApplicationContext().getBean(camelContextValue);
64-
if (!(ctx instanceof SpringCamelContext)) {
65-
throw new FlowableException("Could not find CamelContext named " + camelContextValue + ".");
66-
}
67-
camelContextObj = (SpringCamelContext) ctx;
69+
} else {
70+
// Convert it to a SpringProcessEngineConfiguration. If this doesn't work, throw a RuntimeException.
71+
try {
72+
SpringProcessEngineConfiguration springConfiguration = (SpringProcessEngineConfiguration) engineConfiguration;
73+
if (StringUtils.isEmpty(camelContextValue)) {
74+
camelContextValue = springConfiguration.getDefaultCamelContext();
75+
}
6876

69-
} catch (Exception e) {
70-
throw new FlowableException("Expecting a SpringProcessEngineConfiguration for the Camel module.", e);
77+
// Get the CamelContext object and set the super's member variable.
78+
Object ctx = springConfiguration.getApplicationContext().getBean(camelContextValue);
79+
if (!(ctx instanceof SpringCamelContext)) {
80+
throw new FlowableException("Could not find CamelContext named " + camelContextValue + ".");
7181
}
82+
return (SpringCamelContext) ctx;
83+
84+
} catch (Exception e) {
85+
throw new FlowableException("Expecting a SpringProcessEngineConfiguration for the Camel module.", e);
7286
}
7387
}
7488
}

modules/flowable-camel/src/main/java/org/flowable/camel/impl/CamelBehaviorBodyAsMapImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
package org.flowable.camel.impl;
1515

16-
import org.flowable.camel.FlowableEndpoint;
1716
import org.flowable.camel.SpringCamelBehavior;
1817

1918
/**
@@ -26,8 +25,7 @@ public class CamelBehaviorBodyAsMapImpl extends SpringCamelBehavior {
2625
private static final long serialVersionUID = 1L;
2726

2827
@Override
29-
protected void setPropertTargetVariable(FlowableEndpoint endpoint) {
30-
toTargetType = TargetType.BODY_AS_MAP;
28+
protected TargetType getDefaultToTargetType() {
29+
return TargetType.BODY_AS_MAP;
3130
}
32-
3331
}

modules/flowable-camel/src/main/java/org/flowable/camel/impl/CamelBehaviorCamelBodyImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
package org.flowable.camel.impl;
1515

16-
import org.flowable.camel.FlowableEndpoint;
1716
import org.flowable.camel.SpringCamelBehavior;
1817

1918
/**
@@ -27,7 +26,7 @@ public class CamelBehaviorCamelBodyImpl extends SpringCamelBehavior {
2726
private static final long serialVersionUID = 1L;
2827

2928
@Override
30-
protected void setPropertTargetVariable(FlowableEndpoint endpoint) {
31-
toTargetType = TargetType.BODY;
29+
protected TargetType getDefaultToTargetType() {
30+
return TargetType.BODY;
3231
}
3332
}

modules/flowable-camel/src/main/java/org/flowable/camel/impl/CamelBehaviorDefaultImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
package org.flowable.camel.impl;
1515

16-
import org.flowable.camel.FlowableEndpoint;
1716
import org.flowable.camel.SpringCamelBehavior;
1817

1918
/**
@@ -26,7 +25,7 @@ public class CamelBehaviorDefaultImpl extends SpringCamelBehavior {
2625
private static final long serialVersionUID = 003L;
2726

2827
@Override
29-
protected void setPropertTargetVariable(FlowableEndpoint endpoint) {
30-
toTargetType = TargetType.PROPERTIES;
28+
protected TargetType getDefaultToTargetType() {
29+
return TargetType.PROPERTIES;
3130
}
3231
}

0 commit comments

Comments
 (0)