Skip to content

Commit ffdd442

Browse files
yan-3005 and gitar-bot
authored and committed
Fix Flowable workflow termination message lookup and async-job race on deletion (open-metadata#27117)
* Fix Flowable workflow termination message lookup and async-job race on deletion

  WorkflowHandler.findTerminationMessageName was splitting task definition keys on underscore and looking for messages named '<key>_terminateProcess'. But Workflow.getFlowableElementId joins with a dot, so actual task keys are '<subProcess>.approvalTask' and termination messages are '<subProcess>.terminateProcess'. The bug has been latent since the original Custom Workflows commit - every conflict-resolution path silently failed to terminate the old subprocess, leaving it dangling until force-delete.

  Fix: split on the last dot, derive the subprocess ID, and check for the '<subProcessId>.terminateProcess' subscription directly. Remove the dead 'terminateChangeReviewProcess' pattern and the debug scaffolding loops. Also add getManagementService/getRepositoryService getters to WorkflowHandler for use in integration tests.

  In WorkflowDefinitionResourceIT, change waitForWorkflowIdle to check pending async jobs on the trigger process definition IDs instead of waiting for process instances to drain. Async-after jobs on the fetch task are ephemeral (<100ms) - waiting for them has near-zero cost, whereas the old approach waited for the entire batch loop (potentially minutes with many entities). Also remove the waitForWorkflowIdle call from cleanupCreatedWorkflows @AfterEach since it ran a query per workflow in every test unnecessarily.

* Fix spurious EntityNotFoundException errors from orphaned Flowable processes

  WorkflowInstanceListener and WorkflowInstanceStageListener were firing on Flowable process instances with numeric keys (e.g. 239376, 239373) that have no corresponding OM workflow definition, causing EntityNotFoundException noise in AUT runs.
  - WorkflowInstanceListener: skip addWorkflowInstance/updateWorkflowInstance when getMainWorkflowDefinitionNameFromTrigger returns the raw process key unchanged (no 'Trigger' suffix to strip = not an OM trigger workflow)
  - WorkflowInstanceStageListener: skip addNewStage/updateStage when the process key is purely numeric (isUnknownFlowableProcess guard)
  - WorkflowDefinitionResourceIT: fix waitForWorkflowIdle to use TriggerFactory.getTriggerWorkflowId(fqn) instead of name+"Trigger%" so it correctly queries by FQN-based process definition key

* Fix compilation issues

* Address Copilot review feedback on PR open-metadata#27117

  - WorkflowInstanceStageListener: replace fragile numeric-key guard with businessKey-based guard — skip stage tracking when no business key is present (non-OM-managed process instance) instead of using processKey format heuristic that could misfire on numeric-named workflows
  - WorkflowDefinitionResourceIT: fix Javadoc comment ("Trigger-*" -> "Trigger%") and add startsWith filter after processDefinitionKeyLike query to prevent SQL LIKE wildcard '_' from matching unrelated PDs
  - WorkflowHandler: eliminate double Flowable round-trip in termination logic by merging findTerminationMessageName into terminateTask helper that does a single singleResult() query per task

* fix: change DEBUG to WARN for null businessKey skip in WorkflowInstanceStageListener

Co-authored-by: Ram Narayan Balaji <81347100+yan-3005@users.noreply.github.com>

---------

Co-authored-by: Gitar <noreply@gitar.ai>
1 parent 4a946a8 commit ffdd442

4 files changed

Lines changed: 106 additions & 115 deletions

File tree

openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131
import java.util.function.BiConsumer;
3232
import java.util.function.Supplier;
3333
import java.util.stream.Stream;
34+
import org.awaitility.core.ConditionTimeoutException;
35+
import org.flowable.engine.ManagementService;
36+
import org.flowable.engine.RepositoryService;
37+
import org.flowable.engine.repository.ProcessDefinition;
3438
import org.junit.jupiter.api.AfterEach;
3539
import org.junit.jupiter.api.BeforeEach;
3640
import org.junit.jupiter.api.Disabled;
@@ -122,6 +126,8 @@
122126
import org.openmetadata.sdk.exceptions.OpenMetadataException;
123127
import org.openmetadata.sdk.network.HttpMethod;
124128
import org.openmetadata.sdk.network.RequestOptions;
129+
import org.openmetadata.service.governance.workflows.WorkflowHandler;
130+
import org.openmetadata.service.governance.workflows.elements.TriggerFactory;
125131
import org.openmetadata.service.resources.feeds.MessageParser;
126132
import org.slf4j.Logger;
127133
import org.slf4j.LoggerFactory;
@@ -1898,6 +1904,8 @@ private void safeDeleteWorkflow(OpenMetadataClient client, String workflowName)
18981904
try {
18991905
WorkflowDefinition wd = client.workflowDefinitions().getByName(workflowName, null);
19001906

1907+
waitForWorkflowIdle(wd.getFullyQualifiedName());
1908+
19011909
// Force delete with hardDelete=true and recursive=true to clean up properly
19021910
executeWithDeadlockRetryVoid(
19031911
() -> {
@@ -1950,6 +1958,48 @@ private void safeDeleteWorkflow(OpenMetadataClient client, String workflowName)
19501958
}
19511959
}
19521960

1961+
/**
1962+
* Waits until any async-after jobs left by PeriodicBatchEntityTrigger's fetch task have been
1963+
* picked up and executed. These jobs are ephemeral (complete in &lt;100ms) but if the deployment
1964+
* is deleted while one is in flight the Flowable job executor NPEs when it tries to resolve the
1965+
* process definition. Returns immediately for event-based workflows that have no
1966+
* "&lt;name&gt;Trigger%" process definitions deployed, or for periodic-batch workflows whose
1967+
* trigger PDs currently have no pending jobs.
1968+
*/
1969+
private void waitForWorkflowIdle(String workflowFqn) {
1970+
RepositoryService repositoryService = WorkflowHandler.getInstance().getRepositoryService();
1971+
String triggerWorkflowId = TriggerFactory.getTriggerWorkflowId(workflowFqn);
1972+
List<ProcessDefinition> triggerPDs =
1973+
repositoryService
1974+
.createProcessDefinitionQuery()
1975+
.processDefinitionKeyLike(triggerWorkflowId + "%")
1976+
.list()
1977+
.stream()
1978+
.filter(pd -> pd.getKey() != null && pd.getKey().startsWith(triggerWorkflowId))
1979+
.toList();
1980+
if (triggerPDs.isEmpty()) {
1981+
return;
1982+
}
1983+
ManagementService managementService = WorkflowHandler.getInstance().getManagementService();
1984+
List<String> triggerPDIds = triggerPDs.stream().map(ProcessDefinition::getId).toList();
1985+
try {
1986+
await("async-after jobs for " + workflowFqn + " trigger PDs to drain")
1987+
.atMost(Duration.ofSeconds(10))
1988+
.pollInterval(Duration.ofMillis(100))
1989+
.pollDelay(Duration.ZERO)
1990+
.ignoreExceptions()
1991+
.until(
1992+
() ->
1993+
triggerPDIds.stream()
1994+
.allMatch(
1995+
pdId ->
1996+
managementService.createJobQuery().processDefinitionId(pdId).count()
1997+
== 0));
1998+
} catch (ConditionTimeoutException timeout) {
1999+
LOG.warn("waitForWorkflowIdle({}) timed out after 10s; proceeding with delete", workflowFqn);
2000+
}
2001+
}
2002+
19532003
// Helper method to register a workflow for cleanup
19542004
private void trackWorkflow(String name, String id) {
19552005
if (name != null && id != null) {
@@ -2500,6 +2550,7 @@ void test_PeriodicBatchWorkflowEntityFiltering(TestNamespace ns, TestInfo test)
25002550
// Cleanup - Use hardDelete to prevent duplicate key violations on retries
25012551
if (workflowId != null) {
25022552
try {
2553+
waitForWorkflowIdle(workflowName);
25032554
Map<String, String> params = new HashMap<>();
25042555
params.put("hardDelete", "true");
25052556
client.workflowDefinitions().delete(workflowId, params);
@@ -2943,6 +2994,7 @@ void test_MultiEntityPeriodicQueryWithFilters(TestNamespace ns, TestInfo test)
29432994
try {
29442995
WorkflowDefinition wd =
29452996
client.workflowDefinitions().getByName("MultiEntityPeriodicQuery", null);
2997+
waitForWorkflowIdle(wd.getFullyQualifiedName());
29462998
executeWithDeadlockRetryVoid(
29472999
() -> {
29483000
try {

openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java

Lines changed: 31 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
import lombok.Getter;
1919
import lombok.extern.slf4j.Slf4j;
2020
import org.flowable.bpmn.converter.BpmnXMLConverter;
21-
import org.flowable.bpmn.model.BpmnModel;
22-
import org.flowable.bpmn.model.Message;
2321
import org.flowable.common.engine.api.FlowableObjectNotFoundException;
2422
import org.flowable.common.engine.impl.el.DefaultExpressionManager;
2523
import org.flowable.engine.HistoryService;
@@ -1027,115 +1025,42 @@ public void terminateTaskProcessInstance(UUID customTaskId, String reason) {
10271025
.processVariableValueEquals("customTaskId", customTaskId.toString())
10281026
.list();
10291027
for (Task task : tasks) {
1030-
// Find the correct termination message for this task
1031-
String terminationMessageName = findTerminationMessageName(runtimeService, task);
1032-
if (terminationMessageName != null) {
1033-
Execution execution =
1034-
runtimeService
1035-
.createExecutionQuery()
1036-
.processInstanceId(task.getProcessInstanceId())
1037-
.messageEventSubscriptionName(terminationMessageName)
1038-
.singleResult();
1039-
if (execution != null) {
1040-
runtimeService.messageEventReceived(terminationMessageName, execution.getId());
1041-
LOG.debug(
1042-
"Terminated task {} using message '{}'", customTaskId, terminationMessageName);
1043-
} else {
1044-
LOG.warn(
1045-
"No execution found for termination message '{}' for task {}",
1046-
terminationMessageName,
1047-
customTaskId);
1048-
}
1049-
} else {
1050-
LOG.warn("No termination message found for task {}", customTaskId);
1051-
}
1028+
terminateTask(runtimeService, task, customTaskId);
10521029
}
10531030
} catch (FlowableObjectNotFoundException ex) {
10541031
LOG.debug("Flowable Task for Task ID {} not found.", customTaskId);
10551032
}
10561033
}
10571034

1058-
/**
1059-
* Find the termination message name for a task.
1060-
* Uses deterministic message names based on the subprocess ID.
1061-
*/
1062-
private String findTerminationMessageName(RuntimeService runtimeService, Task task) {
1035+
private void terminateTask(RuntimeService runtimeService, Task task, UUID customTaskId) {
10631036
try {
10641037
String taskDefinitionKey = task.getTaskDefinitionKey();
1065-
LOG.debug(
1066-
"Finding termination message for task {} with definition key '{}'",
1067-
task.getId(),
1068-
taskDefinitionKey);
1069-
1070-
// List all message event subscriptions for this process instance
1071-
List<Execution> allMessageExecutions =
1038+
int lastDot = taskDefinitionKey.lastIndexOf('.');
1039+
if (lastDot < 0) {
1040+
LOG.warn(
1041+
"Task definition key '{}' has no '.' separator; cannot derive termination message",
1042+
taskDefinitionKey);
1043+
return;
1044+
}
1045+
String messageName = taskDefinitionKey.substring(0, lastDot) + ".terminateProcess";
1046+
Execution execution =
10721047
runtimeService
10731048
.createExecutionQuery()
10741049
.processInstanceId(task.getProcessInstanceId())
1075-
.list();
1076-
1077-
LOG.debug(
1078-
"Found {} executions for process {}",
1079-
allMessageExecutions.size(),
1080-
task.getProcessInstanceId());
1081-
1082-
for (Execution exec : allMessageExecutions) {
1083-
if (exec.getActivityId() != null) {
1084-
LOG.debug("Execution {} has activity ID: {}", exec.getId(), exec.getActivityId());
1085-
}
1086-
}
1087-
1088-
// Get the BpmnModel to see what messages are available
1089-
BpmnModel model =
1090-
processEngine.getRepositoryService().getBpmnModel(task.getProcessDefinitionId());
1091-
1092-
LOG.debug("Available messages in model:");
1093-
for (Message msg : model.getMessages()) {
1094-
LOG.debug(" - Message ID: {}, Name: {}", msg.getId(), msg.getName());
1095-
}
1096-
1097-
// Extract the subprocess ID from the task definition key
1098-
// E.g., "ApproveGlossaryTerm_approvalTask" -> "ApproveGlossaryTerm"
1099-
String subProcessId =
1100-
taskDefinitionKey.contains("_")
1101-
? taskDefinitionKey.substring(0, taskDefinitionKey.lastIndexOf("_"))
1102-
: taskDefinitionKey;
1103-
1104-
LOG.debug(
1105-
"Extracted subprocess ID: '{}' from task key '{}'", subProcessId, taskDefinitionKey);
1106-
1107-
// Try both possible termination message patterns
1108-
// UserApprovalTask uses: subProcessId_terminateProcess
1109-
// ChangeReviewTask uses: subProcessId_terminateChangeReviewProcess
1110-
String[] messagePatterns = {
1111-
subProcessId + "_terminateProcess", subProcessId + "_terminateChangeReviewProcess"
1112-
};
1113-
1114-
for (String messageName : messagePatterns) {
1115-
LOG.debug("Checking for message subscription: {}", messageName);
1116-
List<Execution> executions =
1117-
runtimeService
1118-
.createExecutionQuery()
1119-
.processInstanceId(task.getProcessInstanceId())
1120-
.messageEventSubscriptionName(messageName)
1121-
.list();
1122-
if (!executions.isEmpty()) {
1123-
LOG.debug(
1124-
"Found {} executions with message subscription '{}'", executions.size(), messageName);
1125-
return messageName;
1126-
} else {
1127-
LOG.debug("No executions found for message: {}", messageName);
1128-
}
1050+
.messageEventSubscriptionName(messageName)
1051+
.singleResult();
1052+
if (execution != null) {
1053+
runtimeService.messageEventReceived(messageName, execution.getId());
1054+
LOG.debug("Terminated task {} using message '{}'", customTaskId, messageName);
1055+
} else {
1056+
LOG.warn(
1057+
"No termination message subscription '{}' found for task {} (definition key '{}')",
1058+
messageName,
1059+
task.getId(),
1060+
taskDefinitionKey);
11291061
}
1130-
1131-
LOG.warn(
1132-
"No termination message found for task {} with definition key '{}'",
1133-
task.getId(),
1134-
task.getTaskDefinitionKey());
1135-
return null;
11361062
} catch (Exception e) {
1137-
LOG.error("Error finding termination message for task {}", task.getId(), e);
1138-
return null;
1063+
LOG.error("Error terminating task {}", task.getId(), e);
11391064
}
11401065
}
11411066

@@ -1551,4 +1476,12 @@ public void terminateDuplicateInstances(
15511476
public RuntimeService getRuntimeService() {
15521477
return processEngine.getRuntimeService();
15531478
}
1479+
1480+
public ManagementService getManagementService() {
1481+
return processEngine.getManagementService();
1482+
}
1483+
1484+
public RepositoryService getRepositoryService() {
1485+
return processEngine.getRepositoryService();
1486+
}
15541487
}

openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,16 @@ private void updateBusinessKey(String processInstanceId) {
9595

9696
private void addWorkflowInstance(
9797
DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) {
98+
String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
99+
String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey);
100+
if (workflowDefinitionName.equals(processKey)) {
101+
LOG.debug(
102+
"[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping",
103+
execution.getProcessInstanceId(),
104+
processKey);
105+
return;
106+
}
98107
updateBusinessKey(execution.getProcessInstanceId());
99-
100-
String workflowDefinitionName =
101-
getMainWorkflowDefinitionNameFromTrigger(
102-
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()));
103108
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());
104109

105110
workflowInstanceRepository.addNewWorkflowInstance(
@@ -116,9 +121,15 @@ private void addWorkflowInstance(
116121

117122
private void updateWorkflowInstance(
118123
DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) {
119-
String workflowDefinitionName =
120-
getMainWorkflowDefinitionNameFromTrigger(
121-
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()));
124+
String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
125+
String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey);
126+
if (workflowDefinitionName.equals(processKey)) {
127+
LOG.debug(
128+
"[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping",
129+
execution.getProcessInstanceId(),
130+
processKey);
131+
return;
132+
}
122133
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());
123134

124135
// Capture all variables including any failure indicators

openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,18 +135,13 @@ private void addNewStage(
135135
String workflowDefinitionName =
136136
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
137137
String processInstanceId = execution.getProcessInstanceId();
138-
139-
// Check business key first - critical for stage tracking
140138
String businessKey = execution.getProcessInstanceBusinessKey();
141139
if (businessKey == null || businessKey.isEmpty()) {
142-
LOG.error(
143-
"[STAGE_MISSING_KEY] Workflow: {}, ProcessInstance: {} - Business key is missing for stage creation",
144-
workflowDefinitionName,
145-
processInstanceId);
146-
throw new IllegalStateException(
147-
String.format(
148-
"Business key is missing for stage creation in workflow: %s",
149-
workflowDefinitionName));
140+
LOG.warn(
141+
"[STAGE_SKIP] ProcessInstance: {} (workflow: {}) - no business key, not an OM-managed process instance",
142+
processInstanceId,
143+
workflowDefinitionName);
144+
return;
150145
}
151146
UUID workflowInstanceId = UUID.fromString(businessKey);
152147

0 commit comments

Comments
 (0)