Skip to content

Commit 85ee8ed

Browse files
authored
Watchdog (#371)
1 parent 0f4b774 commit 85ee8ed

File tree

8 files changed

+897
-4
lines changed

8 files changed

+897
-4
lines changed

composer.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
"scripts": {
2323
"ecs": "vendor/bin/ecs check --fix",
2424
"stan": "vendor/bin/phpstan analyse src tests",
25-
"feature": "phpunit --testdox --testsuite feature",
25+
"feature": [
26+
"Composer\\Config::disableProcessTimeout",
27+
"phpunit --testdox --testsuite feature"
28+
],
2629
"unit": "phpunit --testdox --testsuite unit",
2730
"test": "phpunit --testdox",
2831
"coverage": "XDEBUG_MODE=coverage phpunit --testdox --testsuite unit --coverage-clover coverage.xml",

src/Providers/WorkflowServiceProvider.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44

55
namespace Workflow\Providers;
66

7+
use Illuminate\Queue\Events\Looping;
8+
use Illuminate\Support\Facades\Event;
79
use Illuminate\Support\ServiceProvider;
810
use Laravel\SerializableClosure\SerializableClosure;
911
use Workflow\Commands\ActivityMakeCommand;
1012
use Workflow\Commands\WorkflowMakeCommand;
13+
use Workflow\Watchdog;
1114

1215
final class WorkflowServiceProvider extends ServiceProvider
1316
{
@@ -24,5 +27,9 @@ public function boot(): void
2427
], 'migrations');
2528

2629
$this->commands([ActivityMakeCommand::class, WorkflowMakeCommand::class]);
30+
31+
Event::listen(Looping::class, static function (Looping $event): void {
32+
Watchdog::wake($event->connectionName, $event->queue);
33+
});
2734
}
2835
}

src/Watchdog.php

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow;
6+
7+
use Illuminate\Bus\Queueable;
8+
use Illuminate\Bus\UniqueLock;
9+
use Illuminate\Contracts\Bus\Dispatcher;
10+
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
11+
use Illuminate\Contracts\Queue\ShouldQueue;
12+
use Illuminate\Queue\InteractsWithQueue;
13+
use Illuminate\Support\Carbon;
14+
use Illuminate\Support\Facades\Cache;
15+
use Illuminate\Support\Facades\DB;
16+
use Workflow\Models\StoredWorkflow;
17+
use Workflow\States\WorkflowPendingStatus;
18+
19+
class Watchdog implements ShouldBeEncrypted, ShouldQueue
20+
{
21+
use InteractsWithQueue;
22+
use Queueable;
23+
24+
public const DEFAULT_TIMEOUT = 300;
25+
26+
private const CACHE_KEY = 'workflow:watchdog';
27+
28+
private const LOOP_THROTTLE_KEY = 'workflow:watchdog:looping';
29+
30+
private const RECOVERY_LOCK_PREFIX = 'workflow:watchdog:recovering:';
31+
32+
public int $tries = 0;
33+
34+
public int $maxExceptions = 0;
35+
36+
public $timeout = 0;
37+
38+
public static function wake(string $connection, ?string $queue = null): void
39+
{
40+
$timeout = self::timeout();
41+
42+
$queue = self::normalizeQueue($queue);
43+
44+
DB::afterCommit(static function () use ($connection, $queue, $timeout): void {
45+
if (Cache::has(self::CACHE_KEY)) {
46+
return;
47+
}
48+
49+
if (! Cache::add(self::LOOP_THROTTLE_KEY, true, 60)) {
50+
return;
51+
}
52+
53+
if (! self::hasRecoverablePendingWorkflows($timeout)) {
54+
return;
55+
}
56+
57+
if (! Cache::add(self::CACHE_KEY, true, $timeout)) {
58+
return;
59+
}
60+
61+
$watchdog = (new self())
62+
->onConnection($connection);
63+
64+
if ($queue !== null) {
65+
$watchdog->onQueue($queue);
66+
}
67+
68+
try {
69+
app(Dispatcher::class)->dispatch($watchdog);
70+
} catch (\Throwable $exception) {
71+
Cache::forget(self::CACHE_KEY);
72+
Cache::forget(self::LOOP_THROTTLE_KEY);
73+
74+
throw $exception;
75+
}
76+
});
77+
}
78+
79+
public function handle(): void
80+
{
81+
$timeout = self::timeout();
82+
83+
Cache::put(self::CACHE_KEY, true, $timeout);
84+
85+
$model = config('workflows.stored_workflow_model', StoredWorkflow::class);
86+
87+
$model::where('status', WorkflowPendingStatus::$name)
88+
->where('updated_at', '<=', Carbon::now()->subSeconds($timeout))
89+
->whereNotNull('arguments')
90+
->each(static function (StoredWorkflow $storedWorkflow) use ($timeout): void {
91+
self::recover($storedWorkflow, $timeout);
92+
});
93+
94+
if ($this->job !== null) {
95+
$this->release($timeout);
96+
}
97+
}
98+
99+
private static function recover(StoredWorkflow $storedWorkflow, int $timeout): bool
100+
{
101+
$claimTtl = self::bootstrapWindow($timeout);
102+
103+
return (bool) (Cache::lock(self::RECOVERY_LOCK_PREFIX . $storedWorkflow->id, $claimTtl)
104+
->get(static function () use ($storedWorkflow): bool {
105+
$storedWorkflow->refresh();
106+
107+
if ($storedWorkflow->status::class !== WorkflowPendingStatus::class) {
108+
return false;
109+
}
110+
111+
$workflowStub = $storedWorkflow->toWorkflow();
112+
$workflowClass = $storedWorkflow->class;
113+
$workflowJob = new $workflowClass($storedWorkflow, ...$storedWorkflow->workflowArguments());
114+
115+
$storedWorkflow->touch();
116+
117+
(new UniqueLock(Cache::driver()))->release($workflowJob);
118+
119+
$workflowStub->resume();
120+
121+
return true;
122+
}) ?? false);
123+
}
124+
125+
private static function timeout(): int
126+
{
127+
return self::DEFAULT_TIMEOUT;
128+
}
129+
130+
private static function hasRecoverablePendingWorkflows(int $timeout): bool
131+
{
132+
$model = config('workflows.stored_workflow_model', StoredWorkflow::class);
133+
134+
return $model::where('status', WorkflowPendingStatus::$name)
135+
->where('updated_at', '<=', Carbon::now()->subSeconds($timeout))
136+
->whereNotNull('arguments')
137+
->exists();
138+
}
139+
140+
private static function bootstrapWindow(int $timeout): int
141+
{
142+
return max(1, min($timeout, 60));
143+
}
144+
145+
private static function normalizeQueue(?string $queue): ?string
146+
{
147+
if ($queue === null) {
148+
return null;
149+
}
150+
151+
foreach (explode(',', $queue) as $candidate) {
152+
$candidate = trim($candidate);
153+
154+
if ($candidate !== '') {
155+
return $candidate;
156+
}
157+
}
158+
159+
return null;
160+
}
161+
}

src/Workflow.php

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,14 @@ public function handle(): void
177177
$this->storedWorkflow->status->transitionTo(WorkflowRunningStatus::class);
178178
}
179179
} catch (TransitionNotFound) {
180-
if ($this->storedWorkflow->toWorkflow()->running()) {
181-
$this->release();
180+
$this->storedWorkflow->refresh();
181+
182+
if ($this->storedWorkflow->status::class !== WorkflowRunningStatus::class) {
183+
if ($this->storedWorkflow->toWorkflow()->running()) {
184+
$this->release();
185+
}
186+
return;
182187
}
183-
return;
184188
}
185189

186190
$parentWorkflow = $this->storedWorkflow->parents()

tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,47 @@ public function testUnlockActivityAppliesTtlWhenExpiresAfterIsConfigured(): void
325325
$this->assertTrue($result);
326326
}
327327

328+
public function testUnlockActivityAppliesTtlWhenOtherActivitiesRemain(): void
329+
{
330+
$job = new \stdClass();
331+
$job->key = 'test-activity-key';
332+
333+
$remainingKey = 'other-activity-key';
334+
335+
$lock = $this->mock(Lock::class, static function (MockInterface $mock) {
336+
$mock->shouldReceive('get')
337+
->once()
338+
->andReturn(true);
339+
$mock->shouldReceive('release')
340+
->once();
341+
});
342+
343+
$this->mock(Repository::class, static function (MockInterface $mock) use ($job, $lock, $remainingKey) {
344+
$mock->shouldReceive('lock')
345+
->once()
346+
->andReturn($lock);
347+
$mock->shouldReceive('get')
348+
->with('laravel-workflow-overlap:1:activity', [])
349+
->andReturn([$job->key, $remainingKey]);
350+
$mock->shouldReceive('put')
351+
->with('laravel-workflow-overlap:1:activity', [$remainingKey], 60)
352+
->once();
353+
$mock->shouldReceive('forget')
354+
->with($job->key)
355+
->once();
356+
$mock->shouldReceive('has')
357+
->with($remainingKey)
358+
->once()
359+
->andReturn(false);
360+
});
361+
362+
$middleware = new WithoutOverlappingMiddleware(1, WithoutOverlappingMiddleware::ACTIVITY, 0, 60);
363+
364+
$result = $middleware->unlock($job);
365+
366+
$this->assertTrue($result);
367+
}
368+
328369
public function testUnlockActivityRetriesOnLockFailure(): void
329370
{
330371
$job = new \stdClass();

tests/Unit/Providers/WorkflowServiceProviderTest.php

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,39 @@
44

55
namespace Tests\Unit;
66

7+
use Illuminate\Queue\Events\Looping;
78
use Illuminate\Support\Facades\Artisan;
9+
use Illuminate\Support\Facades\Cache;
10+
use Illuminate\Support\Facades\Event;
11+
use Illuminate\Support\Facades\Queue;
12+
use Tests\Fixtures\TestSimpleWorkflow;
813
use Tests\TestCase;
14+
use Workflow\Models\StoredWorkflow;
915
use Workflow\Providers\WorkflowServiceProvider;
16+
use Workflow\Serializers\Serializer;
17+
use Workflow\States\WorkflowPendingStatus;
18+
use Workflow\Watchdog;
1019

1120
final class WorkflowServiceProviderTest extends TestCase
1221
{
1322
protected function setUp(): void
1423
{
1524
parent::setUp();
25+
26+
Cache::forget('workflow:watchdog');
27+
Cache::forget('workflow:watchdog:looping');
28+
1629
$this->app->register(WorkflowServiceProvider::class);
1730
}
1831

32+
protected function tearDown(): void
33+
{
34+
Cache::forget('workflow:watchdog');
35+
Cache::forget('workflow:watchdog:looping');
36+
37+
parent::tearDown();
38+
}
39+
1940
public function testProviderLoads(): void
2041
{
2142
$this->assertTrue(
@@ -56,4 +77,75 @@ public function testCommandsAreRegistered(): void
5677
);
5778
}
5879
}
80+
81+
public function testLoopingEventWakesWatchdog(): void
82+
{
83+
Queue::fake();
84+
Cache::forget('workflow:watchdog');
85+
86+
StoredWorkflow::create([
87+
'class' => TestSimpleWorkflow::class,
88+
'arguments' => Serializer::serialize([]),
89+
'status' => WorkflowPendingStatus::$name,
90+
'updated_at' => now()
91+
->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1),
92+
]);
93+
94+
Event::dispatch(new Looping('redis', 'high,default'));
95+
96+
Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool {
97+
return $watchdog->connection === 'redis'
98+
&& $watchdog->queue === 'high';
99+
});
100+
}
101+
102+
public function testLoopingEventThrottlesWake(): void
103+
{
104+
Queue::fake();
105+
Cache::forget('workflow:watchdog');
106+
107+
StoredWorkflow::create([
108+
'class' => TestSimpleWorkflow::class,
109+
'arguments' => Serializer::serialize([]),
110+
'status' => WorkflowPendingStatus::$name,
111+
'updated_at' => now()
112+
->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1),
113+
]);
114+
115+
Event::dispatch(new Looping('redis', 'high,default'));
116+
Event::dispatch(new Looping('redis', 'high,default'));
117+
Event::dispatch(new Looping('redis', 'high,default'));
118+
119+
Queue::assertPushed(Watchdog::class, 1);
120+
}
121+
122+
public function testLoopingEventSkipsWhenThrottleAlreadyHeld(): void
123+
{
124+
Queue::fake();
125+
Cache::forget('workflow:watchdog');
126+
Cache::put('workflow:watchdog:looping', true, 60);
127+
128+
StoredWorkflow::create([
129+
'class' => TestSimpleWorkflow::class,
130+
'arguments' => Serializer::serialize([]),
131+
'status' => WorkflowPendingStatus::$name,
132+
'updated_at' => now()
133+
->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1),
134+
]);
135+
136+
Event::dispatch(new Looping('redis', 'high,default'));
137+
138+
Queue::assertNotPushed(Watchdog::class);
139+
}
140+
141+
public function testLoopingEventSkipsWhenNoRecoverablePendingWorkflowsExist(): void
142+
{
143+
Queue::fake();
144+
Cache::forget('workflow:watchdog');
145+
Cache::forget('workflow:watchdog:looping');
146+
147+
Event::dispatch(new Looping('redis', 'high,default'));
148+
149+
Queue::assertNotPushed(Watchdog::class);
150+
}
59151
}

0 commit comments

Comments
 (0)