Skip to content

Commit c93d80e

Browse files
committed
Add createJob(unique: true) for fan-out dedup
When a fan-out dispatcher enqueues per-tenant work on every cron tick, plain `createJob()` happily inserts a duplicate row even if the previous tick's job for the same tenant is still pending or stuck. The result is a slow accumulation of identical pending jobs -- in one observed case, 5876 stuck `VolunteerCheckOutReminder` rows from a single tenant whose DB connection had been decommissioned. `isQueued()` already exists for this, but every caller has to wrap each `createJob()` in a two-line manual guard. This adds the guard to `createJob()` itself as an opt-in `unique` flag: $queuedJobsTable->createJob( 'VolunteerCheckOutReminder', ['account_uuid' => $accountUuid], [ 'reference' => 'volunteer_check_out:' . $accountUuid, 'unique' => true, ], ); When `unique` is set and a pending (`completed IS NULL`) job exists for the same `(reference, resolved job_task)` pair, the existing entity is returned and no new row is inserted. The dedup hit is logged at info level. `unique` without a `reference` throws `InvalidArgumentException` at the call site -- failing fast beats silently inserting an undeduped row. The flag lives on `JobConfig` as a request-time property outside `_keyMap`, so it never leaks into `toArray()` output or the `queued_jobs` row. Default is `false`, fully BC. Race window: two ticks landing simultaneously can both pass the `isQueued()` check and both insert. A DB-level unique constraint would close that, but it requires a migration and a decision on how callers should opt in per-table -- out of scope for this PR. The 99% effectiveness already kills the slow-buildup scenario this is built for.
1 parent c575000 commit c93d80e

4 files changed

Lines changed: 220 additions & 3 deletions

File tree

docs/guide/queueing-jobs.md

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ The `createJob()` function takes three arguments.
2727
- The first argument is the name of the type of job that you are creating.
2828
- The second argument is optional, but if set must be an array of data and will be passed as a payload parameter to the `run()` function of the worker.
2929
It can also be a (DTO) object that implements `CakeDto\Dto\FromArrayToArrayInterface` or provides `toArray()` method.
30-
- The third argument is options (`'notBefore'`, `'priority'`, `'group'`, `'reference'`). Either as array or `Queue\Config\JobConfig` class.
30+
- The third argument is options (`'notBefore'`, `'priority'`, `'group'`, `'reference'`, `'unique'`). Either as array or `Queue\Config\JobConfig` class. See [Avoiding parallel (re)queueing](#avoiding-parallel-re-queueing) for the `unique` flag.
3131

3232
> `priority` is sorted ascending, therefore a task with priority 1 will be executed before a task with priority 5
3333
@@ -201,6 +201,41 @@ For more complex use cases, you can manually use `->find()->where()`, of course.
201201

202202
Note that the 2nd argument (job type) is optional, but recommended. If you do not use it, make sure your reference is globally unique.
203203

204+
### Built-in dedup with `unique: true`
205+
206+
For fan-out dispatchers and cron-driven tasks that enqueue many similar
207+
jobs, the manual `isQueued()` guard above is two lines of boilerplate
208+
per enqueue site. The `unique` option folds that into `createJob()`
209+
directly: if a pending (incomplete) job already exists for the same
210+
`(reference, resolved job_task)` pair, the existing entity is returned
211+
and no new row is inserted.
212+
213+
```php
214+
$queuedJobsTable->createJob(
215+
'VolunteerCheckOutReminder',
216+
['account_uuid' => $accountUuid],
217+
[
218+
'reference' => 'volunteer_check_out:' . $accountUuid,
219+
'unique' => true,
220+
],
221+
);
222+
// First call: inserts a new pending job, returns the new entity.
223+
// Subsequent calls while that job is pending: return the *existing*
224+
// entity without inserting. Once the original completes, the next call
225+
// inserts a fresh row.
226+
```
227+
228+
`unique: true` without a `reference` throws `InvalidArgumentException`
229+
— fail-fast at the call site rather than silently inserting an
230+
undeduped row. The dedup check uses an info-level log entry
231+
(`Queue.createJob: dedup hit for <task> reference=<ref> ...`) so
232+
operators can confirm the guard is firing.
233+
234+
The dedup window is "the previous job is not yet completed" — pending,
235+
in-progress, and failed-but-not-completed jobs all block. This is the
236+
right shape for cron fan-out: if a previous tick is stuck or still
237+
running, the next tick holds back instead of stacking a duplicate.
238+
204239
## Updating progress/status
205240

206241
The `createJob()` method returns the entity. So you can store the ID and at any time ask the queue about the status of this job.

src/Config/JobConfig.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ class JobConfig {
4040
*/
4141
public const FIELD_STATUS = 'status';
4242

43+
/**
44+
* @var string
45+
*/
46+
public const FIELD_UNIQUE = 'unique';
47+
4348
/**
4449
* For camelBacked input/output.
4550
*
@@ -92,6 +97,14 @@ class JobConfig {
9297
*/
9398
protected $status;
9499

100+
/**
101+
* Request-time flag. Not persisted; intentionally outside `_keyMap` so
102+
* it never leaks into `toArray()` output or the `queued_jobs` row.
103+
*
104+
* @var bool
105+
*/
106+
protected bool $unique = false;
107+
95108
/**
96109
* @var array<string, array<string, string>>
97110
*/
@@ -128,6 +141,15 @@ class JobConfig {
128141
public function fromArray(array $data, ?string $type = null) {
129142
$type = $this->keyType($type, static::TYPE_CAMEL);
130143

144+
// `unique` is a request-time flag, not persisted state, so it lives
145+
// outside `_keyMap`. Handle it here so a caller can pass it in the
146+
// array shape (`createJob($task, $data, ['reference' => ..., 'unique' => true])`)
147+
// without tripping the strict `field()` lookup below.
148+
if (array_key_exists('unique', $data)) {
149+
$this->unique = (bool)$data['unique'];
150+
unset($data['unique']);
151+
}
152+
131153
foreach ($data as $field => $value) {
132154
if ($type !== static::TYPE_CAMEL) {
133155
$field = $this->field($field, $type);
@@ -477,4 +499,27 @@ public function hasStatus(): bool {
477499
return $this->status !== null;
478500
}
479501

502+
/**
503+
* Enable dedup for this job: if a pending (incomplete) job already
504+
* exists with the same `reference` and resolved `job_task`,
505+
* `createJob()` returns that existing entity instead of inserting a
506+
* duplicate. Requires `reference` to be set.
507+
*
508+
* @param bool $unique
509+
*
510+
* @return $this
511+
*/
512+
public function setUnique(bool $unique) {
513+
$this->unique = $unique;
514+
515+
return $this;
516+
}
517+
518+
/**
519+
* @return bool
520+
*/
521+
public function isUnique(): bool {
522+
return $this->unique;
523+
}
524+
480525
}

src/Model/Table/QueuedJobsTable.php

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Cake\Event\EventInterface;
1111
use Cake\Event\EventManager;
1212
use Cake\I18n\DateTime;
13+
use Cake\Log\Log;
1314
use Cake\ORM\Query\SelectQuery;
1415
use Cake\ORM\Table;
1516
use Cake\Validation\Validator;
@@ -185,12 +186,19 @@ public function createConfig(): JobConfig {
185186
* - group: Used to group similar QueuedJobs
186187
* - reference: An optional reference string
187188
* - status: To set an initial status text
189+
* - unique: When true (with a `reference`), an existing pending job
190+
* matching `(reference, resolved job_task)` is returned instead of
191+
* inserting a duplicate row. Useful for fan-out dispatchers that
192+
* could otherwise stack up identical per-tenant work when a previous
193+
* run is still pending or stuck.
188194
*
189195
* @param string $jobTask Job task name or FQCN.
190196
* @param object|array<string, mixed>|null $data Array of data or DTO like object.
191197
* @param \Queue\Config\JobConfig|array<string, mixed> $config Config to save along with the job.
192198
*
193-
* @return \Queue\Model\Entity\QueuedJob Saved job entity
199+
* @throws \InvalidArgumentException If `unique` is set without a `reference`.
200+
*
201+
* @return \Queue\Model\Entity\QueuedJob Saved job entity (or the existing pending entity if `unique` deduped).
194202
*/
195203
public function createJob(string $jobTask, array|object|null $data = null, array|JobConfig $config = []): QueuedJob {
196204
if (!$config instanceof JobConfig) {
@@ -206,8 +214,34 @@ public function createJob(string $jobTask, array|object|null $data = null, array
206214
throw new InvalidArgumentException('Data must be `array|null`, implement `' . FromArrayToArrayInterface::class . '` or provide a `toArray()` method');
207215
}
208216

217+
$resolvedTask = $this->jobTask($jobTask);
218+
219+
if ($config->isUnique()) {
220+
if (!$config->hasReference()) {
221+
throw new InvalidArgumentException('createJob() with `unique` requires a `reference` to dedupe on.');
222+
}
223+
224+
$existing = $this->find()
225+
->where([
226+
'reference' => $config->getReferenceOrFail(),
227+
'job_task' => $resolvedTask,
228+
'completed IS' => null,
229+
])
230+
->first();
231+
if ($existing !== null) {
232+
Log::info(sprintf(
233+
'Queue.createJob: dedup hit for %s reference=%s (returning existing job_id=%d)',
234+
$resolvedTask,
235+
$config->getReferenceOrFail(),
236+
$existing->id,
237+
));
238+
239+
return $existing;
240+
}
241+
}
242+
209243
$queuedJob = [
210-
'job_task' => $this->jobTask($jobTask),
244+
'job_task' => $resolvedTask,
211245
'data' => $data,
212246
'notbefore' => $config->hasNotBefore() ? $this->getDateTime($config->getNotBeforeOrFail()) : null,
213247
'priority' => $config->getPriority(),

tests/TestCase/Model/Table/QueuedJobsTableTest.php

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Cake\I18n\DateTime;
1717
use Cake\ORM\TableRegistry;
1818
use Cake\TestSuite\TestCase;
19+
use InvalidArgumentException;
1920
use Queue\Model\Enum\Priority;
2021
use Queue\Model\Table\QueuedJobsTable;
2122
use Queue\Queue\Task\ExampleTask;
@@ -707,6 +708,108 @@ public function testPriority() {
707708
$this->assertSame(['key' => 'k2'], $data);
708709
}
709710

711+
/**
712+
* @return void
713+
*/
714+
public function testCreateJobUniqueReturnsExistingPendingJob(): void {
715+
$first = $this->QueuedJobs->createJob(
716+
'Foo',
717+
['k' => 'v1'],
718+
['reference' => 'tenant-1', 'unique' => true],
719+
);
720+
721+
$second = $this->QueuedJobs->createJob(
722+
'Foo',
723+
['k' => 'v2'],
724+
['reference' => 'tenant-1', 'unique' => true],
725+
);
726+
727+
$this->assertSame($first->id, $second->id, 'unique should return the existing pending job');
728+
// Original payload is preserved; the second call is a no-op insert.
729+
$this->assertSame(['k' => 'v1'], $second->data);
730+
$this->assertSame(1, $this->QueuedJobs->find()->where(['reference' => 'tenant-1'])->count());
731+
}
732+
733+
/**
734+
* Once the first job is completed, a later `unique` call must create a
735+
* fresh row -- otherwise the next scheduled run could never enqueue.
736+
*
737+
* @return void
738+
*/
739+
public function testCreateJobUniqueInsertsAgainAfterCompletion(): void {
740+
$first = $this->QueuedJobs->createJob(
741+
'Foo',
742+
null,
743+
['reference' => 'tenant-2', 'unique' => true],
744+
);
745+
746+
$first->completed = new DateTime();
747+
$this->QueuedJobs->saveOrFail($first);
748+
749+
$second = $this->QueuedJobs->createJob(
750+
'Foo',
751+
null,
752+
['reference' => 'tenant-2', 'unique' => true],
753+
);
754+
755+
$this->assertNotSame($first->id, $second->id);
756+
$this->assertSame(2, $this->QueuedJobs->find()->where(['reference' => 'tenant-2'])->count());
757+
}
758+
759+
/**
760+
* Dedup is scoped to (reference, job_task) -- a different task name with
761+
* the same reference still inserts.
762+
*
763+
* @return void
764+
*/
765+
public function testCreateJobUniqueIsScopedByJobTask(): void {
766+
Configure::write('Queue.skipExistenceCheck', true);
767+
768+
$foo = $this->QueuedJobs->createJob(
769+
'Foo',
770+
null,
771+
['reference' => 'shared-ref', 'unique' => true],
772+
);
773+
774+
$bar = $this->QueuedJobs->createJob(
775+
'Bar',
776+
null,
777+
['reference' => 'shared-ref', 'unique' => true],
778+
);
779+
780+
$this->assertNotSame($foo->id, $bar->id);
781+
$this->assertSame('Foo', $foo->job_task);
782+
$this->assertSame('Bar', $bar->job_task);
783+
}
784+
785+
/**
786+
* Without `unique`, two calls with the same reference both insert -- BC
787+
* preserved for callers that use `reference` for audit but want every
788+
* scheduled run to enqueue independently.
789+
*
790+
* @return void
791+
*/
792+
public function testCreateJobWithoutUniqueDoesNotDedupe(): void {
793+
$first = $this->QueuedJobs->createJob('Foo', null, ['reference' => 'tenant-3']);
794+
$second = $this->QueuedJobs->createJob('Foo', null, ['reference' => 'tenant-3']);
795+
796+
$this->assertNotSame($first->id, $second->id);
797+
$this->assertSame(2, $this->QueuedJobs->find()->where(['reference' => 'tenant-3'])->count());
798+
}
799+
800+
/**
801+
* `unique` without a `reference` is a programming error -- fail fast at
802+
* the call site rather than silently inserting an undeduped row.
803+
*
804+
* @return void
805+
*/
806+
public function testCreateJobUniqueWithoutReferenceThrows(): void {
807+
$this->expectException(InvalidArgumentException::class);
808+
$this->expectExceptionMessageMatches('/unique.*reference/i');
809+
810+
$this->QueuedJobs->createJob('Foo', null, ['unique' => true]);
811+
}
812+
710813
/**
711814
* @return void
712815
*/

0 commit comments

Comments
 (0)