From 7bccf9fa29ad4b936b690139a6954b1d65e36473 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Tue, 7 Apr 2026 11:52:07 +0530 Subject: [PATCH 1/5] Fix Flowable workflow termination message lookup and async-job race on deletion WorkflowHandler.findTerminationMessageName was splitting task definition keys on underscore and looking for messages named '_terminateProcess'. But Workflow.getFlowableElementId joins with a dot, so actual task keys are '.approvalTask' and termination messages are '.terminateProcess'. The bug has been latent since the original Custom Workflows commit - every conflict-resolution path silently failed to terminate the old subprocess, leaving it dangling until force-delete. Fix: split on the last dot, derive the subprocess ID, and check for the '.terminateProcess' subscription directly. Remove the dead 'terminateChangeReviewProcess' pattern and the debug scaffolding loops. Also add getManagementService/getRepositoryService getters to WorkflowHandler for use in integration tests. In WorkflowDefinitionResourceIT, replace waitForWorkflowIdle to check pending async jobs on the trigger process definition IDs instead of waiting for process instances to drain. Async-after jobs on the fetch task are ephemeral (<100ms) - waiting for them has near-zero cost, whereas the old approach waited for the entire batch loop (potentially minutes with many entities). Also remove the waitForWorkflowIdle call from cleanupCreatedWorkflows @AfterEach since it ran a query per workflow in every test unnecessarily. --- .../tests/WorkflowDefinitionResourceIT.java | 47 ++++++++ .../governance/workflows/WorkflowHandler.java | 102 +++++------------- 2 files changed, 75 insertions(+), 74 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java index 2ba3c12f184f..61f2df90e471 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java @@ -31,6 +31,10 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.stream.Stream; +import org.awaitility.core.ConditionTimeoutException; +import org.flowable.engine.ManagementService; +import org.flowable.engine.RepositoryService; +import org.flowable.engine.repository.ProcessDefinition; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -122,6 +126,7 @@ import org.openmetadata.sdk.exceptions.OpenMetadataException; import org.openmetadata.sdk.network.HttpMethod; import org.openmetadata.sdk.network.RequestOptions; +import org.openmetadata.service.governance.workflows.WorkflowHandler; import org.openmetadata.service.resources.feeds.MessageParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1898,6 +1903,8 @@ private void safeDeleteWorkflow(OpenMetadataClient client, String workflowName) try { WorkflowDefinition wd = client.workflowDefinitions().getByName(workflowName, null); + waitForWorkflowIdle(workflowName); + // Force delete with hardDelete=true and recursive=true to clean up properly executeWithDeadlockRetryVoid( () -> { @@ -1950,6 +1957,44 @@ private void safeDeleteWorkflow(OpenMetadataClient client, String workflowName) } } + /** + * Waits until any async-after jobs left by PeriodicBatchEntityTrigger's fetch task have been + * picked up and executed. These jobs are ephemeral (complete in <100ms) but if the deployment + * is deleted while one is in flight the Flowable job executor NPEs when it tries to resolve the + * process definition. Returns immediately for event-based workflows that have no + * "<name>Trigger-*" process definitions deployed, or for periodic-batch workflows whose + * trigger PDs currently have no pending jobs. + */ + private void waitForWorkflowIdle(String workflowName) { + RepositoryService repositoryService = WorkflowHandler.getInstance().getRepositoryService(); + List triggerPDs = + repositoryService + .createProcessDefinitionQuery() + .processDefinitionKeyLike(workflowName + "Trigger%") + .list(); + if (triggerPDs.isEmpty()) { + return; + } + ManagementService managementService = WorkflowHandler.getInstance().getManagementService(); + List triggerPDIds = triggerPDs.stream().map(ProcessDefinition::getId).toList(); + try { + await("async-after jobs for " + workflowName + " trigger PDs to drain") + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(100)) + .pollDelay(Duration.ZERO) + .ignoreExceptions() + .until( + () -> + triggerPDIds.stream() + .allMatch( + pdId -> + managementService.createJobQuery().processDefinitionId(pdId).count() + == 0)); + } catch (ConditionTimeoutException timeout) { + LOG.warn("waitForWorkflowIdle({}) timed out after 10s; proceeding with delete", workflowName); + } + } + // Helper method to register a workflow for cleanup private void trackWorkflow(String name, String id) { if (name != null && id != null) { @@ -2500,6 +2545,7 @@ void test_PeriodicBatchWorkflowEntityFiltering(TestNamespace ns, TestInfo test) // Cleanup - Use hardDelete to prevent duplicate key violations on retries if (workflowId != null) { try { + waitForWorkflowIdle(workflowName); Map params = new HashMap<>(); params.put("hardDelete", "true"); client.workflowDefinitions().delete(workflowId, params); @@ -2943,6 +2989,7 @@ void test_MultiEntityPeriodicQueryWithFilters(TestNamespace ns, TestInfo test) try { WorkflowDefinition wd = client.workflowDefinitions().getByName("MultiEntityPeriodicQuery", null); + waitForWorkflowIdle("MultiEntityPeriodicQuery"); executeWithDeadlockRetryVoid( () -> { try { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java index 4c78fba0a389..81e262643c1b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java @@ -18,8 +18,6 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.converter.BpmnXMLConverter; -import org.flowable.bpmn.model.BpmnModel; -import org.flowable.bpmn.model.Message; import org.flowable.common.engine.api.FlowableObjectNotFoundException; import org.flowable.common.engine.impl.el.DefaultExpressionManager; import org.flowable.engine.HistoryService; @@ -1046,8 +1044,6 @@ public void terminateTaskProcessInstance(UUID customTaskId, String reason) { terminationMessageName, customTaskId); } - } else { - LOG.warn("No termination message found for task {}", customTaskId); } } } catch (FlowableObjectNotFoundException ex) { @@ -1055,83 +1051,33 @@ public void terminateTaskProcessInstance(UUID customTaskId, String reason) { } } - /** - * Find the termination message name for a task. - * Uses deterministic message names based on the subprocess ID. - */ private String findTerminationMessageName(RuntimeService runtimeService, Task task) { try { String taskDefinitionKey = task.getTaskDefinitionKey(); - LOG.debug( - "Finding termination message for task {} with definition key '{}'", - task.getId(), - taskDefinitionKey); - - // List all message event subscriptions for this process instance - List allMessageExecutions = - runtimeService - .createExecutionQuery() - .processInstanceId(task.getProcessInstanceId()) - .list(); - - LOG.debug( - "Found {} executions for process {}", - allMessageExecutions.size(), - task.getProcessInstanceId()); - - for (Execution exec : allMessageExecutions) { - if (exec.getActivityId() != null) { - LOG.debug("Execution {} has activity ID: {}", exec.getId(), exec.getActivityId()); - } - } - - // Get the BpmnModel to see what messages are available - BpmnModel model = - processEngine.getRepositoryService().getBpmnModel(task.getProcessDefinitionId()); - - LOG.debug("Available messages in model:"); - for (Message msg : model.getMessages()) { - LOG.debug(" - Message ID: {}, Name: {}", msg.getId(), msg.getName()); + int lastDot = taskDefinitionKey.lastIndexOf('.'); + if (lastDot < 0) { + LOG.warn( + "Task definition key '{}' has no '.' separator; cannot derive termination message", + taskDefinitionKey); + return null; } - - // Extract the subprocess ID from the task definition key - // E.g., "ApproveGlossaryTerm_approvalTask" -> "ApproveGlossaryTerm" - String subProcessId = - taskDefinitionKey.contains("_") - ? taskDefinitionKey.substring(0, taskDefinitionKey.lastIndexOf("_")) - : taskDefinitionKey; - - LOG.debug( - "Extracted subprocess ID: '{}' from task key '{}'", subProcessId, taskDefinitionKey); - - // Try both possible termination message patterns - // UserApprovalTask uses: subProcessId_terminateProcess - // ChangeReviewTask uses: subProcessId_terminateChangeReviewProcess - String[] messagePatterns = { - subProcessId + "_terminateProcess", subProcessId + "_terminateChangeReviewProcess" - }; - - for (String messageName : messagePatterns) { - LOG.debug("Checking for message subscription: {}", messageName); - List executions = - runtimeService - .createExecutionQuery() - .processInstanceId(task.getProcessInstanceId()) - .messageEventSubscriptionName(messageName) - .list(); - if (!executions.isEmpty()) { - LOG.debug( - "Found {} executions with message subscription '{}'", executions.size(), messageName); - return messageName; - } else { - LOG.debug("No executions found for message: {}", messageName); - } + String subProcessId = taskDefinitionKey.substring(0, lastDot); + String messageName = subProcessId + ".terminateProcess"; + boolean hasSubscription = + runtimeService + .createExecutionQuery() + .processInstanceId(task.getProcessInstanceId()) + .messageEventSubscriptionName(messageName) + .count() + > 0; + if (hasSubscription) { + return messageName; } - LOG.warn( - "No termination message found for task {} with definition key '{}'", + "No termination message subscription '{}' found for task {} (definition key '{}')", + messageName, task.getId(), - task.getTaskDefinitionKey()); + taskDefinitionKey); return null; } catch (Exception e) { LOG.error("Error finding termination message for task {}", task.getId(), e); @@ -1551,4 +1497,12 @@ public void terminateDuplicateInstances( public RuntimeService getRuntimeService() { return processEngine.getRuntimeService(); } + + public ManagementService getManagementService() { + return processEngine.getManagementService(); + } + + public RepositoryService getRepositoryService() { + return processEngine.getRepositoryService(); + } } From f182537720ae83686b53a689f8bbc4a6ddbdb6ce Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Tue, 7 Apr 2026 19:08:27 +0530 Subject: [PATCH 2/5] Fix spurious EntityNotFoundException errors from orphaned Flowable processes WorkflowInstanceListener and WorkflowInstanceStageListener were firing on Flowable process instances with numeric keys (e.g. 239376, 239373) that have no corresponding OM workflow definition, causing EntityNotFoundException noise in AUT runs. - WorkflowInstanceListener: skip addWorkflowInstance/updateWorkflowInstance when getMainWorkflowDefinitionNameFromTrigger returns the raw process key unchanged (no 'Trigger' suffix to strip = not an OM trigger workflow) - WorkflowInstanceStageListener: skip addNewStage/updateStage when the process key is purely numeric (isUnknownFlowableProcess guard) - WorkflowDefinitionResourceIT: fix waitForWorkflowIdle to use TriggerFactory.getTriggerWorkflowId(fqn) instead of name+"Trigger%" so it correctly queries by FQN-based process definition key --- .../tests/WorkflowDefinitionResourceIT.java | 11 ++++---- .../workflows/WorkflowInstanceListener.java | 25 +++++++++++++------ .../WorkflowInstanceStageListener.java | 18 +++++++++++++ 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java index 61f2df90e471..a60111e633fa 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java @@ -127,6 +127,7 @@ import org.openmetadata.sdk.network.HttpMethod; import org.openmetadata.sdk.network.RequestOptions; import org.openmetadata.service.governance.workflows.WorkflowHandler; +import org.openmetadata.service.governance.workflows.elements.TriggerFactory; import org.openmetadata.service.resources.feeds.MessageParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1903,7 +1904,7 @@ private void safeDeleteWorkflow(OpenMetadataClient client, String workflowName) try { WorkflowDefinition wd = client.workflowDefinitions().getByName(workflowName, null); - waitForWorkflowIdle(workflowName); + waitForWorkflowIdle(wd.getFullyQualifiedName()); // Force delete with hardDelete=true and recursive=true to clean up properly executeWithDeadlockRetryVoid( @@ -1965,12 +1966,12 @@ private void safeDeleteWorkflow(OpenMetadataClient client, String workflowName) * "<name>Trigger-*" process definitions deployed, or for periodic-batch workflows whose * trigger PDs currently have no pending jobs. */ - private void waitForWorkflowIdle(String workflowName) { + private void waitForWorkflowIdle(String workflowFqn) { RepositoryService repositoryService = WorkflowHandler.getInstance().getRepositoryService(); List triggerPDs = repositoryService .createProcessDefinitionQuery() - .processDefinitionKeyLike(workflowName + "Trigger%") + .processDefinitionKeyLike(TriggerFactory.getTriggerWorkflowId(workflowFqn) + "%") .list(); if (triggerPDs.isEmpty()) { return; @@ -1991,7 +1992,7 @@ private void waitForWorkflowIdle(String workflowName) { managementService.createJobQuery().processDefinitionId(pdId).count() == 0)); } catch (ConditionTimeoutException timeout) { - LOG.warn("waitForWorkflowIdle({}) timed out after 10s; proceeding with delete", workflowName); + LOG.warn("waitForWorkflowIdle({}) timed out after 10s; proceeding with delete", workflowFqn); } } @@ -2989,7 +2990,7 @@ void test_MultiEntityPeriodicQueryWithFilters(TestNamespace ns, TestInfo test) try { WorkflowDefinition wd = client.workflowDefinitions().getByName("MultiEntityPeriodicQuery", null); - waitForWorkflowIdle("MultiEntityPeriodicQuery"); + waitForWorkflowIdle(wd.getFullyQualifiedName()); executeWithDeadlockRetryVoid( () -> { try { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java index 2eaf691ae3bf..158215867995 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java @@ -95,11 +95,16 @@ private void updateBusinessKey(String processInstanceId) { private void addWorkflowInstance( DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) { + String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); + String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey); + if (workflowDefinitionName.equals(processKey)) { + LOG.debug( + "[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping", + execution.getProcessInstanceId(), + processKey); + return; + } updateBusinessKey(execution.getProcessInstanceId()); - - String workflowDefinitionName = - getMainWorkflowDefinitionNameFromTrigger( - getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())); UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); workflowInstanceRepository.addNewWorkflowInstance( @@ -116,9 +121,15 @@ private void addWorkflowInstance( private void updateWorkflowInstance( DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) { - String workflowDefinitionName = - getMainWorkflowDefinitionNameFromTrigger( - getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())); + String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); + String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey); + if (workflowDefinitionName.equals(processKey)) { + LOG.debug( + "[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping", + execution.getProcessInstanceId(), + processKey); + return; + } UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); // Capture all variables including any failure indicators diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java index 4a87401e13dd..b141a5db3fd3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java @@ -135,6 +135,13 @@ private void addNewStage( String workflowDefinitionName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); String processInstanceId = execution.getProcessInstanceId(); + if (isUnknownFlowableProcess(workflowDefinitionName)) { + LOG.debug( + "[STAGE_SKIP] ProcessInstance: {} - process key '{}' is not an OM workflow, skipping stage tracking", + processInstanceId, + workflowDefinitionName); + return; + } // Check business key first - critical for stage tracking String businessKey = execution.getProcessInstanceBusinessKey(); @@ -195,6 +202,13 @@ private void updateStage( String workflowDefinitionName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); String processInstanceId = execution.getProcessInstanceId(); + if (isUnknownFlowableProcess(workflowDefinitionName)) { + LOG.debug( + "[STAGE_SKIP] ProcessInstance: {} - process key '{}' is not an OM workflow, skipping stage tracking", + processInstanceId, + workflowDefinitionName); + return; + } String stage = Optional.ofNullable(execution.getCurrentActivityId()).orElse(workflowDefinitionName); @@ -220,4 +234,8 @@ private void updateStage( stage, workflowInstanceStateId); } + + private boolean isUnknownFlowableProcess(String processKey) { + return processKey.matches("\\d+"); + } } From 6f09caa20c7a39770d7478e847bc26191252f034 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Tue, 7 Apr 2026 19:34:16 +0530 Subject: [PATCH 3/5] Fix compilation issues --- .../org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java index a60111e633fa..3a6c70444223 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java @@ -1979,7 +1979,7 @@ private void waitForWorkflowIdle(String workflowFqn) { ManagementService managementService = WorkflowHandler.getInstance().getManagementService(); List triggerPDIds = triggerPDs.stream().map(ProcessDefinition::getId).toList(); try { - await("async-after jobs for " + workflowName + " trigger PDs to drain") + await("async-after jobs for " + workflowFqn + " trigger PDs to drain") .atMost(Duration.ofSeconds(10)) .pollInterval(Duration.ofMillis(100)) .pollDelay(Duration.ZERO) From 8f44e69d6bab34b5983aa0b6bef823b58d18438f Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Tue, 7 Apr 2026 21:49:52 +0530 Subject: [PATCH 4/5] Address Copilot review feedback on PR #27117 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - WorkflowInstanceStageListener: replace fragile numeric-key guard with businessKey-based guard — skip stage tracking when no business key is present (non-OM-managed process instance) instead of using processKey format heuristic that could misfire on numeric-named workflows - WorkflowDefinitionResourceIT: fix Javadoc comment ("Trigger-*" -> "Trigger%") and add startsWith filter after processDefinitionKeyLike query to prevent SQL LIKE wildcard '_' from matching unrelated PDs - WorkflowHandler: eliminate double Flowable round-trip in termination logic by merging findTerminationMessageName into terminateTask helper that does a single singleResult() query per task --- .../tests/WorkflowDefinitionResourceIT.java | 10 +++- .../governance/workflows/WorkflowHandler.java | 59 ++++++------------- .../WorkflowInstanceStageListener.java | 28 +-------- 3 files changed, 29 insertions(+), 68 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java index 3a6c70444223..fa1ee6844449 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java @@ -1963,16 +1963,20 @@ private void safeDeleteWorkflow(OpenMetadataClient client, String workflowName) * picked up and executed. These jobs are ephemeral (complete in <100ms) but if the deployment * is deleted while one is in flight the Flowable job executor NPEs when it tries to resolve the * process definition. Returns immediately for event-based workflows that have no - * "<name>Trigger-*" process definitions deployed, or for periodic-batch workflows whose + * "<name>Trigger%" process definitions deployed, or for periodic-batch workflows whose * trigger PDs currently have no pending jobs. */ private void waitForWorkflowIdle(String workflowFqn) { RepositoryService repositoryService = WorkflowHandler.getInstance().getRepositoryService(); + String triggerWorkflowId = TriggerFactory.getTriggerWorkflowId(workflowFqn); List triggerPDs = repositoryService .createProcessDefinitionQuery() - .processDefinitionKeyLike(TriggerFactory.getTriggerWorkflowId(workflowFqn) + "%") - .list(); + .processDefinitionKeyLike(triggerWorkflowId + "%") + .list() + .stream() + .filter(pd -> pd.getKey() != null && pd.getKey().startsWith(triggerWorkflowId)) + .toList(); if (triggerPDs.isEmpty()) { return; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java index 81e262643c1b..0bc92af66066 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java @@ -1025,33 +1025,14 @@ public void terminateTaskProcessInstance(UUID customTaskId, String reason) { .processVariableValueEquals("customTaskId", customTaskId.toString()) .list(); for (Task task : tasks) { - // Find the correct termination message for this task - String terminationMessageName = findTerminationMessageName(runtimeService, task); - if (terminationMessageName != null) { - Execution execution = - runtimeService - .createExecutionQuery() - .processInstanceId(task.getProcessInstanceId()) - .messageEventSubscriptionName(terminationMessageName) - .singleResult(); - if (execution != null) { - runtimeService.messageEventReceived(terminationMessageName, execution.getId()); - LOG.debug( - "Terminated task {} using message '{}'", customTaskId, terminationMessageName); - } else { - LOG.warn( - "No execution found for termination message '{}' for task {}", - terminationMessageName, - customTaskId); - } - } + terminateTask(runtimeService, task, customTaskId); } } catch (FlowableObjectNotFoundException ex) { LOG.debug("Flowable Task for Task ID {} not found.", customTaskId); } } - private String findTerminationMessageName(RuntimeService runtimeService, Task task) { + private void terminateTask(RuntimeService runtimeService, Task task, UUID customTaskId) { try { String taskDefinitionKey = task.getTaskDefinitionKey(); int lastDot = taskDefinitionKey.lastIndexOf('.'); @@ -1059,29 +1040,27 @@ private String findTerminationMessageName(RuntimeService runtimeService, Task ta LOG.warn( "Task definition key '{}' has no '.' separator; cannot derive termination message", taskDefinitionKey); - return null; + return; } - String subProcessId = taskDefinitionKey.substring(0, lastDot); - String messageName = subProcessId + ".terminateProcess"; - boolean hasSubscription = + String messageName = taskDefinitionKey.substring(0, lastDot) + ".terminateProcess"; + Execution execution = runtimeService - .createExecutionQuery() - .processInstanceId(task.getProcessInstanceId()) - .messageEventSubscriptionName(messageName) - .count() - > 0; - if (hasSubscription) { - return messageName; + .createExecutionQuery() + .processInstanceId(task.getProcessInstanceId()) + .messageEventSubscriptionName(messageName) + .singleResult(); + if (execution != null) { + runtimeService.messageEventReceived(messageName, execution.getId()); + LOG.debug("Terminated task {} using message '{}'", customTaskId, messageName); + } else { + LOG.warn( + "No termination message subscription '{}' found for task {} (definition key '{}')", + messageName, + task.getId(), + taskDefinitionKey); } - LOG.warn( - "No termination message subscription '{}' found for task {} (definition key '{}')", - messageName, - task.getId(), - taskDefinitionKey); - return null; } catch (Exception e) { - LOG.error("Error finding termination message for task {}", task.getId(), e); - return null; + LOG.error("Error terminating task {}", task.getId(), e); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java index b141a5db3fd3..78c76bd85996 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java @@ -135,26 +135,14 @@ private void addNewStage( String workflowDefinitionName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); String processInstanceId = execution.getProcessInstanceId(); - if (isUnknownFlowableProcess(workflowDefinitionName)) { + String businessKey = execution.getProcessInstanceBusinessKey(); + if (businessKey == null || businessKey.isEmpty()) { LOG.debug( - "[STAGE_SKIP] ProcessInstance: {} - process key '{}' is not an OM workflow, skipping stage tracking", + "[STAGE_SKIP] ProcessInstance: {} (workflow: {}) - no business key, not an OM-managed process instance", processInstanceId, workflowDefinitionName); return; } - - // Check business key first - critical for stage tracking - String businessKey = execution.getProcessInstanceBusinessKey(); - if (businessKey == null || businessKey.isEmpty()) { - LOG.error( - "[STAGE_MISSING_KEY] Workflow: {}, ProcessInstance: {} - Business key is missing for stage creation", - workflowDefinitionName, - processInstanceId); - throw new IllegalStateException( - String.format( - "Business key is missing for stage creation in workflow: %s", - workflowDefinitionName)); - } UUID workflowInstanceId = UUID.fromString(businessKey); // Get or create workflow instance execution ID @@ -202,13 +190,6 @@ private void updateStage( String workflowDefinitionName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); String processInstanceId = execution.getProcessInstanceId(); - if (isUnknownFlowableProcess(workflowDefinitionName)) { - LOG.debug( - "[STAGE_SKIP] ProcessInstance: {} - process key '{}' is not an OM workflow, skipping stage tracking", - processInstanceId, - workflowDefinitionName); - return; - } String stage = Optional.ofNullable(execution.getCurrentActivityId()).orElse(workflowDefinitionName); @@ -235,7 +216,4 @@ private void updateStage( workflowInstanceStateId); } - private boolean isUnknownFlowableProcess(String processKey) { - return processKey.matches("\\d+"); - } } From b8f714213c35bdceb12fb1c178ef119e21945775 Mon Sep 17 00:00:00 2001 From: Gitar Date: Tue, 7 Apr 2026 16:44:11 +0000 Subject: [PATCH 5/5] fix: change DEBUG to WARN for null businessKey skip in WorkflowInstanceStageListener Co-authored-by: Ram Narayan Balaji <81347100+yan-3005@users.noreply.github.com> --- .../governance/workflows/WorkflowInstanceStageListener.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java index 78c76bd85996..2d80b6430a18 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java @@ -137,7 +137,7 @@ private void addNewStage( String processInstanceId = execution.getProcessInstanceId(); String businessKey = execution.getProcessInstanceBusinessKey(); if (businessKey == null || businessKey.isEmpty()) { - LOG.debug( + LOG.warn( "[STAGE_SKIP] ProcessInstance: {} (workflow: {}) - no business key, not an OM-managed process instance", processInstanceId, workflowDefinitionName); @@ -215,5 +215,4 @@ private void updateStage( stage, workflowInstanceStateId); } - }