Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.awaitility.core.ConditionTimeoutException;
import org.flowable.engine.ManagementService;
import org.flowable.engine.RepositoryService;
import org.flowable.engine.repository.ProcessDefinition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
Expand Down Expand Up @@ -122,6 +126,8 @@
import org.openmetadata.sdk.exceptions.OpenMetadataException;
import org.openmetadata.sdk.network.HttpMethod;
import org.openmetadata.sdk.network.RequestOptions;
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.governance.workflows.elements.TriggerFactory;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1898,6 +1904,8 @@ private void safeDeleteWorkflow(OpenMetadataClient client, String workflowName)
try {
WorkflowDefinition wd = client.workflowDefinitions().getByName(workflowName, null);

waitForWorkflowIdle(wd.getFullyQualifiedName());

// Force delete with hardDelete=true and recursive=true to clean up properly
executeWithDeadlockRetryVoid(
() -> {
Expand Down Expand Up @@ -1950,6 +1958,48 @@ private void safeDeleteWorkflow(OpenMetadataClient client, String workflowName)
}
}

/**
* Waits until any async-after jobs left by PeriodicBatchEntityTrigger's fetch task have been
* picked up and executed. These jobs are ephemeral (complete in <100ms) but if the deployment
* is deleted while one is in flight the Flowable job executor NPEs when it tries to resolve the
* process definition. Returns immediately for event-based workflows that have no
* "<name>Trigger%" process definitions deployed, or for periodic-batch workflows whose
* trigger PDs currently have no pending jobs.
*/
private void waitForWorkflowIdle(String workflowFqn) {
RepositoryService repositoryService = WorkflowHandler.getInstance().getRepositoryService();
String triggerWorkflowId = TriggerFactory.getTriggerWorkflowId(workflowFqn);
List<ProcessDefinition> triggerPDs =
repositoryService
.createProcessDefinitionQuery()
.processDefinitionKeyLike(triggerWorkflowId + "%")
.list()
.stream()
.filter(pd -> pd.getKey() != null && pd.getKey().startsWith(triggerWorkflowId))
.toList();
if (triggerPDs.isEmpty()) {
return;
}
ManagementService managementService = WorkflowHandler.getInstance().getManagementService();
List<String> triggerPDIds = triggerPDs.stream().map(ProcessDefinition::getId).toList();
try {
await("async-after jobs for " + workflowFqn + " trigger PDs to drain")
.atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofMillis(100))
.pollDelay(Duration.ZERO)
.ignoreExceptions()
.until(
() ->
triggerPDIds.stream()
.allMatch(
pdId ->
managementService.createJobQuery().processDefinitionId(pdId).count()
== 0));
} catch (ConditionTimeoutException timeout) {
LOG.warn("waitForWorkflowIdle({}) timed out after 10s; proceeding with delete", workflowFqn);
}
}

// Helper method to register a workflow for cleanup
private void trackWorkflow(String name, String id) {
if (name != null && id != null) {
Expand Down Expand Up @@ -2500,6 +2550,7 @@ void test_PeriodicBatchWorkflowEntityFiltering(TestNamespace ns, TestInfo test)
// Cleanup - Use hardDelete to prevent duplicate key violations on retries
if (workflowId != null) {
try {
waitForWorkflowIdle(workflowName);
Map<String, String> params = new HashMap<>();
params.put("hardDelete", "true");
client.workflowDefinitions().delete(workflowId, params);
Expand Down Expand Up @@ -2943,6 +2994,7 @@ void test_MultiEntityPeriodicQueryWithFilters(TestNamespace ns, TestInfo test)
try {
WorkflowDefinition wd =
client.workflowDefinitions().getByName("MultiEntityPeriodicQuery", null);
waitForWorkflowIdle(wd.getFullyQualifiedName());
executeWithDeadlockRetryVoid(
() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.converter.BpmnXMLConverter;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.Message;
import org.flowable.common.engine.api.FlowableObjectNotFoundException;
import org.flowable.common.engine.impl.el.DefaultExpressionManager;
import org.flowable.engine.HistoryService;
Expand Down Expand Up @@ -1027,115 +1025,42 @@ public void terminateTaskProcessInstance(UUID customTaskId, String reason) {
.processVariableValueEquals("customTaskId", customTaskId.toString())
.list();
for (Task task : tasks) {
// Find the correct termination message for this task
String terminationMessageName = findTerminationMessageName(runtimeService, task);
if (terminationMessageName != null) {
Execution execution =
runtimeService
.createExecutionQuery()
.processInstanceId(task.getProcessInstanceId())
.messageEventSubscriptionName(terminationMessageName)
.singleResult();
if (execution != null) {
runtimeService.messageEventReceived(terminationMessageName, execution.getId());
LOG.debug(
"Terminated task {} using message '{}'", customTaskId, terminationMessageName);
} else {
LOG.warn(
"No execution found for termination message '{}' for task {}",
terminationMessageName,
customTaskId);
}
} else {
LOG.warn("No termination message found for task {}", customTaskId);
}
terminateTask(runtimeService, task, customTaskId);
}
} catch (FlowableObjectNotFoundException ex) {
LOG.debug("Flowable Task for Task ID {} not found.", customTaskId);
}
}

/**
* Find the termination message name for a task.
* Uses deterministic message names based on the subprocess ID.
*/
private String findTerminationMessageName(RuntimeService runtimeService, Task task) {
private void terminateTask(RuntimeService runtimeService, Task task, UUID customTaskId) {
try {
String taskDefinitionKey = task.getTaskDefinitionKey();
LOG.debug(
"Finding termination message for task {} with definition key '{}'",
task.getId(),
taskDefinitionKey);

// List all message event subscriptions for this process instance
List<Execution> allMessageExecutions =
int lastDot = taskDefinitionKey.lastIndexOf('.');
if (lastDot < 0) {
LOG.warn(
"Task definition key '{}' has no '.' separator; cannot derive termination message",
taskDefinitionKey);
return;
}
String messageName = taskDefinitionKey.substring(0, lastDot) + ".terminateProcess";
Execution execution =
runtimeService
.createExecutionQuery()
.processInstanceId(task.getProcessInstanceId())
.list();

LOG.debug(
"Found {} executions for process {}",
allMessageExecutions.size(),
task.getProcessInstanceId());

for (Execution exec : allMessageExecutions) {
if (exec.getActivityId() != null) {
LOG.debug("Execution {} has activity ID: {}", exec.getId(), exec.getActivityId());
}
}

// Get the BpmnModel to see what messages are available
BpmnModel model =
processEngine.getRepositoryService().getBpmnModel(task.getProcessDefinitionId());

LOG.debug("Available messages in model:");
for (Message msg : model.getMessages()) {
LOG.debug(" - Message ID: {}, Name: {}", msg.getId(), msg.getName());
}

// Extract the subprocess ID from the task definition key
// E.g., "ApproveGlossaryTerm_approvalTask" -> "ApproveGlossaryTerm"
String subProcessId =
taskDefinitionKey.contains("_")
? taskDefinitionKey.substring(0, taskDefinitionKey.lastIndexOf("_"))
: taskDefinitionKey;

LOG.debug(
"Extracted subprocess ID: '{}' from task key '{}'", subProcessId, taskDefinitionKey);

// Try both possible termination message patterns
// UserApprovalTask uses: subProcessId_terminateProcess
// ChangeReviewTask uses: subProcessId_terminateChangeReviewProcess
String[] messagePatterns = {
subProcessId + "_terminateProcess", subProcessId + "_terminateChangeReviewProcess"
};

for (String messageName : messagePatterns) {
LOG.debug("Checking for message subscription: {}", messageName);
List<Execution> executions =
runtimeService
.createExecutionQuery()
.processInstanceId(task.getProcessInstanceId())
.messageEventSubscriptionName(messageName)
.list();
if (!executions.isEmpty()) {
LOG.debug(
"Found {} executions with message subscription '{}'", executions.size(), messageName);
return messageName;
} else {
LOG.debug("No executions found for message: {}", messageName);
}
.messageEventSubscriptionName(messageName)
.singleResult();
if (execution != null) {
runtimeService.messageEventReceived(messageName, execution.getId());
LOG.debug("Terminated task {} using message '{}'", customTaskId, messageName);
} else {
LOG.warn(
"No termination message subscription '{}' found for task {} (definition key '{}')",
messageName,
task.getId(),
taskDefinitionKey);
}

LOG.warn(
"No termination message found for task {} with definition key '{}'",
task.getId(),
task.getTaskDefinitionKey());
return null;
} catch (Exception e) {
LOG.error("Error finding termination message for task {}", task.getId(), e);
return null;
LOG.error("Error terminating task {}", task.getId(), e);
}
}

Expand Down Expand Up @@ -1551,4 +1476,12 @@ public void terminateDuplicateInstances(
public RuntimeService getRuntimeService() {
return processEngine.getRuntimeService();
}

public ManagementService getManagementService() {
return processEngine.getManagementService();
}

public RepositoryService getRepositoryService() {
return processEngine.getRepositoryService();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,16 @@ private void updateBusinessKey(String processInstanceId) {

private void addWorkflowInstance(
DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) {
String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey);
if (workflowDefinitionName.equals(processKey)) {
LOG.debug(
"[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping",
execution.getProcessInstanceId(),
processKey);
return;
}
updateBusinessKey(execution.getProcessInstanceId());

String workflowDefinitionName =
getMainWorkflowDefinitionNameFromTrigger(
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()));
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());

workflowInstanceRepository.addNewWorkflowInstance(
Expand All @@ -116,9 +121,15 @@ private void addWorkflowInstance(

private void updateWorkflowInstance(
DelegateExecution execution, WorkflowInstanceRepository workflowInstanceRepository) {
String workflowDefinitionName =
getMainWorkflowDefinitionNameFromTrigger(
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()));
String processKey = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String workflowDefinitionName = getMainWorkflowDefinitionNameFromTrigger(processKey);
if (workflowDefinitionName.equals(processKey)) {
LOG.debug(
"[WORKFLOW_INSTANCE_SKIP] ProcessInstance: {} - process key '{}' is not an OM trigger workflow, skipping",
execution.getProcessInstanceId(),
processKey);
return;
}
UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey());

// Capture all variables including any failure indicators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,13 @@ private void addNewStage(
String workflowDefinitionName =
getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String processInstanceId = execution.getProcessInstanceId();

// Check business key first - critical for stage tracking
String businessKey = execution.getProcessInstanceBusinessKey();
if (businessKey == null || businessKey.isEmpty()) {
LOG.error(
"[STAGE_MISSING_KEY] Workflow: {}, ProcessInstance: {} - Business key is missing for stage creation",
workflowDefinitionName,
processInstanceId);
throw new IllegalStateException(
String.format(
"Business key is missing for stage creation in workflow: %s",
workflowDefinitionName));
LOG.warn(
"[STAGE_SKIP] ProcessInstance: {} (workflow: {}) - no business key, not an OM-managed process instance",
processInstanceId,
workflowDefinitionName);
return;
}
UUID workflowInstanceId = UUID.fromString(businessKey);

Expand Down
Loading