Fix Flowable workflow termination message lookup and async-job race on deletion #27117
…n deletion

WorkflowHandler.findTerminationMessageName was splitting task definition keys on underscore and looking for messages named '<key>_terminateProcess'. But Workflow.getFlowableElementId joins with a dot, so actual task keys are '<subProcess>.approvalTask' and termination messages are '<subProcess>.terminateProcess'. The bug has been latent since the original Custom Workflows commit: every conflict-resolution path silently failed to terminate the old subprocess, leaving it dangling until force-delete.

Fix: split on the last dot, derive the subprocess ID, and check for the '<subProcessId>.terminateProcess' subscription directly. Remove the dead 'terminateChangeReviewProcess' pattern and the debug scaffolding loops. Also add getManagementService/getRepositoryService getters to WorkflowHandler for use in integration tests.

In WorkflowDefinitionResourceIT, change waitForWorkflowIdle to check pending async jobs on the trigger process definition IDs instead of waiting for process instances to drain. Async-after jobs on the fetch task are ephemeral (<100ms), so waiting for them has near-zero cost, whereas the old approach waited for the entire batch loop (potentially minutes with many entities). Also remove the waitForWorkflowIdle call from cleanupCreatedWorkflows @AfterEach, since it ran a query per workflow in every test unnecessarily.
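The naming rule behind the fix can be sketched as a pure string function. This is only an illustration under stated assumptions: the class and method names are hypothetical, it is not the code in WorkflowHandler, and the real fix additionally checks through a Flowable query that a subscription for the derived message actually exists.

```java
import java.util.Optional;

// Hypothetical helper illustrating the naming rule from the commit message.
// It is NOT the actual WorkflowHandler implementation.
final class TerminationMessageNames {
    private static final String SUFFIX = "terminateProcess";

    private TerminationMessageNames() {}

    /**
     * Derives "<subProcessId>.terminateProcess" from a task definition key such as
     * "<subProcessId>.approvalTask" by splitting on the LAST dot.
     * Returns empty when the key has no usable subprocess prefix.
     */
    static Optional<String> forTaskDefinitionKey(String taskDefinitionKey) {
        if (taskDefinitionKey == null) {
            return Optional.empty();
        }
        int lastDot = taskDefinitionKey.lastIndexOf('.');
        if (lastDot <= 0) {
            // No dot at all, or a leading dot with an empty subprocess ID.
            return Optional.empty();
        }
        String subProcessId = taskDefinitionKey.substring(0, lastDot);
        return Optional.of(subProcessId + "." + SUFFIX);
    }
}
```

Splitting on the last dot rather than the first keeps any dots that are part of the subprocess ID itself, so a key like a.b.approvalTask maps to a.b.terminateProcess.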
Pull request overview
Fixes a long-standing Flowable custom-workflow termination bug by aligning termination message lookup with the actual element/message naming scheme, and hardens integration-test cleanup by waiting for short-lived async jobs to drain before deleting workflow deployments.
Changes:
- Update `WorkflowHandler.findTerminationMessageName(...)` to derive `<subProcessId>.terminateProcess` from task definition keys using the last `.` separator and validate via message subscription query.
- Add `ManagementService`/`RepositoryService` accessors on `WorkflowHandler` to support test-side Flowable queries.
- Update `WorkflowDefinitionResourceIT` cleanup paths to wait for trigger process async jobs to drain (instead of waiting for process instances to finish), and avoid per-test unconditional idle-waits.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java | Correct termination message name resolution to match Flowable element IDs; add Flowable service getters for test usage. |
| openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java | Add waitForWorkflowIdle to avoid async-job vs deletion race during workflow cleanup and reduce test suite latency. |
🟡 Playwright Results: all passed (23 flaky)
✅ 3595 passed · ❌ 0 failed · 🟡 23 flaky · ⏭️ 207 skipped
🟡 23 flaky test(s) (passed on retry)
How to debug locally:
# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip # view trace
…ocesses

WorkflowInstanceListener and WorkflowInstanceStageListener were firing on Flowable process instances with numeric keys (e.g. 239376, 239373) that have no corresponding OM workflow definition, causing EntityNotFoundException noise in AUT runs.

- WorkflowInstanceListener: skip addWorkflowInstance/updateWorkflowInstance when getMainWorkflowDefinitionNameFromTrigger returns the raw process key unchanged (no 'Trigger' suffix to strip = not an OM trigger workflow)
- WorkflowInstanceStageListener: skip addNewStage/updateStage when the process key is purely numeric (isUnknownFlowableProcess guard)
- WorkflowDefinitionResourceIT: fix waitForWorkflowIdle to use TriggerFactory.getTriggerWorkflowId(fqn) instead of name+"Trigger%" so it correctly queries by FQN-based process definition key
| private boolean isUnknownFlowableProcess(String processKey) { | ||
| return processKey.matches("\\d+"); | ||
| } |
isUnknownFlowableProcess treats any numeric-only process key as “not an OM workflow”. WorkflowDefinition name/fullyQualifiedName uses entityName, which allows numeric-only values, so a workflow named e.g. "123" would have stage tracking silently disabled. Consider detecting non-OM processes via presence/format of the expected businessKey/variables (or validating/disallowing numeric-only workflow names) rather than processKey.matches("\\d+").
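To make the concern concrete, here is a small sketch contrasting the two guards. The class and method names are hypothetical stand-ins, and the business-key variant assumes that OM-managed process instances always carry a business key, which is the premise of the suggestion rather than something verified here.

```java
// Hypothetical stand-ins for the two guards discussed in this thread;
// neither method is the actual WorkflowInstanceStageListener code.
final class ProcessGuards {
    private ProcessGuards() {}

    // Format heuristic under review: any all-digit process key is treated
    // as "not an OM workflow", including a legitimately named workflow "123".
    static boolean isNumericKey(String processKey) {
        return processKey != null && processKey.matches("\\d+");
    }

    // Alternative signal: a missing or blank business key marks the instance
    // as not managed by OM (assumption: OM always sets one).
    static boolean hasNoBusinessKey(String businessKey) {
        return businessKey == null || businessKey.isBlank();
    }
}
```

With the numeric heuristic, a workflow named 123 is indistinguishable from an orphaned process keyed 239376; the business-key check does not depend on the name at all.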
| * picked up and executed. These jobs are ephemeral (complete in <100ms) but if the deployment | ||
| * is deleted while one is in flight the Flowable job executor NPEs when it tries to resolve the | ||
| * process definition. Returns immediately for event-based workflows that have no | ||
| * "<name>Trigger-*" process definitions deployed, or for periodic-batch workflows whose |
The Javadoc says this returns immediately for event-based workflows that have no "Trigger-*" process definitions, but the code queries processDefinitionKeyLike(... + "%"), which matches both "Trigger" and "Trigger-...". Update the comment to reflect the actual matching criteria ("Trigger%") so future readers don’t infer the wrong behavior.
| * "<name>Trigger-*" process definitions deployed, or for periodic-batch workflows whose | |
| * "<name>Trigger%" process definitions deployed, or for periodic-batch workflows whose |
| List<ProcessDefinition> triggerPDs = | ||
| repositoryService | ||
| .createProcessDefinitionQuery() | ||
| .processDefinitionKeyLike(TriggerFactory.getTriggerWorkflowId(workflowFqn) + "%") | ||
| .list(); |
processDefinitionKeyLike(...) uses SQL LIKE semantics where _ and % are wildcards. Since workflow names/FQNs in tests can include _ (e.g., via ns.prefix(...)), this query may pick up unrelated process definitions and make the wait slower/flaky. Consider filtering the returned list by pd.getKey().startsWith(baseKey) (or otherwise escaping LIKE wildcards) before collecting PD IDs.
| List<ProcessDefinition> triggerPDs = | |
| repositoryService | |
| .createProcessDefinitionQuery() | |
| .processDefinitionKeyLike(TriggerFactory.getTriggerWorkflowId(workflowFqn) + "%") | |
| .list(); | |
| String triggerWorkflowId = TriggerFactory.getTriggerWorkflowId(workflowFqn); | |
| List<ProcessDefinition> triggerPDs = | |
| repositoryService | |
| .createProcessDefinitionQuery() | |
| .processDefinitionKeyLike(triggerWorkflowId + "%") | |
| .list() | |
| .stream() | |
| .filter(pd -> pd.getKey() != null && pd.getKey().startsWith(triggerWorkflowId)) | |
| .toList(); |
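The wildcard problem can be reproduced without a database. The sketch below emulates LIKE matching with a regex (an approximation: real LIKE behavior such as case sensitivity depends on the database and collation) and then applies the startsWith post-filter from the suggestion. All names and sample keys are made up for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustrative emulation of SQL LIKE; not Flowable code.
final class LikeWildcardDemo {
    private LikeWildcardDemo() {}

    // Translates a LIKE pattern to a regex: '%' -> ".*", '_' -> ".", everything else literal.
    static Pattern likeToRegex(String like) {
        StringBuilder regex = new StringBuilder();
        for (char c : like.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    // Keys a "key LIKE pattern" query would return under the emulation above.
    static List<String> likeMatches(List<String> keys, String like) {
        Pattern pattern = likeToRegex(like);
        return keys.stream()
                .filter(k -> pattern.matcher(k).matches())
                .collect(Collectors.toList());
    }

    // LIKE query followed by the literal-prefix filter proposed in the review.
    static List<String> likeThenStartsWith(List<String> keys, String baseKey) {
        return likeMatches(keys, baseKey + "%").stream()
                .filter(k -> k.startsWith(baseKey))
                .collect(Collectors.toList());
    }
}
```

For a base key such as my_wfTrigger, the LIKE step alone also returns myXwfTrigger because `_` matches any single character; the startsWith step discards it.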
| .createExecutionQuery() | ||
| .processInstanceId(task.getProcessInstanceId()) | ||
| .messageEventSubscriptionName(messageName) | ||
| .count() | ||
| > 0; |
findTerminationMessageName queries Flowable for the message subscription via .count(), and terminateTaskProcessInstance immediately runs a second query (.singleResult()) for the same messageEventSubscriptionName. You can avoid the extra round-trip by having one of these queries serve both purposes (e.g., fetch the Execution once and branch on null).
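A rough sketch of the "fetch once and branch on null" shape follows. To stay self-contained it abstracts the Flowable calls behind functional interfaces, so the class, method, and parameter names are all hypothetical; in the real handler the lookup would presumably be the `singleResult()` form of the execution query shown above, and delivery would be whatever call the handler already uses to send the message.

```java
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

// Hypothetical abstraction over the Flowable queries; not the real WorkflowHandler code.
final class SingleLookupTermination {
    private SingleLookupTermination() {}

    /**
     * Looks up the execution subscribed to messageName exactly once, then either
     * delivers the message or reports that nothing was subscribed.
     *
     * @param findExecutionId (processInstanceId, messageName) -> execution ID, or null if none
     * @param deliverMessage  (messageName, executionId) -> triggers the message event
     * @return true if a subscription was found and the message was delivered
     */
    static boolean terminate(
            String processInstanceId,
            String messageName,
            BiFunction<String, String, String> findExecutionId,
            BiConsumer<String, String> deliverMessage) {
        String executionId = findExecutionId.apply(processInstanceId, messageName);
        if (executionId == null) {
            return false; // no subscription: the caller can log a warning and move on
        }
        deliverMessage.accept(messageName, executionId);
        return true;
    }
}
```

The existence check and the fetch collapse into one query, so each task costs a single round-trip whether or not a subscription exists.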
- WorkflowInstanceStageListener: replace fragile numeric-key guard with
businessKey-based guard — skip stage tracking when no business key is
present (non-OM-managed process instance) instead of using processKey
format heuristic that could misfire on numeric-named workflows
- WorkflowDefinitionResourceIT: fix Javadoc comment ("Trigger-*" ->
"Trigger%") and add startsWith filter after processDefinitionKeyLike
query to prevent SQL LIKE wildcard '_' from matching unrelated PDs
- WorkflowHandler: eliminate double Flowable round-trip in termination
logic by merging findTerminationMessageName into terminateTask helper
that does a single singleResult() query per task
The Java checkstyle failed. Please run […]. You can install the pre-commit hooks with […].
@gitar-bot Fix the finding "Edge Case: Silent skip on null businessKey uses DEBUG; WARN would aid diagnosis", then run mvn spotless:apply at the root of the repository and push the result.
…ceStageListener Co-authored-by: Ram Narayan Balaji <81347100+yan-3005@users.noreply.github.com>
Done. Changed |
Code Review ✅ Approved (2 resolved / 2 findings)
Fixes Flowable workflow termination message lookup and async-job race on deletion by correcting variable references and improving logging verbosity. Two issues were addressed:
✅ 2 resolved
✅ Bug: waitForWorkflowIdle uses undefined
…n deletion (open-metadata#27117)

* Fix Flowable workflow termination message lookup and async-job race on deletion

  WorkflowHandler.findTerminationMessageName was splitting task definition keys on underscore and looking for messages named '<key>_terminateProcess'. But Workflow.getFlowableElementId joins with a dot, so actual task keys are '<subProcess>.approvalTask' and termination messages are '<subProcess>.terminateProcess'. The bug has been latent since the original Custom Workflows commit - every conflict-resolution path silently failed to terminate the old subprocess, leaving it dangling until force-delete.

  Fix: split on the last dot, derive the subprocess ID, and check for the '<subProcessId>.terminateProcess' subscription directly. Remove the dead 'terminateChangeReviewProcess' pattern and the debug scaffolding loops. Also add getManagementService/getRepositoryService getters to WorkflowHandler for use in integration tests.

  In WorkflowDefinitionResourceIT, replace waitForWorkflowIdle to check pending async jobs on the trigger process definition IDs instead of waiting for process instances to drain. Async-after jobs on the fetch task are ephemeral (<100ms) - waiting for them has near-zero cost, whereas the old approach waited for the entire batch loop (potentially minutes with many entities). Also remove the waitForWorkflowIdle call from cleanupCreatedWorkflows @AfterEach since it ran a query per workflow in every test unnecessarily.

* Fix spurious EntityNotFoundException errors from orphaned Flowable processes

  WorkflowInstanceListener and WorkflowInstanceStageListener were firing on Flowable process instances with numeric keys (e.g. 239376, 239373) that have no corresponding OM workflow definition, causing EntityNotFoundException noise in AUT runs.

  - WorkflowInstanceListener: skip addWorkflowInstance/updateWorkflowInstance when getMainWorkflowDefinitionNameFromTrigger returns the raw process key unchanged (no 'Trigger' suffix to strip = not an OM trigger workflow)
  - WorkflowInstanceStageListener: skip addNewStage/updateStage when the process key is purely numeric (isUnknownFlowableProcess guard)
  - WorkflowDefinitionResourceIT: fix waitForWorkflowIdle to use TriggerFactory.getTriggerWorkflowId(fqn) instead of name+"Trigger%" so it correctly queries by FQN-based process definition key

* Fix compilation issues

* Address Copilot review feedback on PR open-metadata#27117

  - WorkflowInstanceStageListener: replace fragile numeric-key guard with businessKey-based guard: skip stage tracking when no business key is present (non-OM-managed process instance) instead of using processKey format heuristic that could misfire on numeric-named workflows
  - WorkflowDefinitionResourceIT: fix Javadoc comment ("Trigger-*" -> "Trigger%") and add startsWith filter after processDefinitionKeyLike query to prevent SQL LIKE wildcard '_' from matching unrelated PDs
  - WorkflowHandler: eliminate double Flowable round-trip in termination logic by merging findTerminationMessageName into terminateTask helper that does a single singleResult() query per task

* fix: change DEBUG to WARN for null businessKey skip in WorkflowInstanceStageListener

  Co-authored-by: Ram Narayan Balaji <81347100+yan-3005@users.noreply.github.com>

---------

Co-authored-by: Gitar <noreply@gitar.ai>



Summary
- `WorkflowHandler.findTerminationMessageName` was splitting task definition keys on `_` and searching for messages named `<key>_terminateProcess`. `Workflow.getFlowableElementId` joins with a `.`, so actual task keys are `<subProcess>.approvalTask` and messages are `<subProcess>.terminateProcess`. Every conflict-resolution path (a newer workflow instance replacing an older one for the same entity) has silently failed since the original Custom Workflows commit: the old subprocess was never terminated via message and was left dangling until force-delete. Fixed by splitting on the last `.` and looking up `<subProcessId>.terminateProcess` directly.
- `waitForWorkflowIdle` now waits for async jobs on the trigger process definition IDs rather than waiting for process instances to drain. The async-after jobs on `PeriodicBatchEntityTrigger`'s fetch task are ephemeral (<100ms); the old approach waited for the entire batch loop to finish (potentially minutes with many entities), causing test suite time to blow up.
- Removed `waitForWorkflowIdle` from `cleanupCreatedWorkflows` `@AfterEach`: it ran a Flowable query per workflow in every test regardless of whether the workflow was ever triggered.

Test plan
- `WorkflowDefinitionResourceIT#test_DataCompletenessWorkflow_SDK`: zero `NullPointerException` / `JOB_EXECUTION_FAILURE` lines
- `WorkflowDefinitionResourceIT#test_TagApprovalWorkflow_SDK`: zero `No termination message found` warnings; no 60s `ConditionTimeoutException` timeout
- `WorkflowDefinitionResourceIT` suite: no regressions, test time back to baseline (~2m30s)
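
The job-drain wait described in the summary boils down to polling a pending-job count until it reaches zero or a deadline passes. The sketch below shows that shape only: the class and method names are hypothetical, the count is supplied by the caller (in the integration test it would presumably come from a Flowable job query scoped to the trigger process definition IDs), and a plain loop is used instead of the test suite's waiting library to keep the example dependency-free.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical polling helper; not the actual waitForWorkflowIdle implementation.
final class AsyncJobDrain {
    private AsyncJobDrain() {}

    /**
     * Polls pendingJobs until it reports zero or the timeout elapses.
     *
     * @return true if the count reached zero in time, false on timeout or interruption
     */
    static boolean waitUntilDrained(
            LongSupplier pendingJobs, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (pendingJobs.getAsLong() == 0) {
                return true; // nothing in flight: safe to delete the deployment
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // jobs still pending when the deadline passed
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

Because the count is checked before any sleep, a workflow with no pending jobs returns immediately, which matches the near-zero cost the summary describes for the common case.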