diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java index 2ba3c12f184f..fa1ee6844449 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java @@ -31,6 +31,10 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.stream.Stream; +import org.awaitility.core.ConditionTimeoutException; +import org.flowable.engine.ManagementService; +import org.flowable.engine.RepositoryService; +import org.flowable.engine.repository.ProcessDefinition; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -122,6 +126,8 @@ import org.openmetadata.sdk.exceptions.OpenMetadataException; import org.openmetadata.sdk.network.HttpMethod; import org.openmetadata.sdk.network.RequestOptions; +import org.openmetadata.service.governance.workflows.WorkflowHandler; +import org.openmetadata.service.governance.workflows.elements.TriggerFactory; import org.openmetadata.service.resources.feeds.MessageParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1898,6 +1904,8 @@ private void safeDeleteWorkflow(OpenMetadataClient client, String workflowName) try { WorkflowDefinition wd = client.workflowDefinitions().getByName(workflowName, null); + waitForWorkflowIdle(wd.getFullyQualifiedName()); + // Force delete with hardDelete=true and recursive=true to clean up properly executeWithDeadlockRetryVoid( () -> { @@ -1950,6 +1958,48 @@ private void safeDeleteWorkflow(OpenMetadataClient client, String workflowName) } } + /** + * Waits until any async-after jobs left by PeriodicBatchEntityTrigger's fetch task have been + * picked up and executed. These jobs are ephemeral (complete in <100ms) but if the deployment + * is deleted while one is in flight the Flowable job executor NPEs when it tries to resolve the + * process definition. Returns immediately for event-based workflows that have no + * "<name>Trigger%" process definitions deployed, or for periodic-batch workflows whose + * trigger PDs currently have no pending jobs. + */ + private void waitForWorkflowIdle(String workflowFqn) { + RepositoryService repositoryService = WorkflowHandler.getInstance().getRepositoryService(); + String triggerWorkflowId = TriggerFactory.getTriggerWorkflowId(workflowFqn); + List triggerPDs = + repositoryService + .createProcessDefinitionQuery() + .processDefinitionKeyLike(triggerWorkflowId + "%") + .list() + .stream() + .filter(pd -> pd.getKey() != null && pd.getKey().startsWith(triggerWorkflowId)) + .toList(); + if (triggerPDs.isEmpty()) { + return; + } + ManagementService managementService = WorkflowHandler.getInstance().getManagementService(); + List triggerPDIds = triggerPDs.stream().map(ProcessDefinition::getId).toList(); + try { + await("async-after jobs for " + workflowFqn + " trigger PDs to drain") + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(100)) + .pollDelay(Duration.ZERO) + .ignoreExceptions() + .until( + () -> + triggerPDIds.stream() + .allMatch( + pdId -> + managementService.createJobQuery().processDefinitionId(pdId).count() + == 0)); + } catch (ConditionTimeoutException timeout) { + LOG.warn("waitForWorkflowIdle({}) timed out after 10s; proceeding with delete", workflowFqn); + } + } + // Helper method to register a workflow for cleanup private void trackWorkflow(String name, String id) { if (name != null && id != null) { @@ -2500,6 +2550,7 @@ void test_PeriodicBatchWorkflowEntityFiltering(TestNamespace ns, TestInfo test) // Cleanup - Use hardDelete to prevent duplicate key violations on retries if (workflowId != null) { try { + waitForWorkflowIdle(workflowName); Map params = new HashMap<>(); params.put("hardDelete", "true"); client.workflowDefinitions().delete(workflowId, params); @@ -2943,6 +2994,7 @@ void test_MultiEntityPeriodicQueryWithFilters(TestNamespace ns, TestInfo test) try { WorkflowDefinition wd = client.workflowDefinitions().getByName("MultiEntityPeriodicQuery", null); + waitForWorkflowIdle(wd.getFullyQualifiedName()); executeWithDeadlockRetryVoid( () -> { try { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java index 4c78fba0a389..0bc92af66066 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java @@ -18,8 +18,6 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.converter.BpmnXMLConverter; -import org.flowable.bpmn.model.BpmnModel; -import org.flowable.bpmn.model.Message; import org.flowable.common.engine.api.FlowableObjectNotFoundException; import org.flowable.common.engine.impl.el.DefaultExpressionManager; import org.flowable.engine.HistoryService; @@ -1027,115 +1025,42 @@ public void terminateTaskProcessInstance(UUID customTaskId, String reason) { .processVariableValueEquals("customTaskId", customTaskId.toString()) .list(); for (Task task : tasks) { - // Find the correct termination message for this task - String terminationMessageName = findTerminationMessageName(runtimeService, task); - if (terminationMessageName != null) { - Execution execution = - runtimeService - .createExecutionQuery() - .processInstanceId(task.getProcessInstanceId()) - .messageEventSubscriptionName(terminationMessageName) - .singleResult(); - if (execution != null) { - runtimeService.messageEventReceived(terminationMessageName, execution.getId()); - LOG.debug( - "Terminated task {} using message '{}'", customTaskId, terminationMessageName); - } else { - LOG.warn( - "No execution found for termination message '{}' for task {}", - terminationMessageName, - customTaskId); - } - } else { - LOG.warn("No termination message found for task {}", customTaskId); - } + terminateTask(runtimeService, task, customTaskId); } } catch (FlowableObjectNotFoundException ex) { LOG.debug("Flowable Task for Task ID {} not found.", customTaskId); } } - /** - * Find the termination message name for a task. - * Uses deterministic message names based on the subprocess ID. - */ - private String findTerminationMessageName(RuntimeService runtimeService, Task task) { + private void terminateTask(RuntimeService runtimeService, Task task, UUID customTaskId) { try { String taskDefinitionKey = task.getTaskDefinitionKey(); - LOG.debug( - "Finding termination message for task {} with definition key '{}'", - task.getId(), - taskDefinitionKey); - - // List all message event subscriptions for this process instance - List allMessageExecutions = + int lastDot = taskDefinitionKey.lastIndexOf('.'); + if (lastDot < 0) { + LOG.warn( + "Task definition key '{}' has no '.' separator; cannot derive termination message", + taskDefinitionKey); + return; + } + String messageName = taskDefinitionKey.substring(0, lastDot) + ".terminateProcess"; + Execution execution = runtimeService .createExecutionQuery() .processInstanceId(task.getProcessInstanceId()) - .list(); - - LOG.debug( - "Found {} executions for process {}", - allMessageExecutions.size(), - task.getProcessInstanceId()); - - for (Execution exec : allMessageExecutions) { - if (exec.getActivityId() != null) { - LOG.debug("Execution {} has activity ID: {}", exec.getId(), exec.getActivityId()); - } - } - - // Get the BpmnModel to see what messages are available - BpmnModel model = - processEngine.getRepositoryService().getBpmnModel(task.getProcessDefinitionId()); - - LOG.debug("Available messages in model:"); - for (Message msg : model.getMessages()) { - LOG.debug(" - Message ID: {}, Name: {}", msg.getId(), msg.getName()); - } - - // Extract the subprocess ID from the task definition key - // E.g., "ApproveGlossaryTerm_approvalTask" -> "ApproveGlossaryTerm" - String subProcessId = - taskDefinitionKey.contains("_") - ? taskDefinitionKey.substring(0, taskDefinitionKey.lastIndexOf("_")) - : taskDefinitionKey; - - LOG.debug( - "Extracted subprocess ID: '{}' from task key '{}'", subProcessId, taskDefinitionKey); - - // Try both possible termination message patterns - // UserApprovalTask uses: subProcessId_terminateProcess - // ChangeReviewTask uses: subProcessId_terminateChangeReviewProcess - String[] messagePatterns = { - subProcessId + "_terminateProcess", subProcessId + "_terminateChangeReviewProcess" - }; - - for (String messageName : messagePatterns) { - LOG.debug("Checking for message subscription: {}", messageName); - List executions = - runtimeService - .createExecutionQuery() - .processInstanceId(task.getProcessInstanceId()) - .messageEventSubscriptionName(messageName) - .list(); - if (!executions.isEmpty()) { - LOG.debug( - "Found {} executions with message subscription '{}'", executions.size(), messageName); - return messageName; - } else { - LOG.debug("No executions found for message: {}", messageName); - } + .messageEventSubscriptionName(messageName) + .singleResult(); + if (execution != null) { + runtimeService.messageEventReceived(messageName, execution.getId()); + LOG.debug("Terminated task {} using message '{}'", customTaskId, messageName); + } else { + LOG.warn( + "No termination message subscription '{}' found for task {} (definition key '{}')", + messageName, + task.getId(), + taskDefinitionKey); } - - LOG.warn( - "No termination message found for task {} with definition key '{}'", - task.getId(), - task.getTaskDefinitionKey()); - return null; } catch (Exception e) { - LOG.error("Error finding termination message for task {}", task.getId(), e); - return null; + LOG.error("Error terminating task {}", task.getId(), e); } } @@ -1551,4 +1476,12 @@ public void terminateDuplicateInstances( public RuntimeService getRuntimeService() { return processEngine.getRuntimeService(); } + + public ManagementService getManagementService() { + return processEngine.getManagementService(); + } + + public RepositoryService getRepositoryService() { + return processEngine.getRepositoryService(); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java index 2eaf691ae3bf..158215867995 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceListener.java @@ -95,11 +95,16 @@ private void updateBusinessKey(String processInstanceId) { private void addWorkflowInstance( DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) { + String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); + String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey); + if (workflowDefinitionName.equals(processKey)) { + LOG.debug( + "[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping", + execution.getProcessInstanceId(), + processKey); + return; + } updateBusinessKey(execution.getProcessInstanceId()); - - String workflowDefinitionName = - getMainWorkflowDefinitionNameFromTrigger( - getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())); UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); workflowInstanceRepository.addNewWorkflowInstance( @@ -116,9 +121,15 @@ private void addWorkflowInstance( private void updateWorkflowInstance( DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) { - String workflowDefinitionName = - getMainWorkflowDefinitionNameFromTrigger( - getProcessDefinitionKeyFromId(execution.getProcessDefinitionId())); + String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); + String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey); + if (workflowDefinitionName.equals(processKey)) { + LOG.debug( + "[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping", + execution.getProcessInstanceId(), + processKey); + return; + } UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); // Capture all variables including any failure indicators diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java index 4a87401e13dd..2d80b6430a18 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowInstanceStageListener.java @@ -135,18 +135,13 @@ private void addNewStage( String workflowDefinitionName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); String processInstanceId = execution.getProcessInstanceId(); - - // Check business key first - critical for stage tracking String businessKey = execution.getProcessInstanceBusinessKey(); if (businessKey == null || businessKey.isEmpty()) { - LOG.error( - "[STAGE_MISSING_KEY] Workflow: {}, ProcessInstance: {} - Business key is missing for stage creation", - workflowDefinitionName, - processInstanceId); - throw new IllegalStateException( - String.format( - "Business key is missing for stage creation in workflow: %s", - workflowDefinitionName)); + LOG.warn( + "[STAGE_SKIP] ProcessInstance: {} (workflow: {}) - no business key, not an OM-managed process instance", + processInstanceId, + workflowDefinitionName); + return; } UUID workflowInstanceId = UUID.fromString(businessKey);