feat: Workflow Message Queue (WMQ) — push messages into running workflows#917
Closed
mp-orkes wants to merge 13 commits into
Closed
feat: Workflow Message Queue (WMQ) — push messages into running workflows#917mp-orkes wants to merge 13 commits into
mp-orkes wants to merge 13 commits into
Conversation
mp-orkes
added a commit
to conductor-oss/python-sdk
that referenced
this pull request
Mar 25, 2026
Implements the WMQ feature from conductor-oss/conductor#917: - POST /workflow/{workflowId}/message via WorkflowResourceApi.send_workflow_message - executor.send_message / workflow_client.send_message convenience methods - PullWorkflowMessagesTask for consuming messages inside workflow definitions - TaskType.PULL_WORKFLOW_MESSAGES enum value - WorkflowMessage model (id, workflow_id, payload, received_at) - docs/workflow-message-queue.md usage guide with examples Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
6 tasks
40b7486 to
ace95d3
Compare
Introduces a per-workflow message queue that allows external systems to push messages into a running workflow, consumed by the new PULL_WORKFLOW_MESSAGES system task. Designed for agentic loops, webhook-driven workflows, and notification pipelines. Key changes: - WorkflowMessage POJO and WorkflowMessageQueueDAO interface - InMemoryWorkflowMessageQueueDAO (default) and RedisWorkflowMessageQueueDAO - WorkflowMessageQueueProperties (@ConfigurationProperties) - PullWorkflowMessages async system task with 1-second polling via getEvaluationOffset() - PullWorkflowMessagesTaskMapper for DeciderService integration - REST endpoint: POST /api/workflow/{workflowId}/messages - Feature-flagged via conductor.workflow-message-queue.enabled=true - Architecture and implementation docs Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…geQueueDAO synchronized methods already serialize all access; ConcurrentHashMap was providing no additional benefit and causing confusion.
prevents external mutation of queued messages after push, since the payload map is sourced from the Jackson-deserialized request body. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- replace LRANGE+LTRIM with a Lua script for atomic pop - replace check-then-push with a Lua script for atomic push with size enforcement - validate ttlSeconds at push time to prevent silent integer overflow
…nation Inject Optional<WorkflowMessageQueueDAO> into WorkflowExecutorOps so that when WMQ is enabled, the workflow's message queue is automatically cleaned up when the workflow reaches a terminal state (COMPLETED or TERMINATED). Optional injection ensures no-op behavior when WMQ is disabled. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add LOGGER for observability - Narrow catch(Exception) to catch(NotFoundException) on initial workflow lookup so unexpected exceptions propagate rather than silently 404 - Add TOCTOU re-validation after push: if the workflow has transitioned out of RUNNING state between the initial check and push, clean up and return 409 CONFLICT instead of silently accepting the message - Log swallowed decide() exceptions at WARN level so failures are visible without breaking the request Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The WMQ feature is opt-in. Setting enabled=false in the default application.properties ensures existing deployments are not affected until operators explicitly enable the feature. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
ace95d3 to
6bb814d
Compare
Contributor
Author
|
Closing in favor of a new PR from branch feat/workflow-message-queue. |
mp-orkes
added a commit
to conductor-oss/python-sdk
that referenced
this pull request
Apr 6, 2026
Implements the WMQ feature from conductor-oss/conductor#917: - POST /workflow/{workflowId}/message via WorkflowResourceApi.send_workflow_message - executor.send_message / workflow_client.send_message convenience methods - PullWorkflowMessagesTask for consuming messages inside workflow definitions - TaskType.PULL_WORKFLOW_MESSAGES enum value - WorkflowMessage model (id, workflow_id, payload, received_at) - docs/workflow-message-queue.md usage guide with examples Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is this?
The Workflow Message Queue (WMQ) is a new opt-in feature that lets external systems push arbitrary JSON messages into a running workflow instance. The workflow consumes those messages at defined checkpoints using a new system task:
PULL_WORKFLOW_MESSAGES.This fills a gap that existing Conductor mechanisms don't cover:
Usage
1. Enable the feature
2. Add a
PULL_WORKFLOW_MESSAGEStask to your workflow{ "name": "wait_for_approval", "taskReferenceName": "wait_for_approval_ref", "type": "PULL_WORKFLOW_MESSAGES", "inputParameters": { "batchSize": 1 } }The task stays
IN_PROGRESSuntil at least one message arrives, then completes with the messages in its output:{ "messages": [ { "id": "3f2504e0-...", "workflowId": "8e2c14e1-...", "payload": { "decision": "approved", "approvedBy": "user@example.com" }, "receivedAt": "2025-06-15T10:30:00Z" } ], "count": 1 }3. Push a message from an external system
Returns the generated message ID. Rejects with
409 Conflictif the workflow is notRUNNING.When the feature flag is off, the endpoint does not exist at all (absent from Swagger, returns 404).
Use cases
Implementation overview
Full details are in the docs added with this PR:
docs/workflow-message-queue-architecture.md— design rationale, data flows, failure modes, security considerationsdocs/workflow-message-queue-implementation.md— class-level reference for every file added or modifiedComponents added
WorkflowMessagecommonid,workflowId,payload,receivedAt)WorkflowMessageQueueDAOcorepush,pop,size,deleteWorkflowMessageQueuePropertiescore@ConfigurationPropertiesfor all WMQ settingsInMemoryWorkflowMessageQueueDAOcoreConcurrentHashMapPullWorkflowMessagescoregetEvaluationOffset()PullWorkflowMessagesTaskMappercoreDeciderServicecan schedule the taskRedisWorkflowMessageQueueDAOredis-persistenceAnyRedisConditionis true)WorkflowMessageQueueResourcerestFiles modified
TaskType.javaPULL_WORKFLOW_MESSAGESenum value and string constantJedisProxy.javarpush,llen,lrange,ltrimlist operationsapplication.propertiesTest plan
PULL_WORKFLOW_MESSAGEStask, verify task entersIN_PROGRESSPOST /api/workflow/{id}/messages, verify task completes within ~1s with message in outputbatchSize > 1, verify all are returned in a single completion409 Conflictresponseconductor.workflow-message-queue.enabled=false, verify endpoint returns 404 and task type is unknownmaxQueueSizeis enforced (push beyond limit returns error)🤖 Generated with Claude Code