Skip to content

feat: Workflow Message Queue (WMQ) — push messages into running workflows#917

Closed
mp-orkes wants to merge 13 commits into
mainfrom
workflow/message-queue
Closed

feat: Workflow Message Queue (WMQ) — push messages into running workflows#917
mp-orkes wants to merge 13 commits into
mainfrom
workflow/message-queue

Conversation

@mp-orkes
Copy link
Copy Markdown
Contributor

What is this?

The Workflow Message Queue (WMQ) is a new opt-in feature that lets external systems push arbitrary JSON messages into a running workflow instance. The workflow consumes those messages at defined checkpoints using a new system task: PULL_WORKFLOW_MESSAGES.

This fills a gap that existing Conductor mechanisms don't cover:

  • Event handlers operate at the workflow-definition level and can't target a specific running instance by ID.
  • WAIT task has no structured payload — unblocking it carries no data.
  • HTTP task requires the workflow to reach out. WMQ inverts that: the workflow receives data pushed by an external party.

Usage

1. Enable the feature

conductor.workflow-message-queue.enabled=true
conductor.workflow-message-queue.maxQueueSize=1000
conductor.workflow-message-queue.ttlSeconds=86400
conductor.workflow-message-queue.maxBatchSize=100

2. Add a PULL_WORKFLOW_MESSAGES task to your workflow

{
  "name": "wait_for_approval",
  "taskReferenceName": "wait_for_approval_ref",
  "type": "PULL_WORKFLOW_MESSAGES",
  "inputParameters": {
    "batchSize": 1
  }
}

The task stays IN_PROGRESS until at least one message arrives, then completes with the messages in its output:

{
  "messages": [
    {
      "id": "3f2504e0-...",
      "workflowId": "8e2c14e1-...",
      "payload": { "decision": "approved", "approvedBy": "user@example.com" },
      "receivedAt": "2025-06-15T10:30:00Z"
    }
  ],
  "count": 1
}

3. Push a message from an external system

POST /api/workflow/{workflowId}/messages
Content-Type: application/json

{ "decision": "approved", "approvedBy": "user@example.com" }

Returns the generated message ID. Rejects with 409 Conflict if the workflow is not RUNNING.

When the feature flag is off, the endpoint does not exist at all (absent from Swagger, returns 404).

Use cases

Use case How WMQ helps
Agentic / agent loops An AI agent workflow waits for tool results or human confirmations. The caller pushes a message; the loop unblocks.
Webhook-driven workflows An async HTTP callback injects data into a paused workflow. The callback target is the WMQ push endpoint.
Human-in-the-loop Structured approval decisions are injected into a running workflow by an operator tool or UI.
Notification pipelines A workflow loops and reads messages in configurable batches, then fans out.

Implementation overview

Full details are in the docs added with this PR:

Components added

Component Module Purpose
WorkflowMessage common POJO for a single message (id, workflowId, payload, receivedAt)
WorkflowMessageQueueDAO core Interface: push, pop, size, delete
WorkflowMessageQueueProperties core @ConfigurationProperties for all WMQ settings
InMemoryWorkflowMessageQueueDAO core Default fallback DAO using ConcurrentHashMap
PullWorkflowMessages core Async system task; polls every 1s via getEvaluationOffset()
PullWorkflowMessagesTaskMapper core Task mapper so DeciderService can schedule the task
RedisWorkflowMessageQueueDAO redis-persistence Redis List-backed DAO (active when AnyRedisCondition is true)
WorkflowMessageQueueResource rest REST controller for the push endpoint

Files modified

File Change
TaskType.java Added PULL_WORKFLOW_MESSAGES enum value and string constant
JedisProxy.java Added rpush, llen, lrange, ltrim list operations
application.properties WMQ feature enabled by default in the OSS server

Test plan

  • Start server, create a workflow with a PULL_WORKFLOW_MESSAGES task, verify task enters IN_PROGRESS
  • Push a message via POST /api/workflow/{id}/messages, verify task completes within ~1s with message in output
  • Push multiple messages with batchSize > 1, verify all are returned in a single completion
  • Push to a completed workflow, verify 409 Conflict response
  • Set conductor.workflow-message-queue.enabled=false, verify endpoint returns 404 and task type is unknown
  • Verify maxQueueSize is enforced (push beyond limit returns error)

🤖 Generated with Claude Code

@mp-orkes mp-orkes requested a review from v1r3n March 24, 2026 22:57
@mp-orkes mp-orkes self-assigned this Mar 24, 2026
mp-orkes added a commit to conductor-oss/python-sdk that referenced this pull request Mar 25, 2026
Implements the WMQ feature from conductor-oss/conductor#917:
- POST /workflow/{workflowId}/message via WorkflowResourceApi.send_workflow_message
- executor.send_message / workflow_client.send_message convenience methods
- PullWorkflowMessagesTask for consuming messages inside workflow definitions
- TaskType.PULL_WORKFLOW_MESSAGES enum value
- WorkflowMessage model (id, workflow_id, payload, received_at)
- docs/workflow-message-queue.md usage guide with examples

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@mp-orkes mp-orkes force-pushed the workflow/message-queue branch 2 times, most recently from 40b7486 to ace95d3 Compare March 31, 2026 14:34
mp-orkes and others added 12 commits April 1, 2026 11:35
Introduces a per-workflow message queue that allows external systems to push
messages into a running workflow, consumed by the new PULL_WORKFLOW_MESSAGES
system task. Designed for agentic loops, webhook-driven workflows, and
notification pipelines.

Key changes:
- WorkflowMessage POJO and WorkflowMessageQueueDAO interface
- InMemoryWorkflowMessageQueueDAO (default) and RedisWorkflowMessageQueueDAO
- WorkflowMessageQueueProperties (@ConfigurationProperties)
- PullWorkflowMessages async system task with 1-second polling via getEvaluationOffset()
- PullWorkflowMessagesTaskMapper for DeciderService integration
- REST endpoint: POST /api/workflow/{workflowId}/messages
- Feature-flagged via conductor.workflow-message-queue.enabled=true
- Architecture and implementation docs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…geQueueDAO

synchronized methods already serialize all access; ConcurrentHashMap was
providing no additional benefit and causing confusion.
prevents external mutation of queued messages after push, since the
payload map is sourced from the Jackson-deserialized request body.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- replace LRANGE+LTRIM with a Lua script for atomic pop
- replace check-then-push with a Lua script for atomic push with size enforcement
- validate ttlSeconds at push time to prevent silent integer overflow
…nation

Inject Optional<WorkflowMessageQueueDAO> into WorkflowExecutorOps so that
when WMQ is enabled, the workflow's message queue is automatically cleaned up
when the workflow reaches a terminal state (COMPLETED or TERMINATED).
Optional injection ensures no-op behavior when WMQ is disabled.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add LOGGER for observability
- Narrow catch(Exception) to catch(NotFoundException) on initial workflow
  lookup so unexpected exceptions propagate rather than silently 404
- Add TOCTOU re-validation after push: if the workflow has transitioned out
  of RUNNING state between the initial check and push, clean up and return
  409 CONFLICT instead of silently accepting the message
- Log swallowed decide() exceptions at WARN level so failures are visible
  without breaking the request

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The WMQ feature is opt-in. Setting enabled=false in the default
application.properties ensures existing deployments are not affected
until operators explicitly enable the feature.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@mp-orkes mp-orkes force-pushed the workflow/message-queue branch from ace95d3 to 6bb814d Compare April 1, 2026 14:35
@mp-orkes
Copy link
Copy Markdown
Contributor Author

mp-orkes commented Apr 6, 2026

Closing in favor of a new PR from branch feat/workflow-message-queue.

@mp-orkes mp-orkes closed this Apr 6, 2026
@mp-orkes mp-orkes deleted the workflow/message-queue branch April 6, 2026 17:19
mp-orkes added a commit to conductor-oss/python-sdk that referenced this pull request Apr 6, 2026
Implements the WMQ feature from conductor-oss/conductor#917:
- POST /workflow/{workflowId}/message via WorkflowResourceApi.send_workflow_message
- executor.send_message / workflow_client.send_message convenience methods
- PullWorkflowMessagesTask for consuming messages inside workflow definitions
- TaskType.PULL_WORKFLOW_MESSAGES enum value
- WorkflowMessage model (id, workflow_id, payload, received_at)
- docs/workflow-message-queue.md usage guide with examples

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant