-
-
Notifications
You must be signed in to change notification settings - Fork 66
Expand file tree
/
Copy pathTimers.php
More file actions
82 lines (68 loc) · 2.4 KB
/
Timers.php
File metadata and controls
82 lines (68 loc) · 2.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
<?php
declare(strict_types=1);
namespace Workflow\Traits;
use Carbon\CarbonInterval;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use function React\Promise\resolve;
use Workflow\Serializers\Serializer;
use Workflow\Signal;
trait Timers
{
public static function timer($seconds): PromiseInterface
{
if (is_string($seconds)) {
$seconds = CarbonInterval::fromString($seconds)->totalSeconds;
}
if ($seconds <= 0) {
++self::$context->index;
return resolve(true);
}
$log = self::$context->storedWorkflow->logs()
->whereIndex(self::$context->index)
->first();
if ($log) {
++self::$context->index;
return resolve(Serializer::unserialize($log->result));
}
$timer = self::$context->storedWorkflow->timers()
->whereIndex(self::$context->index)
->first();
if ($timer === null) {
$when = self::$context->now->copy()
->addSeconds($seconds);
if (! self::$context->replaying) {
$timer = self::$context->storedWorkflow->timers()
->create([
'index' => self::$context->index,
'stop_at' => $when,
]);
}
}
$result = $timer->stop_at
->lessThanOrEqualTo(self::$context->now);
if ($result === true) {
if (! self::$context->replaying) {
try {
self::$context->storedWorkflow->logs()
->create([
'index' => self::$context->index,
'now' => self::$context->now,
'class' => Signal::class,
'result' => Serializer::serialize(true),
]);
} catch (\Illuminate\Database\UniqueConstraintViolationException $exception) {
// already logged
}
}
++self::$context->index;
return resolve(true);
}
if (! self::$context->replaying) {
Signal::dispatch(self::$context->storedWorkflow, self::connection(), self::queue())->delay($timer->stop_at);
}
++self::$context->index;
$deferred = new Deferred();
return $deferred->promise();
}
}