55namespace Yiisoft \Queue \AMQP ;
66
77use BackedEnum ;
8+ use InvalidArgumentException ;
9+ use PhpAmqpLib \Exchange \AMQPExchangeType ;
810use PhpAmqpLib \Message \AMQPMessage ;
911use Throwable ;
1012use Yiisoft \Queue \Adapter \AdapterInterface ;
1113use Yiisoft \Queue \AMQP \Exception \NotImplementedException ;
14+ use Yiisoft \Queue \AMQP \Settings \ExchangeSettingsInterface ;
15+ use Yiisoft \Queue \AMQP \Settings \QueueSettingsInterface ;
1216use Yiisoft \Queue \Cli \LoopInterface ;
13- use Yiisoft \Queue \JobStatus ;
17+ use Yiisoft \Queue \Message \ DelayEnvelope ;
1418use Yiisoft \Queue \Message \MessageInterface ;
1519use Yiisoft \Queue \Message \MessageSerializerInterface ;
20+ use Yiisoft \Queue \MessageStatus ;
1621
1722final class Adapter implements AdapterInterface
1823{
19- private ?AMQPMessage $ amqpMessage = null ;
20-
2124 public function __construct (
2225 private QueueProviderInterface $ queueProvider ,
2326 private readonly MessageSerializerInterface $ serializer ,
@@ -29,9 +32,8 @@ public function withChannel(BackedEnum|string $channel): self
2932 {
3033 $ instance = clone $ this ;
3134
32- $ channelName = is_string ($ channel ) ? $ channel : (string ) $ channel ->value ;
33- $ instance ->queueProvider = $ this ->queueProvider ->withChannelName ($ channelName );
34- $ instance ->amqpMessage = null ;
35+ $ queueName = is_string ($ channel ) ? $ channel : (string ) $ channel ->value ;
36+ $ instance ->queueProvider = $ this ->queueProvider ->withQueueName ($ queueName );
3537
3638 return $ instance ;
3739 }
@@ -47,29 +49,30 @@ public function runExisting(callable $handlerCallback): void
4749 /**
4850 * @return never
4951 */
50- public function status (int |string $ id ): JobStatus
52+ public function status (int |string $ id ): MessageStatus
5153 {
5254 throw new NotImplementedException ('Status check is not supported by the adapter ' . self ::class . '. ' );
5355 }
5456
5557 public function push (MessageInterface $ message ): MessageInterface
5658 {
57- $ this ->amqpMessage ??= new AMQPMessage (
59+ $ queueProvider = $ this ->getQueueProviderForMessage ($ message );
60+
61+ $ amqpMessage = new AMQPMessage (
5862 '' ,
59- $ this -> queueProvider ->getMessageProperties (),
63+ $ queueProvider ->getMessageProperties (),
6064 );
61- $ amqpMessage = $ this ->amqpMessage ;
6265
6366 $ payload = $ this ->serializer ->serialize ($ message );
6467 $ amqpMessage ->setBody ($ payload );
65- $ exchangeSettings = $ this -> queueProvider ->getExchangeSettings ();
68+ $ exchangeSettings = $ queueProvider ->getExchangeSettings ();
6669
67- $ this -> queueProvider
70+ $ queueProvider
6871 ->getChannel ()
6972 ->basic_publish (
7073 $ amqpMessage ,
7174 $ exchangeSettings ?->getName() ?? '' ,
72- $ exchangeSettings ? '' : $ this -> queueProvider
75+ $ exchangeSettings ? '' : $ queueProvider
7376 ->getQueueSettings ()
7477 ->getName ()
7578 );
@@ -80,6 +83,17 @@ public function push(MessageInterface $message): MessageInterface
8083 public function subscribe (callable $ handlerCallback ): void
8184 {
8285 $ channel = $ this ->queueProvider ->getChannel ();
86+ $ qosSettings = $ this ->queueProvider
87+ ->getQueueSettings ()
88+ ->getQosSettings ();
89+ if ($ qosSettings !== null ) {
90+ $ channel ->basic_qos (
91+ $ qosSettings ->getPrefetchSize (),
92+ $ qosSettings ->getPrefetchCount (),
93+ $ qosSettings ->isGlobal (),
94+ );
95+ }
96+
8397 $ channel ->basic_consume (
8498 $ this ->queueProvider
8599 ->getQueueSettings ()
@@ -128,4 +142,71 @@ public function getChannel(): string
128142 {
129143 return $ this ->queueProvider ->getQueueSettings ()->getName ();
130144 }
145+
146+ private function getQueueProviderForMessage (MessageInterface $ message ): QueueProviderInterface
147+ {
148+ $ delaySeconds = DelayEnvelope::fromMessage ($ message )->getDelaySeconds ();
149+ if ($ delaySeconds <= 0 ) {
150+ return $ this ->queueProvider ;
151+ }
152+
153+ $ exchangeSettings = $ this ->queueProvider ->getExchangeSettings ();
154+ if ($ exchangeSettings === null ) {
155+ throw new InvalidArgumentException ('Message cannot be delayed to a queue without an exchange. Exchange is mandatory. ' );
156+ }
157+
158+ $ delayMilliseconds = (int ) ceil ($ delaySeconds * 1000 );
159+
160+ return $ this ->queueProvider
161+ ->withMessageProperties ($ this ->getDelayMessageProperties ($ delayMilliseconds ))
162+ ->withExchangeSettings ($ this ->getDelayExchangeSettings ($ exchangeSettings ))
163+ ->withQueueSettings (
164+ $ this ->getDelayQueueSettings (
165+ $ this ->queueProvider ->getQueueSettings (),
166+ $ exchangeSettings ,
167+ $ delayMilliseconds ,
168+ )
169+ );
170+ }
171+
172+ /**
173+ * @psalm-return array{expiration: string, delivery_mode: int}&array
174+ */
175+ private function getDelayMessageProperties (int $ delayMilliseconds ): array
176+ {
177+ return array_merge (
178+ $ this ->queueProvider ->getMessageProperties (),
179+ [
180+ 'expiration ' => (string ) $ delayMilliseconds ,
181+ 'delivery_mode ' => AMQPMessage::DELIVERY_MODE_PERSISTENT ,
182+ ],
183+ );
184+ }
185+
186+ private function getDelayQueueSettings (
187+ QueueSettingsInterface $ queueSettings ,
188+ ExchangeSettingsInterface $ exchangeSettings ,
189+ int $ delayMilliseconds ,
190+ ): QueueSettingsInterface {
191+ $ deliveryTime = time () + (int ) ceil ($ delayMilliseconds / 1000 );
192+
193+ return $ queueSettings
194+ ->withName ("{$ queueSettings ->getName ()}.dlx. $ deliveryTime " )
195+ ->withAutoDeletable (true )
196+ ->withArguments (
197+ [
198+ 'x-dead-letter-exchange ' => ['S ' , $ exchangeSettings ->getName ()],
199+ 'x-expires ' => ['I ' , $ delayMilliseconds + 30000 ],
200+ 'x-message-ttl ' => ['I ' , $ delayMilliseconds ],
201+ ]
202+ );
203+ }
204+
205+ private function getDelayExchangeSettings (ExchangeSettingsInterface $ exchangeSettings ): ExchangeSettingsInterface
206+ {
207+ return $ exchangeSettings
208+ ->withName ("{$ exchangeSettings ->getName ()}.dlx " )
209+ ->withAutoDelete (true )
210+ ->withType (AMQPExchangeType::TOPIC );
211+ }
131212}
0 commit comments