From c7b64672b7b3f68bcea06bf7829a842b081cf304 Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Mon, 11 May 2026 18:30:02 +0200 Subject: [PATCH 1/2] Add the `basic_qos` call before consuming messages --- src/Adapter.php | 8 ++ src/Settings/QosSettings.php | 124 ++++++++++++++++++++++++ src/Settings/Queue.php | 16 ++- src/Settings/QueueSettingsInterface.php | 4 + tests/Unit/QosSettingsTest.php | 47 +++++++++ tests/Unit/QueueSettingsTest.php | 3 + tests/Unit/QueueTest.php | 80 +++++++++++++++ 7 files changed, 281 insertions(+), 1 deletion(-) create mode 100644 src/Settings/QosSettings.php create mode 100644 tests/Unit/QosSettingsTest.php diff --git a/src/Adapter.php b/src/Adapter.php index 9882863..8de3dcb 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -123,6 +123,14 @@ private function pushDelayed(MessageInterface $message, float $delaySeconds): vo public function subscribe(callable $handlerCallback): void { $channel = $this->queueProvider->getChannel(); + $qosSettings = $this->queueProvider->getQueueSettings()->getQosSettings(); + if ($qosSettings !== null) { + $channel->basic_qos( + $qosSettings->getPrefetchSize(), + $qosSettings->getPrefetchCount(), + $qosSettings->isGlobal(), + ); + } $channel->basic_consume( $this->queueProvider ->getQueueSettings() diff --git a/src/Settings/QosSettings.php b/src/Settings/QosSettings.php new file mode 100644 index 0000000..078c3e3 --- /dev/null +++ b/src/Settings/QosSettings.php @@ -0,0 +1,124 @@ +prefetchSize; + } + + /** + * Returns the maximum number of unacknowledged messages the broker may deliver before waiting + * for an acknowledgement. 0 means unlimited. + * + * @psalm-return non-negative-int + * + * @see https://www.rabbitmq.com/amqp-0-9-1-reference#basic.qos.prefetch-count AMQP 0-9-1 prefetch-count field + */ + public function getPrefetchCount(): int + { + return $this->prefetchCount; + } + + /** + * Returns whether the QoS limits apply globally to the channel (true) or per consumer (false). + * + * RabbitMQ redefines the AMQP 0-9-1 semantics of this field: false means the limit applies + * to each individual consumer registered on the channel; true means the limit is shared across + * all consumers on the channel. + * + * @see https://www.rabbitmq.com/docs/consumer-prefetch#per-channel-vs-per-consumer RabbitMQ per-channel vs per-consumer prefetch + * @see https://www.rabbitmq.com/amqp-0-9-1-reference#basic.qos.global AMQP 0-9-1 global field + */ + public function isGlobal(): bool + { + return $this->global; + } +} diff --git a/src/Settings/Queue.php b/src/Settings/Queue.php index ba59947..740b578 100644 --- a/src/Settings/Queue.php +++ b/src/Settings/Queue.php @@ -17,7 +17,8 @@ public function __construct( private bool $autoDelete = false, private bool $nowait = false, private AMQPTable|array $arguments = [], - private ?int $ticket = null + private ?int $ticket = null, + private ?QosSettings $qosSettings = null, ) { } @@ -61,6 +62,11 @@ public function isPassive(): bool return $this->passive; } + public function getQosSettings(): ?QosSettings + { + return $this->qosSettings; + } + /** * @psalm-return array{0: string, 1: bool, 2: bool, 3: bool, 4: bool, 5: bool, 6: AMQPTable|array, 7: int|null} * @@ -143,4 +149,12 @@ public function withPassive(bool $passive): self return $new; } + + public function withQosSettings(?QosSettings $qosSettings): self + { + $new = clone $this; + $new->qosSettings = $qosSettings; + + return $new; + } } diff --git a/src/Settings/QueueSettingsInterface.php b/src/Settings/QueueSettingsInterface.php index 0d7a483..98c32d1 100644 --- a/src/Settings/QueueSettingsInterface.php +++ b/src/Settings/QueueSettingsInterface.php @@ -24,6 +24,8 @@ public function hasNowait(): bool; public function isPassive(): bool; + public function getQosSettings(): ?QosSettings; + /** * Returns positional arguments to be used with {@see \PhpAmqpLib\Channel\AMQPChannel::queue_declare()} * @@ -50,4 +52,6 @@ public function withExclusive(bool $exclusive): self; public function withNowait(bool $nowait): self; public function withPassive(bool $passive): self; + + public function withQosSettings(?QosSettings $qosSettings): self; } diff --git a/tests/Unit/QosSettingsTest.php b/tests/Unit/QosSettingsTest.php new file mode 100644 index 0000000..99c4bb4 --- /dev/null +++ b/tests/Unit/QosSettingsTest.php @@ -0,0 +1,47 @@ +getPrefetchSize()); + self::assertSame(0, $settings->getPrefetchCount()); + self::assertFalse($settings->isGlobal()); + } + + public function testCustomValues(): void + { + $settings = new QosSettings(prefetchSize: 1024, prefetchCount: 5, global: true); + + self::assertSame(1024, $settings->getPrefetchSize()); + self::assertSame(5, $settings->getPrefetchCount()); + self::assertTrue($settings->isGlobal()); + } + + public function testNegativePrefetchSizeThrows(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Prefetch size must be a non-negative integer, -1 given.'); + + new QosSettings(prefetchSize: -1); + } + + public function testNegativePrefetchCountThrows(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Prefetch count must be a non-negative integer, -3 given.'); + + new QosSettings(prefetchCount: -3); + } + +} diff --git a/tests/Unit/QueueSettingsTest.php b/tests/Unit/QueueSettingsTest.php index 5c6b2ec..1fe0af2 100644 --- a/tests/Unit/QueueSettingsTest.php +++ b/tests/Unit/QueueSettingsTest.php @@ -10,6 +10,7 @@ use Yiisoft\Queue\AMQP\QueueProvider; use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings; use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings; +use Yiisoft\Queue\AMQP\Settings\QosSettings; use Yiisoft\Queue\Message\JsonMessageSerializer; use Yiisoft\Queue\Message\Message; @@ -139,5 +140,7 @@ public function testImmutable(): void self::assertNotSame($queueSettings, $queueSettings->withName('test')); self::assertNotSame($queueSettings, $queueSettings->withAutoDeletable(false)); self::assertNotSame($queueSettings, $queueSettings->withArguments([])); + self::assertNotSame($queueSettings, $queueSettings->withQosSettings(new QosSettings())); + self::assertNotSame($queueSettings, $queueSettings->withQosSettings(null)); } } diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index 49941a1..0486f19 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -5,12 +5,15 @@ namespace Yiisoft\Queue\AMQP\Tests\Unit; use Exception; +use PhpAmqpLib\Channel\AMQPChannel; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\AMQP\Adapter; use Yiisoft\Queue\AMQP\QueueProvider; use Yiisoft\Queue\AMQP\QueueProviderInterface; use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings; use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings; +use Yiisoft\Queue\AMQP\Settings\QosSettings; +use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface; use Yiisoft\Queue\AMQP\Tests\Support\FileHelper; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Exception\MessageFailureException; @@ -135,4 +138,81 @@ public function testImmutable(): void self::assertNotSame($adapter, $adapter->withQueueProvider($queueProvider)); } + + public function testSubscribeCallsBasicQosWhenConfigured(): void + { + $qosSettings = new QosSettings(prefetchSize: 0, prefetchCount: 5, global: true); + + $queueSettings = $this->createMock(QueueSettingsInterface::class); + $queueSettings->method('getQosSettings')->willReturn($qosSettings); + $queueSettings->method('getName')->willReturn('test-queue'); + + $channel = $this->createMock(AMQPChannel::class); + $channel->expects(self::once()) + ->method('basic_qos') + ->with(0, 5, true); + $channel->expects(self::once()) + ->method('basic_consume') + ->with('test-queue', 'test-queue', false, false, false, true, self::anything()); + + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->method('getChannel')->willReturn($channel); + $queueProvider->method('getQueueSettings')->willReturn($queueSettings); + + $loop = $this->createMock(LoopInterface::class); + $loop->method('canContinue')->willReturn(false); + + $adapter = new Adapter($queueProvider, $this->createMock(MessageSerializerInterface::class), $loop); + $adapter->subscribe(static fn() => null); + } + + public function testSubscribeCallsBasicQosWithDefaultSettings(): void + { + $qosSettings = new QosSettings(); + + $queueSettings = $this->createMock(QueueSettingsInterface::class); + $queueSettings->method('getQosSettings')->willReturn($qosSettings); + $queueSettings->method('getName')->willReturn('test-queue'); + + $channel = $this->createMock(AMQPChannel::class); + $channel->expects(self::once()) + ->method('basic_qos') + ->with(0, 0, false); + $channel->expects(self::once()) + ->method('basic_consume') + ->with('test-queue', 'test-queue', false, false, false, true, self::anything()); + + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->method('getChannel')->willReturn($channel); + $queueProvider->method('getQueueSettings')->willReturn($queueSettings); + + $loop = $this->createMock(LoopInterface::class); + $loop->method('canContinue')->willReturn(false); + + $adapter = new Adapter($queueProvider, $this->createMock(MessageSerializerInterface::class), $loop); + $adapter->subscribe(static fn() => null); + } + + public function testSubscribeSkipsBasicQosWhenNotConfigured(): void + { + $queueSettings = $this->createMock(QueueSettingsInterface::class); + $queueSettings->method('getQosSettings')->willReturn(null); + $queueSettings->method('getName')->willReturn('test-queue'); + + $channel = $this->createMock(AMQPChannel::class); + $channel->expects(self::never())->method('basic_qos'); + $channel->expects(self::once()) + ->method('basic_consume') + ->with('test-queue', 'test-queue', false, false, false, true, self::anything()); + + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->method('getChannel')->willReturn($channel); + $queueProvider->method('getQueueSettings')->willReturn($queueSettings); + + $loop = $this->createMock(LoopInterface::class); + $loop->method('canContinue')->willReturn(false); + + $adapter = new Adapter($queueProvider, $this->createMock(MessageSerializerInterface::class), $loop); + $adapter->subscribe(static fn() => null); + } } From 0af567cf678ccfb72341395f200885d423bc8edd Mon Sep 17 00:00:00 2001 From: StyleCI Bot Date: Mon, 11 May 2026 16:30:14 +0000 Subject: [PATCH 2/2] Apply fixes from StyleCI --- tests/Unit/QosSettingsTest.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/Unit/QosSettingsTest.php b/tests/Unit/QosSettingsTest.php index 99c4bb4..f04cea7 100644 --- a/tests/Unit/QosSettingsTest.php +++ b/tests/Unit/QosSettingsTest.php @@ -43,5 +43,4 @@ public function testNegativePrefetchCountThrows(): void new QosSettings(prefetchCount: -3); } - }