Skip to content

Commit 8704f3f

Browse files
committed
Move deploy-time start event registration onto the StartEvent ActivityBehavior
Each process-level start event now owns its full deploy lifecycle via a new ProcessLevelStartEventActivityBehavior interface (deploy + undeploy), plus typed deploy/undeploy contexts and four built-in behaviors (Message, Signal, Timer, EventRegistry). BpmnDeploymentHelper iterates start events and the EventSubscriptionManager / TimerManager classes are deleted. A new findEventSubscriptionsByTypesAndProcessDefinitionId finder runs a single SQL with EVENT_TYPE_ IN (...), so cleanup is one DB round-trip regardless of how many handler types the previous process definition declared.
1 parent da9faf3 commit 8704f3f

28 files changed

Lines changed: 865 additions & 456 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/* Licensed under the Apache License, Version 2.0 (the "License");
2+
* you may not use this file except in compliance with the License.
3+
* You may obtain a copy of the License at
4+
*
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package org.flowable.engine.impl.bpmn.behavior;
14+
15+
import org.flowable.bpmn.constants.BpmnXMLConstants;
16+
import org.flowable.common.engine.api.scope.ScopeTypes;
17+
import org.flowable.engine.impl.persistence.entity.ProcessDefinitionEntity;
18+
import org.flowable.engine.impl.util.CorrelationUtil;
19+
import org.flowable.engine.impl.util.CountingEntityUtil;
20+
import org.flowable.eventsubscription.api.EventSubscription;
21+
import org.flowable.eventsubscription.api.EventSubscriptionBuilder;
22+
import org.flowable.eventsubscription.service.EventSubscriptionService;
23+
24+
/**
25+
* Process-level event-registry start event behavior. Owns the deploy-time event-registry subscription
26+
* registration for this start event, including the dynamic-correlation re-point logic when the
27+
* process definition is superseded.
28+
*/
29+
public class EventRegistryStartEventActivityBehavior extends FlowNodeActivityBehavior implements ProcessLevelStartEventActivityBehavior {
30+
31+
private static final long serialVersionUID = 1L;
32+
33+
protected String eventDefinitionKey;
34+
protected boolean manualCorrelation;
35+
36+
public EventRegistryStartEventActivityBehavior(String eventDefinitionKey, boolean manualCorrelation) {
37+
this.eventDefinitionKey = eventDefinitionKey;
38+
this.manualCorrelation = manualCorrelation;
39+
}
40+
41+
@Override
42+
public void deploy(ProcessLevelStartEventDeployContext context) {
43+
// dynamic, manual subscription mode: deploy does not create a subscription — those are added
44+
// explicitly by the application at runtime.
45+
if (manualCorrelation) {
46+
return;
47+
}
48+
49+
ProcessDefinitionEntity processDefinition = context.getProcessDefinition();
50+
EventSubscriptionService eventSubscriptionService = context.getEventSubscriptionService();
51+
EventSubscriptionBuilder eventSubscriptionBuilder = eventSubscriptionService.createEventSubscriptionBuilder()
52+
.eventType(eventDefinitionKey)
53+
.activityId(context.getStartEvent().getId())
54+
.processDefinitionId(processDefinition.getId())
55+
.scopeType(ScopeTypes.BPMN)
56+
.configuration(CorrelationUtil.getCorrelationKey(BpmnXMLConstants.ELEMENT_EVENT_CORRELATION_PARAMETER, context.getCommandContext(), context.getStartEvent(), null));
57+
58+
if (processDefinition.getTenantId() != null) {
59+
eventSubscriptionBuilder.tenantId(processDefinition.getTenantId());
60+
}
61+
62+
EventSubscription eventSubscription = eventSubscriptionBuilder.create();
63+
CountingEntityUtil.handleInsertEventSubscriptionEntityCount(eventSubscription);
64+
}
65+
66+
@Override
67+
public void undeploy(ProcessLevelStartEventUndeployContext context) {
68+
if (manualCorrelation) {
69+
// dynamic mode: keep existing subscriptions but re-point them to the new process definition instead of deleting.
70+
ProcessDefinitionEntity previousProcessDefinition = context.getPreviousProcessDefinition();
71+
ProcessDefinitionEntity newProcessDefinition = context.getNewProcessDefinition();
72+
context.getProcessEngineConfiguration().getEventSubscriptionServiceConfiguration().getEventSubscriptionService().updateEventSubscriptionProcessDefinitionId(
73+
previousProcessDefinition.getId(), newProcessDefinition.getId(),
74+
eventDefinitionKey, context.getStartEvent().getId(), newProcessDefinition.getKey(), null);
75+
} else {
76+
context.registerObsoleteEventSubscriptionType(eventDefinitionKey);
77+
}
78+
}
79+
80+
public String getEventDefinitionKey() {
81+
return eventDefinitionKey;
82+
}
83+
84+
public boolean isManualCorrelation() {
85+
return manualCorrelation;
86+
}
87+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/* Licensed under the Apache License, Version 2.0 (the "License");
2+
* you may not use this file except in compliance with the License.
3+
* You may obtain a copy of the License at
4+
*
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package org.flowable.engine.impl.bpmn.behavior;
14+
15+
import java.util.List;
16+
17+
import org.flowable.bpmn.model.MessageEventDefinition;
18+
import org.flowable.common.engine.api.FlowableException;
19+
import org.flowable.common.engine.impl.interceptor.CommandContext;
20+
import org.flowable.engine.impl.event.EventDefinitionExpressionUtil;
21+
import org.flowable.engine.impl.event.MessageEventHandler;
22+
import org.flowable.engine.impl.persistence.entity.ProcessDefinitionEntity;
23+
import org.flowable.engine.impl.util.CountingEntityUtil;
24+
import org.flowable.eventsubscription.service.EventSubscriptionService;
25+
import org.flowable.eventsubscription.service.impl.persistence.entity.EventSubscriptionEntity;
26+
import org.flowable.eventsubscription.service.impl.persistence.entity.MessageEventSubscriptionEntity;
27+
28+
/**
29+
* Process-level message start event behavior. Owns the deploy-time message subscription registration
30+
* for this start event.
31+
*/
32+
public class MessageStartEventActivityBehavior extends FlowNodeActivityBehavior implements ProcessLevelStartEventActivityBehavior {
33+
34+
private static final long serialVersionUID = 1L;
35+
36+
protected MessageEventDefinition messageEventDefinition;
37+
38+
public MessageStartEventActivityBehavior(MessageEventDefinition messageEventDefinition) {
39+
this.messageEventDefinition = messageEventDefinition;
40+
}
41+
42+
@Override
43+
public void deploy(ProcessLevelStartEventDeployContext context) {
44+
CommandContext commandContext = context.getCommandContext();
45+
EventSubscriptionService eventSubscriptionService = context.getEventSubscriptionService();
46+
ProcessDefinitionEntity processDefinition = context.getProcessDefinition();
47+
48+
String messageName = EventDefinitionExpressionUtil.determineMessageName(commandContext, messageEventDefinition, processDefinition);
49+
List<EventSubscriptionEntity> subscriptionsForSameMessageName = eventSubscriptionService
50+
.findEventSubscriptionsByName(MessageEventHandler.EVENT_HANDLER_TYPE, messageName, processDefinition.getTenantId());
51+
52+
for (EventSubscriptionEntity eventSubscriptionEntity : subscriptionsForSameMessageName) {
53+
// throw exception only if there's already a subscription as start event
54+
if (eventSubscriptionEntity.getProcessInstanceId() == null || eventSubscriptionEntity.getProcessInstanceId().isEmpty()) {
55+
// the event subscription has no instance-id, so it's a message start event
56+
throw new FlowableException("Cannot deploy process definition '" + processDefinition.getResourceName()
57+
+ "': there already is a message event subscription for the message with name '" + messageName + "'. For " + eventSubscriptionEntity);
58+
}
59+
}
60+
61+
MessageEventSubscriptionEntity newSubscription = eventSubscriptionService.createMessageEventSubscription();
62+
newSubscription.setEventName(messageName);
63+
newSubscription.setActivityId(context.getStartEvent().getId());
64+
newSubscription.setConfiguration(processDefinition.getId());
65+
newSubscription.setProcessDefinitionId(processDefinition.getId());
66+
67+
if (processDefinition.getTenantId() != null) {
68+
newSubscription.setTenantId(processDefinition.getTenantId());
69+
}
70+
71+
eventSubscriptionService.insertEventSubscription(newSubscription);
72+
CountingEntityUtil.handleInsertEventSubscriptionEntityCount(newSubscription);
73+
}
74+
75+
@Override
76+
public void undeploy(ProcessLevelStartEventUndeployContext context) {
77+
context.registerObsoleteEventSubscriptionType(MessageEventHandler.EVENT_HANDLER_TYPE);
78+
}
79+
80+
public MessageEventDefinition getMessageEventDefinition() {
81+
return messageEventDefinition;
82+
}
83+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/* Licensed under the Apache License, Version 2.0 (the "License");
2+
* you may not use this file except in compliance with the License.
3+
* You may obtain a copy of the License at
4+
*
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package org.flowable.engine.impl.bpmn.behavior;
14+
15+
/**
16+
* Implemented by an {@code ActivityBehavior} attached to a process-level (i.e. NOT inside an
17+
* {@code EventSubProcess}) {@code StartEvent} that registers a deploy-time artifact — an event
18+
* subscription, a start timer job, etc. — and removes or updates that artifact when the process
19+
* definition is superseded by a new version.
20+
* <p>
21+
* Both {@link #deploy} and {@link #undeploy} must be implemented; a behavior that opts into the
22+
* deploy-time lifecycle owns both halves of it. Built-in start event types that don't register
23+
* anything (e.g. none / error) simply don't implement this interface.
24+
* <p>
25+
* Custom integrations supplying their own {@code EventDefinition} + parse handler + behavior pick
26+
* up deploy-time registration by implementing this interface — no change in
27+
* {@code BpmnDeploymentHelper} is required.
28+
*/
29+
public interface ProcessLevelStartEventActivityBehavior {
30+
31+
/**
32+
* Register the deploy-time artifact (event subscription, timer job, etc.) for this start event
33+
* when its process definition is freshly deployed.
34+
*/
35+
void deploy(ProcessLevelStartEventDeployContext context);
36+
37+
/**
38+
* Remove or update the deploy-time artifact for this start event when its process definition is
39+
* superseded by a new version. Called on the previous (now-superseded) process definition's
40+
* start events.
41+
*/
42+
void undeploy(ProcessLevelStartEventUndeployContext context);
43+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/* Licensed under the Apache License, Version 2.0 (the "License");
2+
* you may not use this file except in compliance with the License.
3+
* You may obtain a copy of the License at
4+
*
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package org.flowable.engine.impl.bpmn.behavior;
14+
15+
import org.flowable.bpmn.model.BpmnModel;
16+
import org.flowable.bpmn.model.Process;
17+
import org.flowable.bpmn.model.StartEvent;
18+
import org.flowable.common.engine.impl.interceptor.CommandContext;
19+
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
20+
import org.flowable.engine.impl.persistence.entity.ProcessDefinitionEntity;
21+
import org.flowable.eventsubscription.service.EventSubscriptionService;
22+
23+
/**
24+
* Context passed to {@link ProcessLevelStartEventActivityBehavior#deploy} carrying the freshly-deployed
25+
* process definition together with its parsed model and the start event in scope.
26+
*/
27+
public class ProcessLevelStartEventDeployContext {
28+
29+
protected final ProcessDefinitionEntity processDefinition;
30+
protected final Process process;
31+
protected final BpmnModel bpmnModel;
32+
protected final StartEvent startEvent;
33+
protected final ProcessEngineConfigurationImpl processEngineConfiguration;
34+
protected final CommandContext commandContext;
35+
36+
public ProcessLevelStartEventDeployContext(ProcessDefinitionEntity processDefinition,
37+
Process process, BpmnModel bpmnModel, StartEvent startEvent,
38+
ProcessEngineConfigurationImpl processEngineConfiguration, CommandContext commandContext) {
39+
this.processDefinition = processDefinition;
40+
this.process = process;
41+
this.bpmnModel = bpmnModel;
42+
this.startEvent = startEvent;
43+
this.processEngineConfiguration = processEngineConfiguration;
44+
this.commandContext = commandContext;
45+
}
46+
47+
public ProcessDefinitionEntity getProcessDefinition() {
48+
return processDefinition;
49+
}
50+
51+
public Process getProcess() {
52+
return process;
53+
}
54+
55+
public BpmnModel getBpmnModel() {
56+
return bpmnModel;
57+
}
58+
59+
public StartEvent getStartEvent() {
60+
return startEvent;
61+
}
62+
63+
public ProcessEngineConfigurationImpl getProcessEngineConfiguration() {
64+
return processEngineConfiguration;
65+
}
66+
67+
public CommandContext getCommandContext() {
68+
return commandContext;
69+
}
70+
71+
public EventSubscriptionService getEventSubscriptionService() {
72+
return processEngineConfiguration.getEventSubscriptionServiceConfiguration().getEventSubscriptionService();
73+
}
74+
75+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/* Licensed under the Apache License, Version 2.0 (the "License");
2+
* you may not use this file except in compliance with the License.
3+
* You may obtain a copy of the License at
4+
*
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package org.flowable.engine.impl.bpmn.behavior;
14+
15+
import java.util.Set;
16+
17+
import org.flowable.bpmn.model.StartEvent;
18+
import org.flowable.common.engine.impl.interceptor.CommandContext;
19+
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
20+
import org.flowable.engine.impl.persistence.entity.ProcessDefinitionEntity;
21+
22+
/**
23+
* Context passed to {@link ProcessLevelStartEventActivityBehavior#undeploy} when a process definition
24+
* is being superseded by a new version. Carries both the previous (now-superseded) process definition
25+
* and the new one — most behaviors only need the previous process definition, but the EventRegistry "manual" re-point
26+
* branch updates subscriptions to point at {@link #getNewProcessDefinition()}.
27+
* <p>
28+
* Behaviors register their obsolete event subscription / timer job handler types via
29+
* {@link #registerObsoleteEventSubscriptionType(String)} and
30+
* {@link #registerObsoleteTimerJobHandlerType(String)}. The deployer issues one mass-delete per
31+
* unique registered type after the undeploy iteration — fewer DB round-trips than per-start-event
32+
* deletes, and tighter than the historical fixed Message+Signal+EventRegistry sweep that always
33+
* ran regardless of which types the previous process definition actually used.
34+
*/
35+
public class ProcessLevelStartEventUndeployContext {
36+
37+
protected final ProcessDefinitionEntity previousProcessDefinition;
38+
protected final ProcessDefinitionEntity newProcessDefinition;
39+
protected final StartEvent startEvent;
40+
protected final ProcessEngineConfigurationImpl processEngineConfiguration;
41+
protected final CommandContext commandContext;
42+
protected final Set<String> obsoleteEventSubscriptionTypes;
43+
protected final Set<String> obsoleteTimerJobHandlerTypes;
44+
45+
public ProcessLevelStartEventUndeployContext(ProcessDefinitionEntity previousProcessDefinition, ProcessDefinitionEntity newProcessDefinition,
46+
StartEvent startEvent,
47+
ProcessEngineConfigurationImpl processEngineConfiguration, CommandContext commandContext,
48+
Set<String> obsoleteEventSubscriptionTypes, Set<String> obsoleteTimerJobHandlerTypes) {
49+
this.previousProcessDefinition = previousProcessDefinition;
50+
this.newProcessDefinition = newProcessDefinition;
51+
this.startEvent = startEvent;
52+
this.processEngineConfiguration = processEngineConfiguration;
53+
this.commandContext = commandContext;
54+
this.obsoleteEventSubscriptionTypes = obsoleteEventSubscriptionTypes;
55+
this.obsoleteTimerJobHandlerTypes = obsoleteTimerJobHandlerTypes;
56+
}
57+
58+
public ProcessDefinitionEntity getPreviousProcessDefinition() {
59+
return previousProcessDefinition;
60+
}
61+
62+
public ProcessDefinitionEntity getNewProcessDefinition() {
63+
return newProcessDefinition;
64+
}
65+
66+
public StartEvent getStartEvent() {
67+
return startEvent;
68+
}
69+
70+
public ProcessEngineConfigurationImpl getProcessEngineConfiguration() {
71+
return processEngineConfiguration;
72+
}
73+
74+
public CommandContext getCommandContext() {
75+
return commandContext;
76+
}
77+
78+
/**
79+
* Registers an event subscription handler type that the deployer should mass-delete for the
80+
* previous process definition after the undeploy pass. Multiple behaviors registering the same
81+
* type result in a single DB sweep.
82+
*/
83+
public void registerObsoleteEventSubscriptionType(String eventHandlerType) {
84+
obsoleteEventSubscriptionTypes.add(eventHandlerType);
85+
}
86+
87+
/**
88+
* Registers a timer job handler type that the deployer should mass-cancel for the previous
89+
* process definition after the undeploy pass. Multiple behaviors registering the same type
90+
* result in a single DB sweep.
91+
*/
92+
public void registerObsoleteTimerJobHandlerType(String timerJobHandlerType) {
93+
obsoleteTimerJobHandlerTypes.add(timerJobHandlerType);
94+
}
95+
}

0 commit comments

Comments
 (0)