This example demonstrates the Work Queue (Task Queue) messaging pattern using AsyncAPI Python. In this pattern, tasks are distributed among multiple workers, with each task being processed by exactly one worker.
- 1:N Distribution: One producer sends tasks to multiple workers
- Load Balancing: Tasks are automatically distributed among available workers
- Reliability: Each task is delivered to exactly one worker (no duplication)
- Scalability: Add more workers to handle increased load
```
Producer → [Task Queue] → Worker 1
                        ├→ Worker 2
                        └→ Worker 3
```
- Producer: Sends tasks to a durable queue
- Queue: AMQP queue that holds tasks until processed
- Workers: Multiple instances that compete for tasks
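The producer/queue/worker roles above can be sketched without a broker using only the Python standard library. This is a simplified model of the competing-consumers behavior, not the generated AMQP clients from `main-producer.py`/`main-worker.py`; the task values and worker names are placeholders:

```python
# Broker-free model of a work queue: one producer, three competing workers,
# each task consumed by exactly one worker. A real deployment would publish
# over AMQP (e.g. via pika or aio-pika) to a durable RabbitMQ queue instead.
import queue
import threading

task_queue = queue.Queue()
processed = {}          # worker id -> list of tasks it handled
lock = threading.Lock()

def worker(worker_id):
    """Compete for tasks until a None sentinel signals shutdown."""
    while True:
        task = task_queue.get()
        if task is None:
            task_queue.task_done()
            return
        with lock:
            processed.setdefault(worker_id, []).append(task)
        task_queue.task_done()

workers = [threading.Thread(target=worker, args=(f"worker{i}",)) for i in (1, 2, 3)]
for t in workers:
    t.start()

# Producer: enqueue ten tasks, then one shutdown sentinel per worker.
for n in range(10):
    task_queue.put(n)
for _ in workers:
    task_queue.put(None)
for t in workers:
    t.join()

# Every task was delivered to exactly one worker, so the union is complete.
all_tasks = sorted(t for tasks in processed.values() for t in tasks)
print(all_tasks)  # → [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Which worker handles which task depends on timing; only the exactly-once guarantee is deterministic.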
- `spec/common.asyncapi.yaml` - Shared channel and message definitions
- `spec/producer.asyncapi.yaml` - Task producer specification
- `spec/worker.asyncapi.yaml` - Task worker specification
- `main-producer.py` - Task producer implementation
- `main-worker.py` - Worker implementation (accepts worker ID argument)
- `test_workqueue.py` - Automated test demonstrating work queue behavior
- Set up the environment:

  ```shell
  make venv install generate
  ```

- Run the automated test:

  ```shell
  make test-workqueue
  ```

- Manual testing: start multiple workers in separate terminals, then run the producer:

  ```shell
  # Terminal 1
  make worker1
  # Terminal 2
  make worker2
  # Terminal 3
  make worker3
  # Terminal 4 - Send tasks
  make producer
  ```
- ✅ Each task is processed by exactly one worker
- ✅ Tasks are distributed among available workers
- ✅ Workers can be added/removed dynamically
- ✅ Queue persists tasks if no workers are available
- ✅ Failed tasks can be retried (depending on configuration)
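The retry behavior in the last point can be modeled in the same broker-free style: a worker that fails a task requeues it instead of acknowledging it, so another attempt completes it. The failure rule (task 3 fails on its first attempt) is invented purely for illustration:

```python
# Model of retry-on-failure: a failed task is requeued rather than dropped.
# queue.Queue.join() blocks until every put() has a matching task_done(),
# so the retried task is guaranteed to finish before the check below.
import queue
import threading

task_queue = queue.Queue()
completed = []
attempts = {}           # task -> number of delivery attempts
lock = threading.Lock()

def worker(worker_id):
    while True:
        task = task_queue.get()
        with lock:
            attempts[task] = attempts.get(task, 0) + 1
            first_try = attempts[task] == 1
        if task == 3 and first_try:
            task_queue.put(task)   # simulated failure: requeue (like an AMQP nack)
        else:
            with lock:
                completed.append(task)
        task_queue.task_done()

for i in range(2):
    threading.Thread(target=worker, args=(f"worker{i}",), daemon=True).start()

for n in range(5):
    task_queue.put(n)
task_queue.join()  # waits for all tasks, including the retried one

print(sorted(completed))  # → [0, 1, 2, 3, 4]
print(attempts[3])        # → 2 (one failed attempt plus one successful retry)
```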
The work queue uses:
- Queue Type: Durable, non-exclusive queue
- Routing: Direct routing to named queue
- Delivery: Round-robin distribution among consumers
- Acknowledgment: Manual ACK for reliability
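The manual-ACK mechanism above can be illustrated with a toy in-memory broker: a delivered task stays "in flight" until the consumer acknowledges it, and unacknowledged tasks are redelivered if the consumer dies. The `Broker` class is invented for illustration; real AMQP brokers track this per channel via delivery tags:

```python
# Toy model of manual acknowledgment and redelivery. Not an AMQP client:
# it only shows why a task survives a worker crash that happens before ACK.
import collections

class Broker:
    def __init__(self):
        self.queue = collections.deque()
        self.in_flight = {}   # delivery tag -> task awaiting ACK
        self.next_tag = 0

    def publish(self, task):
        self.queue.append(task)

    def deliver(self):
        """Hand the next task to a consumer, keeping it in flight until ACK."""
        task = self.queue.popleft()
        self.next_tag += 1
        self.in_flight[self.next_tag] = task
        return self.next_tag, task

    def ack(self, tag):
        del self.in_flight[tag]   # task is permanently removed

    def requeue_unacked(self):
        """What a broker does when a consumer connection drops mid-task."""
        self.queue.extend(self.in_flight.values())
        self.in_flight.clear()

broker = Broker()
broker.publish("resize-image-42")

tag, task = broker.deliver()
# ...worker crashes here and never calls broker.ack(tag)...
broker.requeue_unacked()

tag, task = broker.deliver()   # same task is redelivered
broker.ack(tag)                # second attempt completes

print(task, broker.in_flight, list(broker.queue))  # → resize-image-42 {} []
```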
This pattern is ideal for background job processing, image processing pipelines, email sending, and other scalable task processing scenarios.