Skip to content

Commit aa9424f

Browse files
authored
Merge pull request #273 from laravel-workflow/signal-race-condition
Signal race condition
2 parents 971239e + 2456aa1 commit aa9424f

7 files changed

Lines changed: 204 additions & 8 deletions

src/Workflow.php

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,20 @@ public function handle(): void
128128
->wherePivot('parent_index', '!=', StoredWorkflow::ACTIVE_WORKFLOW_INDEX)
129129
->first();
130130

131-
$log = $this->storedWorkflow->logs()
132-
->whereIndex($this->index)
131+
$logs = $this->storedWorkflow->logs()
132+
->whereIn('index', [$this->index, $this->index + 1])
133+
->get();
134+
135+
$log = $logs->where('index', $this->index)
136+
->first();
137+
138+
$nextLog = $logs->where('index', $this->index + 1)
133139
->first();
134140

135141
$this->storedWorkflow
136142
->signals()
137-
->when($log, static function ($query, $log): void {
138-
$query->where('created_at', '<=', $log->created_at->format('Y-m-d H:i:s.u'));
143+
->when($nextLog, static function ($query, $nextLog): void {
144+
$query->where('created_at', '<=', $nextLog->created_at->format('Y-m-d H:i:s.u'));
139145
})
140146
->each(function ($signal): void {
141147
$this->{$signal->method}(...Serializer::unserialize($signal->arguments));
@@ -163,8 +169,14 @@ public function handle(): void
163169
while ($this->coroutine->valid()) {
164170
$this->index = WorkflowStub::getContext()->index;
165171

166-
$nextLog = $this->storedWorkflow->logs()
167-
->whereIndex($this->index)
172+
$logs = $this->storedWorkflow->logs()
173+
->whereIn('index', [$this->index, $this->index + 1])
174+
->get();
175+
176+
$log = $logs->where('index', $this->index)
177+
->first();
178+
179+
$nextLog = $logs->where('index', $this->index + 1)
168180
->first();
169181

170182
if ($log) {
@@ -179,8 +191,6 @@ public function handle(): void
179191
});
180192
}
181193

182-
$log = $nextLog;
183-
184194
$this->now = $log ? $log->now : Carbon::now();
185195

186196
WorkflowStub::setContext([

tests/Feature/WorkflowTest.php

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
use Tests\Fixtures\TestActivity;
88
use Tests\Fixtures\TestOtherActivity;
9+
use Tests\Fixtures\TestSignalExceptionWorkflow;
10+
use Tests\Fixtures\TestSignalExceptionWorkflowLeader;
911
use Tests\Fixtures\TestWorkflow;
1012
use Tests\TestCase;
1113
use Workflow\Signal;
@@ -55,4 +57,76 @@ public function testCompletedDelay(): void
5557
->values()
5658
->toArray());
5759
}
60+
61+
public function testTestSignalExceptionWorkflowEarly(): void
62+
{
63+
$workflow = WorkflowStub::make(TestSignalExceptionWorkflow::class);
64+
65+
$workflow->start([
66+
'test' => 'data',
67+
]);
68+
69+
sleep(1);
70+
71+
$workflow->shouldRetry();
72+
73+
while ($workflow->running());
74+
75+
$this->assertSame(WorkflowCompletedStatus::class, $workflow->status());
76+
$this->assertTrue($workflow->output());
77+
}
78+
79+
public function testTestSignalExceptionWorkflowLate(): void
80+
{
81+
$workflow = WorkflowStub::make(TestSignalExceptionWorkflow::class);
82+
83+
$workflow->start([
84+
'test' => 'data',
85+
]);
86+
87+
sleep(3);
88+
89+
$workflow->shouldRetry();
90+
91+
while ($workflow->running());
92+
93+
$this->assertSame(WorkflowCompletedStatus::class, $workflow->status());
94+
$this->assertTrue($workflow->output());
95+
}
96+
97+
public function testTestSignalExceptionWorkflowLeaderEarly(): void
98+
{
99+
$workflow = WorkflowStub::make(TestSignalExceptionWorkflowLeader::class);
100+
101+
$workflow->start([
102+
'test' => 'data',
103+
]);
104+
105+
sleep(1);
106+
107+
$workflow->shouldRetry();
108+
109+
while ($workflow->running());
110+
111+
$this->assertSame(WorkflowCompletedStatus::class, $workflow->status());
112+
$this->assertTrue($workflow->output());
113+
}
114+
115+
public function testTestSignalExceptionWorkflowLeaderLate(): void
116+
{
117+
$workflow = WorkflowStub::make(TestSignalExceptionWorkflowLeader::class);
118+
119+
$workflow->start([
120+
'test' => 'data',
121+
]);
122+
123+
sleep(3);
124+
125+
$workflow->shouldRetry();
126+
127+
while ($workflow->running());
128+
129+
$this->assertSame(WorkflowCompletedStatus::class, $workflow->status());
130+
$this->assertTrue($workflow->output());
131+
}
58132
}

tests/Fixtures/TestParentWorkflow.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,17 @@
66

77
use Workflow\ActivityStub;
88
use Workflow\ChildWorkflowStub;
9+
use Workflow\SignalMethod;
910
use Workflow\Workflow;
1011

1112
class TestParentWorkflow extends Workflow
1213
{
14+
#[SignalMethod]
15+
public function ping(): void
16+
{
17+
// Do nothing
18+
}
19+
1320
public function execute()
1421
{
1522
$otherResult = yield ChildWorkflowStub::make(TestChildWorkflow::class);
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Fixtures;
6+
7+
use Throwable;
8+
use Workflow\ActivityStub;
9+
use Workflow\SignalMethod;
10+
use Workflow\Workflow;
11+
use Workflow\WorkflowStub;
12+
13+
class TestSignalExceptionWorkflow extends Workflow
14+
{
15+
protected bool $shouldRetry = false;
16+
17+
#[SignalMethod]
18+
public function shouldRetry(): void
19+
{
20+
$this->shouldRetry = true;
21+
}
22+
23+
public function execute(array $data = [])
24+
{
25+
$shouldThrow = true;
26+
while (true) {
27+
try {
28+
yield ActivityStub::make(TestActivity::class);
29+
yield ActivityStub::make(TestSingleTryExceptionActivity::class, $shouldThrow);
30+
return true;
31+
} catch (Throwable) {
32+
yield WorkflowStub::await(fn () => $this->shouldRetry);
33+
$this->shouldRetry = false;
34+
$shouldThrow = false;
35+
}
36+
}
37+
}
38+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Fixtures;
6+
7+
use Throwable;
8+
use Workflow\ActivityStub;
9+
use Workflow\SignalMethod;
10+
use Workflow\Workflow;
11+
use Workflow\WorkflowStub;
12+
13+
class TestSignalExceptionWorkflowLeader extends Workflow
14+
{
15+
protected bool $shouldRetry = false;
16+
17+
#[SignalMethod]
18+
public function shouldRetry(): void
19+
{
20+
$this->shouldRetry = true;
21+
}
22+
23+
public function execute(array $data = [])
24+
{
25+
$shouldThrow = true;
26+
while (true) {
27+
try {
28+
yield ActivityStub::make(TestSingleTryExceptionActivity::class, $shouldThrow);
29+
return true;
30+
} catch (Throwable) {
31+
yield WorkflowStub::await(fn () => $this->shouldRetry);
32+
$this->shouldRetry = false;
33+
$shouldThrow = false;
34+
}
35+
}
36+
}
37+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Fixtures;
6+
7+
use Exception;
8+
use Workflow\Activity;
9+
10+
final class TestSingleTryExceptionActivity extends Activity
11+
{
12+
public $tries = 1;
13+
14+
public function execute($shouldThrow)
15+
{
16+
if ($shouldThrow) {
17+
throw new Exception('failed');
18+
}
19+
20+
return 'activity';
21+
}
22+
}

tests/Unit/WorkflowTest.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,14 @@ public function testParentPending(): void
255255
'result' => Serializer::serialize('activity'),
256256
]);
257257

258+
$storedParentWorkflow->signals()
259+
->create([
260+
'method' => 'ping',
261+
'arguments' => Serializer::serialize([]),
262+
'created_at' => now()
263+
->addSeconds(1),
264+
]);
265+
258266
$childWorkflow = WorkflowStub::load(WorkflowStub::make(TestChildWorkflow::class)->id());
259267

260268
$storedChildWorkflow = StoredWorkflow::findOrFail($childWorkflow->id());

0 commit comments

Comments
 (0)