-
-
Notifications
You must be signed in to change notification settings - Fork 29
Replace SynchronousAdapter with SynchronousPushHandler
#270
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
c9e25db
ee6e8c3
0a1022b
6ba2134
c89194d
b1ebd7b
0e17a9c
884ea35
0eeb85f
1cbb04e
62521e7
7f8c23e
b55ab08
0f4ac41
02bf196
9908377
bacf410
2b0fa1e
d0e84b4
e94de01
2479f7c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| Synchronous Mode | ||
| ================ | ||
|
vjik marked this conversation as resolved.
Outdated
|
||
|
|
||
| Run tasks synchronously in the same process. Useful for: | ||
|
|
||
| - developing and debugging an application; | ||
| - writing tests; | ||
| - production setups where the application is built around `QueueInterface` from day one but | ||
| doesn't have an external broker yet — you can switch to a real adapter later without touching | ||
| the call sites. | ||
|
|
||
| To enable it, construct the queue without an adapter (the `adapter` argument defaults to `null`): | ||
|
vjik marked this conversation as resolved.
Outdated
|
||
|
|
||
| ```php | ||
| $logger = $DIContainer->get(\Psr\Log\LoggerInterface::class); | ||
|
|
||
| $worker = $DIContainer->get(\Yiisoft\Queue\Worker\WorkerInterface::class); | ||
| $loop = $DIContainer->get(\Yiisoft\Queue\Cli\LoopInterface::class); | ||
| $pushMiddlewareDispatcher = $DIContainer->get( | ||
| \Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher::class | ||
| ); | ||
|
|
||
| $queue = new Yiisoft\Queue\Queue( | ||
| $worker, | ||
| $loop, | ||
| $logger, | ||
| $pushMiddlewareDispatcher, | ||
| ); | ||
| ``` | ||
|
|
||
| In synchronous mode every message passed to `push()` is processed immediately by the worker. | ||
| The value returned from `push()` is the message after push-middlewares — without an `IdEnvelope`, | ||
| since no adapter is involved to assign an ID. | ||
|
|
||
| Limitations: | ||
|
|
||
| - `run()` does nothing and returns `0`. | ||
| - `listen()` throws `BadMethodCallException`. | ||
| - `status()` throws `BadMethodCallException` — there is no message storage to track IDs. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't it better to do nothing on
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's better. Implemented. |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| <?php | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace Yiisoft\Queue\Middleware\Push; | ||
|
|
||
| use Yiisoft\Queue\Message\MessageInterface; | ||
| use Yiisoft\Queue\QueueInterface; | ||
| use Yiisoft\Queue\Worker\WorkerInterface; | ||
|
|
||
| /** | ||
| * @internal | ||
| */ | ||
| final class SynchronousPushHandler implements MessageHandlerPushInterface | ||
| { | ||
| public function __construct( | ||
| private readonly WorkerInterface $worker, | ||
| private readonly QueueInterface $queue, | ||
| ) {} | ||
|
|
||
| public function handlePush(MessageInterface $message): MessageInterface | ||
| { | ||
| $this->worker->process($message, $this->queue); | ||
|
|
||
| return $message; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| namespace Yiisoft\Queue; | ||
|
|
||
| use BackedEnum; | ||
| use BadMethodCallException; | ||
| use Psr\Log\LoggerInterface; | ||
| use Yiisoft\Queue\Adapter\AdapterInterface; | ||
| use Yiisoft\Queue\Cli\LoopInterface; | ||
|
|
@@ -13,6 +14,7 @@ | |
| use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface; | ||
| use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface; | ||
| use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; | ||
| use Yiisoft\Queue\Middleware\Push\SynchronousPushHandler; | ||
| use Yiisoft\Queue\Worker\WorkerInterface; | ||
| use Yiisoft\Queue\Message\IdEnvelope; | ||
| use Yiisoft\Queue\Provider\QueueProviderInterface; | ||
|
|
@@ -23,21 +25,23 @@ final class Queue implements QueueInterface | |
| * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[] | ||
| */ | ||
| private array $middlewareDefinitions; | ||
| private AdapterPushHandler $adapterPushHandler; | ||
| private MessageHandlerPushInterface $finalPushHandler; | ||
|
samdark marked this conversation as resolved.
|
||
| private string $name; | ||
|
|
||
| public function __construct( | ||
| private readonly AdapterInterface $adapter, | ||
| private readonly WorkerInterface $worker, | ||
| private readonly LoopInterface $loop, | ||
| private readonly LoggerInterface $logger, | ||
| private readonly PushMiddlewareDispatcher $pushMiddlewareDispatcher, | ||
| private readonly ?AdapterInterface $adapter = null, | ||
| string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE, | ||
| MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions, | ||
| ) { | ||
| $this->name = StringNormalizer::normalize($name); | ||
| $this->middlewareDefinitions = $middlewareDefinitions; | ||
| $this->adapterPushHandler = new AdapterPushHandler($this->adapter); | ||
| $this->finalPushHandler = $adapter === null | ||
| ? new SynchronousPushHandler($worker, $this) | ||
| : new AdapterPushHandler($adapter); | ||
| } | ||
|
|
||
| public function getName(): string | ||
|
|
@@ -59,6 +63,14 @@ public function push( | |
| $this->createPushHandler(...$middlewareDefinitions), | ||
| ); | ||
|
|
||
| if ($this->adapter === null) { | ||
| $this->logger->info( | ||
| 'Processed message with message type "{messageType}" synchronously.', | ||
| ['messageType' => $message->getType()], | ||
| ); | ||
| return $message; | ||
| } | ||
|
|
||
| /** @var string $messageId */ | ||
| $messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null'; | ||
| $this->logger->info( | ||
|
|
@@ -71,6 +83,13 @@ public function push( | |
|
|
||
| public function run(int $max = 0): int | ||
| { | ||
| if ($this->adapter === null) { | ||
| $this->logger->debug( | ||
| 'Queue is in synchronous mode (no adapter). Messages are processed on push. run() does nothing.', | ||
| ); | ||
| return 0; | ||
| } | ||
|
|
||
| $this->logger->debug('Start processing queue messages.'); | ||
| $count = 0; | ||
|
|
||
|
|
@@ -95,13 +114,25 @@ public function run(int $max = 0): int | |
|
|
||
| public function listen(): void | ||
| { | ||
| if ($this->adapter === null) { | ||
| throw new BadMethodCallException( | ||
| 'Cannot listen without an adapter. Queue is in synchronous mode.', | ||
| ); | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should it do
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
|
|
||
| $this->logger->info('Start listening to the queue.'); | ||
| $this->adapter->subscribe(fn(MessageInterface $message) => $this->handle($message)); | ||
| $this->logger->info('Finish listening to the queue.'); | ||
| } | ||
|
|
||
| public function status(string|int $id): MessageStatus | ||
| { | ||
| if ($this->adapter === null) { | ||
| throw new BadMethodCallException( | ||
| 'Cannot get message status without an adapter. Queue is in synchronous mode.', | ||
| ); | ||
| } | ||
|
|
||
| return $this->adapter->status($id); | ||
| } | ||
|
|
||
|
|
@@ -131,12 +162,12 @@ private function handle(MessageInterface $message): bool | |
| private function createPushHandler(MiddlewarePushInterface|callable|array|string ...$middlewares): MessageHandlerPushInterface | ||
| { | ||
| return new class ( | ||
| $this->adapterPushHandler, | ||
| $this->finalPushHandler, | ||
| $this->pushMiddlewareDispatcher, | ||
| array_merge($this->middlewareDefinitions, $middlewares), | ||
| ) implements MessageHandlerPushInterface { | ||
| public function __construct( | ||
| private readonly AdapterPushHandler $adapterPushHandler, | ||
| private readonly MessageHandlerPushInterface $finishHandler, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the name
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is final handler in push handlers stack. See https://github.com/yiisoft/queue/blob/master/src/Middleware/Push/MiddlewarePushStack.php#L27
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Worth adding some phpdoc.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
| private readonly PushMiddlewareDispatcher $dispatcher, | ||
| /** | ||
| * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[] | ||
|
|
@@ -148,7 +179,7 @@ public function handlePush(MessageInterface $message): MessageInterface | |
| { | ||
| return $this->dispatcher | ||
| ->withMiddlewares($this->middlewares) | ||
| ->dispatch($message, $this->adapterPushHandler); | ||
| ->dispatch($message, $this->finishHandler); | ||
| } | ||
| }; | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.