Yii Queue uses middlewares to run custom logic around message pushing and message processing.
A middleware is a piece of code that receives a request object and can either:
- change the request (for example, change the message, adapter, or error handling behavior) and continue the pipeline, or
- stop the pipeline by returning without calling the next handler.
The pipeline mechanism is similar to HTTP middleware, but applied to queue messages.
Common reasons to add middlewares:
- Collect metrics You can count pushed/processed messages, measure handler duration, or measure time between push and consume.
- Add tracing / correlation data You can put trace ids or correlation ids into message metadata so logs from producer/consumer are connected.
- Logging and observability You can log message ids, queue names, attempts, and failures in a consistent way.
- Modify the message payload You can obfuscate sensitive data, normalize payload, add extra fields required by consumers, or wrap a message into envelopes.
- Route and schedule You can switch queue, choose a different adapter, or add delay when the adapter supports it.
Each message may pass through three independent pipelines:
- Push pipeline (executed when calling
QueueInterface::push()). - Consume pipeline (executed when a worker processes a message).
- Failure handling pipeline (executed when message processing throws a
Throwable).
The execution order inside a pipeline is forward in the same order you configured middlewares.
graph LR
StartPush((Start)) --> PushMiddleware1[$middleware1] --> PushMiddleware2[$middleware2] --> Push(Push to a queue)
-.-> PushMiddleware2[$middleware2] -.-> PushMiddleware1[$middleware1]
PushMiddleware1[$middleware1] -.-> EndPush((End))
StartConsume((Start)) --> ConsumeMiddleware1[$middleware1] --> ConsumeMiddleware2[$middleware2] --> Consume(Consume / handle)
-.-> ConsumeMiddleware2[$middleware2] -.-> ConsumeMiddleware1[$middleware1]
ConsumeMiddleware1[$middleware1] -.-> EndConsume((End))
Consume -- Throwable --> StartFailure((Start failure))
StartFailure --> FailureMiddleware1[$failure1] --> FailureMiddleware2[$failure2] --> Failure(Handle failure / retry / requeue)
-.-> FailureMiddleware2[$failure2] -.-> FailureMiddleware1[$failure1]
FailureMiddleware1[$failure1] -.-> EndFailure((End failure))
You can use any of these formats:
- A ready-to-use middleware object.
- An array in the format of yiisoft/definitions, which defines a middleware implementation.
- A string for your DI container to resolve the middleware, e.g.
FooMiddleware::class. - An extended callable definition. A callable should either be a middleware itself or return a configured middleware object.
Note: The formats above are supported by the default
PushMiddlewareFactory. When using a customPushMiddlewareFactoryInterfaceimplementation, it may accept additional definition formats.
The required interface depends on the pipeline:
- Push:
Yiisoft\Queue\Middleware\Push\PushMiddlewareInterface - Consume:
Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareInterface - Failure handling:
Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareInterface
The push pipeline is executed when calling QueueInterface::push().
Push middlewares can:
- Modify the message (wrap it into envelopes, add metadata, obfuscate data, etc.).
- Modify the adapter (add delay, route to a different backend, etc.).
In particular, push middlewares may define or replace the adapter that will be used to push the message. This can be useful when:
- You choose a backend dynamically (for example, based on message type or payload).
- You route messages to different queues/backends (for example,
criticalvslow). - You apply scheduling/delay logic in a middleware.
The adapter is set by returning a modified request:
return $pushRequest->withAdapter($adapter);The push pipeline ends with a final handler that actually pushes the message using the adapter.
If the adapter is not configured by the time the pipeline reaches the final handler,
Yiisoft\Queue\Exception\AdapterNotConfiguredException is thrown.
Implement PushMiddlewareInterface and return a modified PushRequest from processPush():
return $pushRequest
->withMessage($newMessage)
->withAdapter($newAdapter);The consume pipeline is executed by the worker while processing a message.
Consume middlewares are often used to modify the message and/or collect runtime information:
- Measure handler execution time.
- Add correlation ids and include them into logs.
- Convert thrown exceptions into domain-specific failures.
The final handler of the consume pipeline invokes the resolved message handler.
When a Throwable escapes the consume pipeline, the worker switches to the failure handling pipeline.
The pipeline receives a FailureHandlingRequest that contains:
- the message
- the caught exception
- the queue instance
The pipeline is selected by queue name; if there is no queue-specific pipeline configured,
FailureMiddlewareDispatcher::DEFAULT_PIPELINE is used.
See Error handling on message processing for the step-by-step flow and built-in middlewares.
When using yiisoft/config, pipelines are configured in params under yiisoft/queue:
middlewares-pushmiddlewares-consumemiddlewares-fail
See Configuration with yiisoft/config for examples.
When configuring the component manually, you instantiate the middleware dispatchers and pass them to Queue / Worker.
See Manual configuration for a full runnable example.
You can override middleware stacks at runtime:
Queue::withMiddlewares(...)replaces the whole push middleware stack for that queue instance.Queue::withMiddlewaresAdded(...)appends middlewares to the existing stack.
These methods affect only the push pipeline of the Queue instance they are called on.