Skip to content

Commit 9e17f90

Browse files
yan-3005gitar-bot
andauthored
fix: resolve governance workflow deadlock and remove WorkflowTransactionManager (#26452)
* fix: resolve governance workflow deadlock and remove WorkflowTransactionManager * chore: apply changes Co-authored-by: yan-3005 <yan-3005@users.noreply.github.com> * Comments: 1. WorkflowDefinitionResource.java: Removed RETRY_CONFIG, RETRY, isTransientDatabaseError, and the 4 dead imports (Retry, RetryConfig, Duration, ExceptionUtils). 2. CollectionDAO.java: Removed LIMIT 1000 from listAllStatesForInstance query. --------- Co-authored-by: Gitar <noreply@gitar.ai> Co-authored-by: yan-3005 <yan-3005@users.noreply.github.com>
1 parent f6599b2 commit 9e17f90

7 files changed

Lines changed: 105 additions & 472 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1474,44 +1474,65 @@ public void terminateDuplicateInstances(
14741474
return;
14751475
}
14761476

1477+
// Terminate Flowable process instances OUTSIDE any JDBI transaction.
1478+
// Calling runtimeService.deleteProcessInstance() inside a JDBI transaction causes a race
1479+
// condition: the uncommitted DELETE on ACT_RU_EXECUTION holds an X-lock, Flowable's async
1480+
// job executor concurrently tries to INSERT a timer job referencing that execution (FK
1481+
// S-lock wait), and when the JDBI tx commits the execution is gone, so the timer INSERT
1482+
// fails with SQLIntegrityConstraintViolationException.
1483+
for (WorkflowInstance instance : conflictingInstances) {
1484+
ProcessInstance mainInstance =
1485+
runningProcessInstances.stream()
1486+
.filter(
1487+
pi ->
1488+
pi.getBusinessKey() != null
1489+
&& pi.getBusinessKey().equals(instance.getId().toString()))
1490+
.findFirst()
1491+
.orElse(null);
1492+
1493+
if (mainInstance != null) {
1494+
String processId = mainInstance.getId();
1495+
long activeUserTasks =
1496+
processEngine
1497+
.getTaskService()
1498+
.createTaskQuery()
1499+
.processInstanceId(processId)
1500+
.active()
1501+
.count();
1502+
if (activeUserTasks == 0) {
1503+
LOG.debug(
1504+
"Process instance {} has no active user tasks — it is auto-completing; skipping external deletion",
1505+
processId);
1506+
continue;
1507+
}
1508+
LOG.info(
1509+
"Terminating main workflow instance {} for conflicting instance {}",
1510+
mainInstance.getId(),
1511+
instance.getId());
1512+
try {
1513+
runtimeService.deleteProcessInstance(
1514+
processId, "Terminated due to conflicting workflow instance");
1515+
} catch (FlowableObjectNotFoundException e) {
1516+
LOG.debug(
1517+
"Process instance {} already completed before termination, skipping", processId);
1518+
}
1519+
}
1520+
}
1521+
14771522
Entity.getJdbi()
14781523
.inTransaction(
14791524
TransactionIsolationLevel.READ_COMMITTED,
14801525
handle -> {
14811526
try {
1482-
// Terminate both trigger and main workflow instances
1483-
1484-
// Now terminate the main workflow instances that contain the user tasks
14851527
for (WorkflowInstance instance : conflictingInstances) {
1486-
ProcessInstance mainInstance =
1487-
runningProcessInstances.stream()
1488-
.filter(
1489-
pi ->
1490-
pi.getBusinessKey() != null
1491-
&& pi.getBusinessKey().equals(instance.getId().toString()))
1492-
.findFirst()
1493-
.orElse(null);
1494-
1495-
if (mainInstance != null) {
1496-
LOG.info(
1497-
"Terminating main workflow instance {} for conflicting instance {}",
1498-
mainInstance.getId(),
1499-
instance.getId());
1500-
runtimeService.deleteProcessInstance(
1501-
mainInstance.getId(), "Terminated due to conflicting workflow instance");
1502-
}
1503-
15041528
workflowInstanceStateRepository.markInstanceStatesAsFailed(
15051529
instance.getId(), "Terminated due to conflicting workflow instance");
15061530
workflowInstanceRepository.markInstanceAsFailed(
15071531
instance.getId(), "Terminated due to conflicting workflow instance");
15081532
}
1509-
15101533
return null;
15111534
} catch (Exception e) {
1512-
LOG.error(
1513-
"Failed to terminate conflicting instances in transaction: {}",
1514-
e.getMessage());
1535+
LOG.error("Failed to update instance states in transaction: {}", e.getMessage());
15151536
throw e;
15161537
}
15171538
});

0 commit comments

Comments
 (0)