-
Notifications
You must be signed in to change notification settings - Fork 57
Expand file tree
/
Copy pathInterceptorCallsCounter.php
More file actions
115 lines (100 loc) · 4.14 KB
/
InterceptorCallsCounter.php
File metadata and controls
115 lines (100 loc) · 4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
<?php
/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
declare(strict_types=1);
namespace Temporal\Tests\Interceptor;
use React\Promise\PromiseInterface;
use Temporal\Interceptor\ActivityInbound\ActivityInput;
use Temporal\Interceptor\ActivityInboundInterceptor;
use Temporal\Interceptor\HeaderInterface;
use Temporal\Interceptor\Trait\ActivityInboundInterceptorTrait;
use Temporal\Interceptor\Trait\WorkflowClientCallsInterceptorTrait;
use Temporal\Interceptor\Trait\WorkflowInboundCallsInterceptorTrait;
use Temporal\Interceptor\Trait\WorkflowOutboundRequestInterceptorTrait;
use Temporal\Interceptor\WorkflowClient\SignalWithStartInput;
use Temporal\Interceptor\WorkflowClient\StartInput;
use Temporal\Interceptor\WorkflowClient\UpdateWithStartInput;
use Temporal\Interceptor\WorkflowClient\UpdateWithStartOutput;
use Temporal\Interceptor\WorkflowClientCallsInterceptor;
use Temporal\Interceptor\WorkflowInbound\InitInput;
use Temporal\Interceptor\WorkflowInbound\SignalInput;
use Temporal\Interceptor\WorkflowInbound\UpdateInput;
use Temporal\Interceptor\WorkflowInbound\WorkflowInput;
use Temporal\Interceptor\WorkflowInboundCallsInterceptor;
use Temporal\Interceptor\WorkflowOutboundRequestInterceptor;
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Workflow;
use Temporal\Workflow\WorkflowExecution;
/**
 * Interceptor that counts its own invocations inside the Header.
 *
 * Each overridden interceptor method reads a Header entry, increases it by one (a missing entry
 * counts as zero) and passes a copy of its input carrying the updated Header down the chain.
 * The entry key is the name of the interceptor method that was called, except for
 * {@see self::handleOutboundRequest()}, where the key is the name of the outbound request.
 *
 * Note: some methods, such as {@see self::signal()}, have no ability to change the header.
 * @psalm-immutable
 */
final class InterceptorCallsCounter implements
    WorkflowOutboundRequestInterceptor,
    ActivityInboundInterceptor,
    WorkflowInboundCallsInterceptor,
    WorkflowClientCallsInterceptor
{
    use WorkflowOutboundRequestInterceptorTrait;
    use ActivityInboundInterceptorTrait;
    use WorkflowInboundCallsInterceptorTrait;
    use WorkflowClientCallsInterceptorTrait;

    /**
     * Counts an outbound workflow request under the request's own name.
     *
     * The counter is computed from the Header of the current workflow context, not from the
     * Header already attached to the request; the result replaces the request Header.
     *
     * @param RequestInterface $request Outbound request being intercepted.
     * @param callable $next Callable invoked with the request carrying the updated Header.
     */
    public function handleOutboundRequest(RequestInterface $request, callable $next): PromiseInterface
    {
        $contextHeader = Workflow::getCurrentContext()->getHeader();
        $counted = $this->increment($contextHeader, $request->getName());

        return $next($request->withHeader($counted));
    }

    /**
     * Counts an inbound activity call under the key "handleActivityInbound".
     *
     * @param callable $next Callable invoked with the input carrying the updated Header.
     * @return mixed Whatever $next returns.
     */
    public function handleActivityInbound(ActivityInput $input, callable $next): mixed
    {
        $counted = $this->increment($input->header, __FUNCTION__);

        return $next($input->with(header: $counted));
    }

    /**
     * Counts a workflow initialization under the key "init".
     *
     * @param callable $next Callable invoked with the input carrying the updated Header.
     */
    public function init(InitInput $input, callable $next): void
    {
        $counted = $this->increment($input->header, __FUNCTION__);

        $next($input->with(header: $counted));
    }

    /**
     * Counts a workflow execution under the key "execute".
     *
     * @param callable $next Callable invoked with the input carrying the updated Header.
     */
    public function execute(WorkflowInput $input, callable $next): void
    {
        $counted = $this->increment($input->header, __FUNCTION__);

        $next($input->with(header: $counted));
    }

    /**
     * Counts an inbound workflow signal under the key "handleSignal".
     *
     * @param callable $next Callable invoked with the input carrying the updated Header.
     */
    public function handleSignal(SignalInput $input, callable $next): void
    {
        $counted = $this->increment($input->header, __FUNCTION__);

        $next($input->with(header: $counted));
    }

    /**
     * Counts a client-side workflow start under the key "start".
     *
     * @param callable $next Callable invoked with the input carrying the updated Header.
     */
    public function start(StartInput $input, callable $next): WorkflowExecution
    {
        $counted = $this->increment($input->header, __FUNCTION__);

        return $next($input->with(header: $counted));
    }

    /**
     * Counts a client-side signal-with-start call under the key "signalWithStart".
     *
     * The counter lives in the Header of the nested workflow start input, so that nested
     * input is rebuilt first and then placed back into a copy of $input.
     *
     * @param callable $next Callable invoked with the rebuilt input.
     */
    public function signalWithStart(SignalWithStartInput $input, callable $next): WorkflowExecution
    {
        $startInput = $input->workflowStartInput;
        $countedStartInput = $startInput->with(
            header: $this->increment($startInput->header, __FUNCTION__),
        );

        return $next($input->with(workflowStartInput: $countedStartInput));
    }

    /**
     * Counts a client-side update-with-start call under the key "updateWithStart".
     *
     * The counter lives in the Header of the nested workflow start input, so that nested
     * input is rebuilt first and then placed back into a copy of $input.
     *
     * @param callable $next Callable invoked with the rebuilt input.
     */
    public function updateWithStart(UpdateWithStartInput $input, callable $next): UpdateWithStartOutput
    {
        $startInput = $input->workflowStartInput;
        $countedStartInput = $startInput->with(
            header: $this->increment($startInput->header, __FUNCTION__),
        );

        return $next($input->with(workflowStartInput: $countedStartInput));
    }

    /**
     * Returns a Header in which the counter stored under $key is one higher.
     *
     * A missing entry (null) is treated as "never called", so the first call stores "1".
     * Existing values are cast to int before incrementing and written back as a string.
     *
     * @param HeaderInterface $header Header to read the current counter from.
     * @param string $key Entry name of the counter.
     * @return HeaderInterface Result of {@see HeaderInterface::withValue()} with the new counter.
     */
    private function increment(HeaderInterface $header, string $key): HeaderInterface
    {
        $stored = $header->getValue($key);

        $calls = 1;
        if ($stored !== null) {
            $calls = (int) $stored + 1;
        }

        return $header->withValue($key, (string) $calls);
    }
}