|
| 1 | +# Middleware pipelines |
| 2 | + |
| 3 | +Yii Queue uses middlewares to run custom logic around message pushing and message processing. |
| 4 | + |
| 5 | +A middleware is a piece of code that receives a request object and can either: |
| 6 | + |
| 7 | +- change the request (for example, change the message, adapter, or error handling behavior) and continue the pipeline, or |
| 8 | +- stop the pipeline by returning without calling the next handler. |
| 9 | + |
| 10 | +This is similar to HTTP middleware, but it is applied to queue messages. |
| 11 | + |
| 12 | +## What middlewares are for |
| 13 | + |
| 14 | +Common reasons to add middlewares: |
| 15 | + |
| 16 | +- **Collect metrics** |
| 17 | + You can count pushed/processed messages, measure handler duration, or measure time between push and consume. |
| 18 | +- **Add tracing / correlation data** |
| 19 | + You can put trace ids or correlation ids into message metadata so logs from producer/consumer are connected. |
| 20 | +- **Logging and observability** |
| 21 | + You can log message ids, channels, attempts, and failures in a consistent way. |
| 22 | +- **Modify the message payload** |
| 23 | + You can obfuscate sensitive data, normalize payload, add extra fields required by consumers, or wrap a message into envelopes. |
| 24 | +- **Route and schedule** |
| 25 | + You can switch channel, choose a different adapter, or add delay when the adapter supports it. |
| 26 | + |
| 27 | +## Pipelines overview |
| 28 | + |
| 29 | +Each message may pass through three independent pipelines: |
| 30 | + |
| 31 | +- **Push pipeline** (executed when calling `QueueInterface::push()`). |
| 32 | +- **Consume pipeline** (executed when a worker processes a message). |
| 33 | +- **Failure handling pipeline** (executed when message processing throws a `Throwable`). |
| 34 | + |
| 35 | +The execution order inside a pipeline is forward in the same order you configured middlewares. |
| 36 | + |
| 37 | +```mermaid |
| 38 | +graph LR |
| 39 | + StartPush((Start)) --> PushMiddleware1[$middleware1] --> PushMiddleware2[$middleware2] --> Push(Push to a queue) |
| 40 | + -.-> PushMiddleware2[$middleware2] -.-> PushMiddleware1[$middleware1] |
| 41 | + PushMiddleware1[$middleware1] -.-> EndPush((End)) |
| 42 | +
|
| 43 | +
|
| 44 | + StartConsume((Start)) --> ConsumeMiddleware1[$middleware1] --> ConsumeMiddleware2[$middleware2] --> Consume(Consume / handle) |
| 45 | + -.-> ConsumeMiddleware2[$middleware2] -.-> ConsumeMiddleware1[$middleware1] |
| 46 | + ConsumeMiddleware1[$middleware1] -.-> EndConsume((End)) |
| 47 | +
|
| 48 | +
|
| 49 | + Consume -- Throwable --> StartFailure((Start failure)) |
| 50 | + StartFailure --> FailureMiddleware1[$failure1] --> FailureMiddleware2[$failure2] --> Failure(Handle failure / retry / requeue) |
| 51 | + -.-> FailureMiddleware2[$failure2] -.-> FailureMiddleware1[$failure1] |
| 52 | + FailureMiddleware1[$failure1] -.-> EndFailure((End failure)) |
| 53 | +``` |
| 54 | + |
| 55 | +## How to define a middleware |
| 56 | + |
| 57 | +You can use any of these formats: |
| 58 | + |
| 59 | +- A ready-to-use middleware object. |
| 60 | +- An array in the format of [yiisoft/definitions](https://github.com/yiisoft/definitions). |
| 61 | +- A `callable` (closure, invokable object, `[$object, 'method']`, etc.). It is executed through the |
| 62 | + [yiisoft/injector](https://github.com/yiisoft/injector), so its dependencies are resolved automatically. |
| 63 | +- A string for your DI container to resolve the middleware, e.g. `FooMiddleware::class`. |
| 64 | + |
| 65 | +The required interface depends on the pipeline: |
| 66 | + |
| 67 | +- Push: `Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface` |
| 68 | +- Consume: `Yiisoft\Queue\Middleware\Consume\MiddlewareConsumeInterface` |
| 69 | +- Failure handling: `Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface` |
| 70 | + |
| 71 | +## Push pipeline |
| 72 | + |
| 73 | +The push pipeline is executed when calling `QueueInterface::push()`. |
| 74 | + |
| 75 | +Push middlewares can: |
| 76 | + |
| 77 | +- Modify the message (wrap it into envelopes, add metadata, obfuscate data, etc.). |
| 78 | +- Modify the adapter (change channel, add delay, route to a different backend, etc.). |
| 79 | + |
| 80 | +In particular, push middlewares may define or replace the adapter that will be used to push the message. This can be useful when: |
| 81 | + |
| 82 | +- You choose a backend dynamically (for example, based on message type or payload). |
| 83 | +- You route messages to different channels/backends (for example, `critical` vs `low`). |
| 84 | +- You apply scheduling/delay logic in a middleware. |
| 85 | + |
| 86 | +The adapter is set by returning a modified request: |
| 87 | + |
| 88 | +```php |
| 89 | +return $pushRequest->withAdapter($adapter); |
| 90 | +``` |
| 91 | + |
| 92 | +### Adapter must be configured by the end of the pipeline |
| 93 | + |
| 94 | +The pipeline ends with a final handler that actually pushes the message using the adapter. |
| 95 | + |
| 96 | +If the adapter is not configured by the time the pipeline reaches the final handler, |
| 97 | +`Yiisoft\Queue\Exception\AdapterNotConfiguredException` is thrown. |
| 98 | + |
| 99 | +### Custom push middleware |
| 100 | + |
| 101 | +Implement `MiddlewarePushInterface` and return a modified `PushRequest` from `processPush()`: |
| 102 | + |
| 103 | +```php |
| 104 | +return $pushRequest |
| 105 | + ->withMessage($newMessage) |
| 106 | + ->withAdapter($newAdapter); |
| 107 | +``` |
| 108 | + |
| 109 | +## Consume pipeline |
| 110 | + |
| 111 | +The consume pipeline is executed by the worker while processing a message. |
| 112 | + |
| 113 | +Consume middlewares are often used to modify the message and/or collect runtime information: |
| 114 | + |
| 115 | +- Measure handler execution time. |
| 116 | +- Add correlation ids and include them into logs. |
| 117 | +- Convert thrown exceptions into domain-specific failures. |
| 118 | + |
| 119 | +The final handler of the consume pipeline invokes the resolved message handler. |
| 120 | + |
| 121 | +## Failure handling pipeline |
| 122 | + |
| 123 | +When a `Throwable` escapes the consume pipeline, the worker switches to the failure handling pipeline. |
| 124 | + |
| 125 | +The pipeline receives a `FailureHandlingRequest` that contains: |
| 126 | + |
| 127 | +- the message |
| 128 | +- the caught exception |
| 129 | +- the queue instance |
| 130 | + |
| 131 | +The pipeline is selected by queue channel; if there is no channel-specific pipeline configured, |
| 132 | +`FailureMiddlewareDispatcher::DEFAULT_PIPELINE` is used. |
| 133 | + |
| 134 | +See [Failure handling pipeline](failure-handling-pipeline.md) for the step-by-step flow and built-in middlewares. |
| 135 | + |
| 136 | +## Configuration |
| 137 | + |
| 138 | +### With yiisoft/config |
| 139 | + |
| 140 | +When using [yiisoft/config](https://github.com/yiisoft/config), pipelines are configured in params under `yiisoft/queue`: |
| 141 | + |
| 142 | +- `middlewares-push` |
| 143 | +- `middlewares-consume` |
| 144 | +- `middlewares-fail` |
| 145 | + |
| 146 | +See [Configuration with yiisoft/config](configuration-with-config.md) for examples. |
| 147 | + |
| 148 | +### Manual configuration (without yiisoft/config) |
| 149 | + |
| 150 | +When configuring the component manually, you instantiate the middleware dispatchers and pass them to `Queue` / `Worker`. |
| 151 | + |
| 152 | +See [Manual configuration](configuration-manual.md) for a full runnable example. |
| 153 | + |
| 154 | +## Runtime overrides |
| 155 | + |
| 156 | +You can override middleware stacks at runtime: |
| 157 | + |
| 158 | +- `Queue::withMiddlewares(...)` replaces the whole push middleware stack for that queue instance. |
| 159 | +- `Queue::withMiddlewaresAdded(...)` appends middlewares to the existing stack. |
| 160 | + |
| 161 | +These methods affect only the push pipeline of that `Queue` instance. |
0 commit comments