Skip to content

Commit 5126913

Browse files
authored
Exception logs replay (#375)
1 parent 85ee8ed commit 5126913

31 files changed

Lines changed: 1674 additions & 88 deletions

src/Activity.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ public function failed(Throwable $throwable): void
170170
$this->storedWorkflow,
171171
$throwable,
172172
$workflow->connection(),
173-
$workflow->queue()
173+
$workflow->queue(),
174+
$this::class
174175
);
175176
}
176177

src/ActivityStub.php

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,32 +28,66 @@ public static function async(callable $callback): PromiseInterface
2828
public static function make($activity, ...$arguments): PromiseInterface
2929
{
3030
$context = WorkflowStub::getContext();
31+
$result = null;
3132

32-
$log = $context->storedWorkflow->findLogByIndex($context->index);
33+
while (true) {
34+
$log = $context->storedWorkflow->findLogByIndex($context->index);
35+
$result = null;
3336

34-
if (WorkflowStub::faked()) {
35-
$mocks = WorkflowStub::mocks();
37+
if (WorkflowStub::faked()) {
38+
$mocks = WorkflowStub::mocks();
3639

37-
if (! $log && array_key_exists($activity, $mocks)) {
38-
$result = $mocks[$activity];
40+
if (! $log && array_key_exists($activity, $mocks)) {
41+
$mockedResult = $mocks[$activity];
3942

40-
$log = $context->storedWorkflow->createLog([
41-
'index' => $context->index,
42-
'now' => $context->now,
43-
'class' => $activity,
44-
'result' => Serializer::serialize(
45-
is_callable($result) ? $result($context, ...$arguments) : $result
46-
),
47-
]);
43+
$log = $context->storedWorkflow->createLog([
44+
'index' => $context->index,
45+
'now' => $context->now,
46+
'class' => $activity,
47+
'result' => Serializer::serialize(
48+
is_callable($mockedResult) ? $mockedResult($context, ...$arguments) : $mockedResult
49+
),
50+
]);
4851

49-
WorkflowStub::recordDispatched($activity, $arguments);
52+
WorkflowStub::recordDispatched($activity, $arguments);
53+
}
54+
}
55+
56+
if (! $log) {
57+
break;
5058
}
59+
60+
if ($log->class !== Exception::class) {
61+
break;
62+
}
63+
64+
$result = Serializer::unserialize($log->result);
65+
66+
if (! self::isForeignExceptionResult($result, $activity)) {
67+
break;
68+
}
69+
70+
++$context->index;
71+
WorkflowStub::setContext($context);
5172
}
5273

5374
if ($log) {
75+
$result ??= Serializer::unserialize($log->result);
76+
77+
if (
78+
WorkflowStub::isProbing()
79+
&& WorkflowStub::probeIndex() === $context->index
80+
&& (
81+
WorkflowStub::probeClass() === null
82+
|| WorkflowStub::probeClass() === $activity
83+
)
84+
&& $log->class === Exception::class
85+
) {
86+
WorkflowStub::markProbeMatched();
87+
}
88+
5489
++$context->index;
5590
WorkflowStub::setContext($context);
56-
$result = Serializer::unserialize($log->result);
5791
if (
5892
is_array($result) &&
5993
array_key_exists('class', $result) &&
@@ -74,11 +108,25 @@ public static function make($activity, ...$arguments): PromiseInterface
74108
return resolve($result);
75109
}
76110

111+
if (WorkflowStub::isProbing()) {
112+
WorkflowStub::markProbePendingBeforeMatch();
113+
++$context->index;
114+
WorkflowStub::setContext($context);
115+
return (new Deferred())->promise();
116+
}
117+
77118
$activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments);
78119

79120
++$context->index;
80121
WorkflowStub::setContext($context);
81-
$deferred = new Deferred();
82-
return $deferred->promise();
122+
return (new Deferred())->promise();
123+
}
124+
125+
private static function isForeignExceptionResult(mixed $result, string $activity): bool
126+
{
127+
return is_array($result)
128+
&& isset($result['sourceClass'])
129+
&& is_string($result['sourceClass'])
130+
&& $result['sourceClass'] !== $activity;
83131
}
84132
}

src/ChildWorkflowStub.php

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,32 +22,66 @@ public static function all(iterable $promises): PromiseInterface
2222
public static function make($workflow, ...$arguments): PromiseInterface
2323
{
2424
$context = WorkflowStub::getContext();
25+
$result = null;
2526

26-
$log = $context->storedWorkflow->findLogByIndex($context->index);
27+
while (true) {
28+
$log = $context->storedWorkflow->findLogByIndex($context->index);
29+
$result = null;
2730

28-
if (WorkflowStub::faked()) {
29-
$mocks = WorkflowStub::mocks();
31+
if (WorkflowStub::faked()) {
32+
$mocks = WorkflowStub::mocks();
3033

31-
if (! $log && array_key_exists($workflow, $mocks)) {
32-
$result = $mocks[$workflow];
34+
if (! $log && array_key_exists($workflow, $mocks)) {
35+
$mockedResult = $mocks[$workflow];
3336

34-
$log = $context->storedWorkflow->createLog([
35-
'index' => $context->index,
36-
'now' => $context->now,
37-
'class' => $workflow,
38-
'result' => Serializer::serialize(
39-
is_callable($result) ? $result($context, ...$arguments) : $result
40-
),
41-
]);
37+
$log = $context->storedWorkflow->createLog([
38+
'index' => $context->index,
39+
'now' => $context->now,
40+
'class' => $workflow,
41+
'result' => Serializer::serialize(
42+
is_callable($mockedResult) ? $mockedResult($context, ...$arguments) : $mockedResult
43+
),
44+
]);
4245

43-
WorkflowStub::recordDispatched($workflow, $arguments);
46+
WorkflowStub::recordDispatched($workflow, $arguments);
47+
}
48+
}
49+
50+
if (! $log) {
51+
break;
4452
}
53+
54+
if ($log->class !== Exception::class) {
55+
break;
56+
}
57+
58+
$result = Serializer::unserialize($log->result);
59+
60+
if (! self::isForeignExceptionResult($result, $workflow)) {
61+
break;
62+
}
63+
64+
++$context->index;
65+
WorkflowStub::setContext($context);
4566
}
4667

4768
if ($log) {
69+
$result ??= Serializer::unserialize($log->result);
70+
71+
if (
72+
WorkflowStub::isProbing()
73+
&& WorkflowStub::probeIndex() === $context->index
74+
&& (
75+
WorkflowStub::probeClass() === null
76+
|| WorkflowStub::probeClass() === $workflow
77+
)
78+
&& $log->class === Exception::class
79+
) {
80+
WorkflowStub::markProbeMatched();
81+
}
82+
4883
++$context->index;
4984
WorkflowStub::setContext($context);
50-
$result = Serializer::unserialize($log->result);
5185
if (
5286
is_array($result)
5387
&& array_key_exists('class', $result)
@@ -67,6 +101,13 @@ public static function make($workflow, ...$arguments): PromiseInterface
67101
return resolve($result);
68102
}
69103

104+
if (WorkflowStub::isProbing()) {
105+
WorkflowStub::markProbePendingBeforeMatch();
106+
++$context->index;
107+
WorkflowStub::setContext($context);
108+
return (new Deferred())->promise();
109+
}
110+
70111
if (! $context->replaying) {
71112
$storedChildWorkflow = $context->storedWorkflow->children()
72113
->wherePivot('parent_index', $context->index)
@@ -94,7 +135,14 @@ public static function make($workflow, ...$arguments): PromiseInterface
94135

95136
++$context->index;
96137
WorkflowStub::setContext($context);
97-
$deferred = new Deferred();
98-
return $deferred->promise();
138+
return (new Deferred())->promise();
139+
}
140+
141+
private static function isForeignExceptionResult(mixed $result, string $workflow): bool
142+
{
143+
return is_array($result)
144+
&& isset($result['sourceClass'])
145+
&& is_string($result['sourceClass'])
146+
&& $result['sourceClass'] !== $workflow;
99147
}
100148
}

0 commit comments

Comments
 (0)