Skip to content

Commit 09b79ec

Browse files
committed
Merge branch '2.x' into 2.next
2 parents f48f0d3 + 553b801 commit 09b79ec

25 files changed

+520
-87
lines changed

.phive/phars.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<phive xmlns="https://phar.io/phive">
3-
<phar name="phpstan" version="1.10.56" installed="1.10.56" location="./tools/phpstan" copy="false"/>
4-
<phar name="psalm" version="5.20.0" installed="5.20.0" location="./tools/psalm" copy="false"/>
3+
<phar name="phpstan" version="2.1.17" installed="2.1.17" location="./tools/phpstan" copy="false"/>
54
</phive>

composer.json

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
},
3535
"suggest": {
3636
"cakephp/bake": "Required if you want to generate jobs.",
37-
"cakephp/migrations": "Needed for running the migrations necessary for using Failed Jobs."
37+
"cakephp/migrations": "Needed for running the migrations necessary for using Failed Jobs.",
38+
"cakedc/cakephp-enqueue": "Required if you want store jobs in database."
3839
},
3940
"autoload": {
4041
"psr-4": {
@@ -61,14 +62,9 @@
6162
],
6263
"cs-check": "phpcs --colors -p src/ tests/",
6364
"cs-fix": "phpcbf --colors -p src/ tests/",
64-
"stan": [
65-
"@phpstan",
66-
"@psalm"
67-
],
65+
"stan": "@phpstan",
6866
"phpstan": "tools/phpstan analyse",
69-
"psalm": "tools/psalm --show-info=false",
7067
"stan-baseline": "tools/phpstan --generate-baseline",
71-
"psalm-baseline": "tools/psalm --set-baseline=psalm-baseline.xml",
7268
"stan-setup": "phive install",
7369
"test": "phpunit",
7470
"test-coverage": "phpunit --coverage-clover=clover.xml"

docs/en/index.rst

Lines changed: 156 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ The following configuration should be present in the config array of your **conf
5454
// The name of an event listener class to associate with the worker
5555
'listener' => \App\Listener\WorkerListener::class,
5656

57+
// (optional) The processor class to use for processing messages.
58+
// Must implement Interop\Queue\Processor. Defaults to Cake\Queue\Queue\Processor
59+
'processor' => \App\Queue\CustomProcessor::class,
60+
5761
// The amount of time in milliseconds to sleep if no jobs are currently available. default: 10000
5862
'receiveTimeout' => 10000,
5963

@@ -119,14 +123,14 @@ A simple job that logs received messages would look like::
119123

120124
/**
121125
* The maximum number of times the job may be attempted. (optional property)
122-
*
126+
*
123127
* @var int|null
124128
*/
125129
public static $maxAttempts = 3;
126130

127131
/**
128132
* Whether there should be only one instance of a job on the queue at a time. (optional property)
129-
*
133+
*
130134
* @var bool
131135
*/
132136
public static $shouldBeUnique = false;
@@ -299,7 +303,7 @@ queue jobs, you can use the ``QueueTransport``. In your application's
299303

300304
return [
301305
// ... other configuration
302-
'EmailTransport' => [
306+
'EmailTransport' => [
303307
'default' => [
304308
'className' => MailTransport::class,
305309
// Configuration for MailTransport.
@@ -323,6 +327,153 @@ With this configuration in place, any time you send an email with the ``default`
323327
email profile CakePHP will generate a queue message. Once that queue message is
324328
processed the default ``MailTransport`` will be used to deliver the email messages.
325329

330+
Custom Processors
331+
================
332+
333+
You can customize how messages are processed by specifying a custom processor class
334+
in your queue configuration. Custom processors must implement the ``Interop\Queue\Processor``
335+
interface.
336+
337+
Example custom processor that extends the main Processor::
338+
339+
<?php
340+
declare(strict_types=1);
341+
342+
namespace App\Queue;
343+
344+
use Cake\Core\ContainerInterface;
345+
use Cake\Queue\Job\Message;
346+
use Cake\Queue\Queue\Processor;
347+
use Enqueue\Consumption\Result;
348+
use Error;
349+
use Interop\Queue\Context;
350+
use Interop\Queue\Message as QueueMessage;
351+
use Interop\Queue\Processor as InteropProcessor;
352+
use Psr\Log\LoggerInterface;
353+
use RuntimeException;
354+
use Throwable;
355+
356+
/**
357+
* Timed Processor
358+
*
359+
* Extends the original Processor to add timing metrics to all events.
360+
*/
361+
class TimedProcessor extends Processor
362+
{
363+
/**
364+
* Constructor
365+
*
366+
* @param \Psr\Log\LoggerInterface|null $logger Logger instance
367+
* @param \Cake\Core\ContainerInterface|null $container DI container instance
368+
*/
369+
public function __construct(?LoggerInterface $logger = null, ?ContainerInterface $container = null)
370+
{
371+
parent::__construct($logger, $container);
372+
}
373+
374+
/**
375+
* Process message with timing
376+
*
377+
* @param \Interop\Queue\Message $message Message
378+
* @param \Interop\Queue\Context $context Context
379+
* @return object|string
380+
*/
381+
public function process(QueueMessage $message, Context $context): string|object
382+
{
383+
$this->dispatchEvent('Processor.message.seen', ['queueMessage' => $message]);
384+
385+
$jobMessage = new Message($message, $context, $this->container);
386+
try {
387+
$jobMessage->getCallable();
388+
} catch (RuntimeException | Error $e) {
389+
$this->logger->debug('Invalid callable for message. Rejecting message from queue.');
390+
$this->dispatchEvent('Processor.message.invalid', ['message' => $jobMessage]);
391+
392+
return InteropProcessor::REJECT;
393+
}
394+
395+
$startTime = microtime(true) * 1000;
396+
$this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]);
397+
398+
try {
399+
$response = $this->processMessage($jobMessage);
400+
} catch (Throwable $e) {
401+
$message->setProperty('jobException', $e);
402+
403+
$this->logger->debug(sprintf('Message encountered exception: %s', $e->getMessage()));
404+
$this->dispatchEvent('Processor.message.exception', [
405+
'message' => $jobMessage,
406+
'exception' => $e,
407+
'duration' => (int)((microtime(true) * 1000) - $startTime),
408+
]);
409+
410+
return Result::requeue('Exception occurred while processing message');
411+
}
412+
413+
$duration = (int)((microtime(true) * 1000) - $startTime);
414+
415+
if ($response === InteropProcessor::ACK) {
416+
$this->logger->debug('Message processed successfully');
417+
$this->dispatchEvent('Processor.message.success', [
418+
'message' => $jobMessage,
419+
'duration' => $duration,
420+
]);
421+
422+
return InteropProcessor::ACK;
423+
}
424+
425+
if ($response === InteropProcessor::REJECT) {
426+
$this->logger->debug('Message processed with rejection');
427+
$this->dispatchEvent('Processor.message.reject', [
428+
'message' => $jobMessage,
429+
'duration' => $duration,
430+
]);
431+
432+
return InteropProcessor::REJECT;
433+
}
434+
435+
$this->logger->debug('Message processed with failure, requeuing');
436+
$this->dispatchEvent('Processor.message.failure', [
437+
'message' => $jobMessage,
438+
'duration' => $duration,
439+
]);
440+
441+
return InteropProcessor::REQUEUE;
442+
}
443+
}
444+
445+
Configuration example::
446+
447+
'Queue' => [
448+
'default' => [
449+
'url' => 'redis://localhost:6379',
450+
'queue' => 'default',
451+
// No processor specified - uses default Processor class
452+
],
453+
'timed' => [
454+
'url' => 'redis://localhost:6379',
455+
'queue' => 'timed',
456+
'processor' => \App\Queue\TimedProcessor::class, // Custom processor with timing
457+
],
458+
],
459+
460+
**Note**: If no processor is specified in the configuration, the default
461+
``Cake\Queue\Queue\Processor`` class will be used. Custom processors are useful
462+
for adding custom logging, metrics collection, or specialized message handling.
463+
464+
**Important**: The `--processor` command line option is different from the `processor` configuration option:
465+
466+
- **Configuration `processor`**: Specifies the processor class to use for processing messages
467+
- **Command line `--processor`**: Specifies the processor name for Enqueue topic binding (used in `bindTopic()`)
468+
469+
Example usage::
470+
471+
# Use custom processor class from config
472+
bin/cake queue worker --config=timed
473+
474+
# Use custom processor class AND specify topic binding name
475+
bin/cake queue worker --config=timed --processor=my-topic-processor
476+
326477
Run the worker
327478
==============
328479

@@ -336,7 +487,7 @@ This shell can take a few different options:
336487

337488
- ``--config`` (default: default): Name of a queue config to use
338489
- ``--queue`` (default: default): Name of queue to bind to
339-
- ``--processor`` (default: ``null``): Name of processor to bind to
490+
- ``--processor`` (default: ``null``): Name of processor to bind to (for Enqueue topic binding, not the processor class)
340491
- ``--logger`` (default: ``stdout``): Name of a configured logger
341492
- ``--max-jobs`` (default: ``null``): Maximum number of jobs to process. Worker will exit after limit is reached.
342493
- ``--max-runtime`` (default: ``null``): Maximum number of seconds to run. Worker will exit after limit is reached.
@@ -370,7 +521,7 @@ Requeue Failed Jobs
370521

371522
Push jobs back onto the queue and remove them from the ``queue_failed_jobs``
372523
table. If a job fails to requeue it is not guaranteed that the job was not run.
373-
524+
374525
.. code-block:: bash
375526
376527
bin/cake queue requeue

phpstan-baseline.neon

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
parameters:
2+
ignoreErrors:
3+
-
4+
message: '#^Parameter \#1 \$callback of static method Closure\:\:fromCallable\(\) expects callable\(\)\: mixed, array\{mixed, string\} given\.$#'
5+
identifier: argument.type
6+
count: 1
7+
path: src/Job/Message.php

phpstan.neon

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1+
includes:
2+
- phpstan-baseline.neon
3+
14
parameters:
25
level: 8
3-
checkMissingIterableValueType: false
4-
checkGenericClassInNonGenericObjectType: false
56
treatPhpDocTypesAsCertain: false
67
bootstrapFiles:
78
- tests/bootstrap.php
89
paths:
910
- src/
1011
ignoreErrors:
11-
- '#Parameter \#1 \$callback of static method Closure::fromCallable\(\) expects callable\(\): mixed, array\{mixed, string\} given.#'
12-
- '#Method Cake\\Queue\\Job\\Message::getTarget\(\) should return array\{class-string, string\} but returns non-empty-array.#'
12+
- identifier: missingType.iterableValue
13+
- identifier: missingType.generics
14+
- identifier: trait.unused

psalm-baseline.xml

Lines changed: 0 additions & 3 deletions
This file was deleted.

psalm.xml

Lines changed: 0 additions & 28 deletions
This file was deleted.

src/Command/RequeueCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public function execute(Arguments $args, ConsoleIo $io): void
138138
'config' => $failedJob->config,
139139
'priority' => $failedJob->priority,
140140
'queue' => $failedJob->queue,
141-
]
141+
],
142142
);
143143

144144
$failedJobsTable->deleteOrFail($failedJob);

src/Command/WorkerCommand.php

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
3535
use Enqueue\Consumption\Extension\LoggerExtension;
3636
use Enqueue\Consumption\ExtensionInterface;
37+
use Interop\Queue\Processor as InteropProcessor;
3738
use Psr\Log\LoggerInterface;
3839
use Psr\Log\NullLogger;
3940

@@ -110,7 +111,7 @@ public function getOptionParser(): ConsoleOptionParser
110111
'short' => 'a',
111112
]);
112113
$parser->setDescription(
113-
'Runs a queue worker that consumes from the named queue.'
114+
'Runs a queue worker that consumes from the named queue.',
114115
);
115116

116117
return $parser;
@@ -170,6 +171,34 @@ protected function getLogger(Arguments $args): LoggerInterface
170171
return $logger ?? new NullLogger();
171172
}
172173

174+
/**
175+
* Creates and returns a Processor object
176+
*
177+
* @param \Cake\Console\Arguments $args Arguments
178+
* @param \Cake\Console\ConsoleIo $io ConsoleIo
179+
* @param \Psr\Log\LoggerInterface $logger Logger instance
180+
* @return \Interop\Queue\Processor
181+
*/
182+
protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface $logger): InteropProcessor
183+
{
184+
$configKey = (string)$args->getOption('config');
185+
$config = QueueManager::getConfig($configKey);
186+
187+
$processorClass = $config['processor'] ?? Processor::class;
188+
189+
if (!class_exists($processorClass)) {
190+
$io->error(sprintf(sprintf('Processor class %s not found', $processorClass)));
191+
$this->abort();
192+
}
193+
194+
if (!is_subclass_of($processorClass, InteropProcessor::class)) {
195+
$io->error(sprintf(sprintf('Processor class %s must implement Interop\Queue\Processor', $processorClass)));
196+
$this->abort();
197+
}
198+
199+
return new $processorClass($logger, $this->container);
200+
}
201+
173202
/**
174203
* @param \Cake\Console\Arguments $args Arguments
175204
* @param \Cake\Console\ConsoleIo $io ConsoleIo
@@ -184,7 +213,7 @@ public function execute(Arguments $args, ConsoleIo $io): int
184213
}
185214

186215
$logger = $this->getLogger($args);
187-
$processor = new Processor($logger, $this->container);
216+
$processor = $this->getProcessor($args, $io, $logger);
188217
$extension = $this->getQueueExtension($args, $logger);
189218

190219
$hasListener = Configure::check(sprintf('Queue.%s.listener', $config));
@@ -197,7 +226,10 @@ public function execute(Arguments $args, ConsoleIo $io): int
197226

198227
/** @var \Cake\Event\EventListenerInterface $listener */
199228
$listener = new $listenerClassName();
200-
$processor->getEventManager()->on($listener);
229+
230+
if ($processor instanceof Processor) {
231+
$processor->getEventManager()->on($listener);
232+
}
201233
}
202234
$client = QueueManager::engine($config);
203235
$queue = $args->getOption('queue')

0 commit comments

Comments
 (0)