Skip to content

Commit fbe7ff2

Browse files
Expose standalone worker visibility
1 parent c38a31d commit fbe7ff2

4 files changed

Lines changed: 666 additions & 69 deletions

File tree

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Support;
6+
7+
use Carbon\CarbonInterface;
8+
use Illuminate\Database\Eloquent\Builder;
9+
use Illuminate\Database\Eloquent\Model;
10+
use Illuminate\Support\Carbon;
11+
use InvalidArgumentException;
12+
13+
/**
14+
* Namespace-scoped worker visibility helpers for service-mode deployments.
15+
*
16+
* @api Stable class surface consumed by the standalone workflow-server.
17+
* The public static method signatures on this class are covered by
18+
* the workflow package's semver guarantee. See docs/api-stability.md.
19+
*/
20+
final class StandaloneWorkerVisibility
21+
{
22+
/**
23+
* @param class-string<Model> $workerRegistrationModel
24+
*/
25+
public static function queueSnapshot(
26+
string $namespace,
27+
string $workerRegistrationModel,
28+
?CarbonInterface $now = null,
29+
?int $staleAfterSeconds = null,
30+
): QueueVisibilitySnapshot {
31+
$staleAfterSeconds ??= self::staleAfterSeconds();
32+
33+
return OperatorQueueVisibility::forNamespace(
34+
$namespace,
35+
self::pollersByQueue($namespace, $workerRegistrationModel),
36+
$now,
37+
$staleAfterSeconds,
38+
);
39+
}
40+
41+
/**
42+
* @param class-string<Model> $workerRegistrationModel
43+
*/
44+
public static function queueDetail(
45+
string $namespace,
46+
string $taskQueue,
47+
string $workerRegistrationModel,
48+
?CarbonInterface $now = null,
49+
?int $staleAfterSeconds = null,
50+
): QueueVisibilityDetail {
51+
$staleAfterSeconds ??= self::staleAfterSeconds();
52+
53+
return OperatorQueueVisibility::forQueue(
54+
$namespace,
55+
$taskQueue,
56+
self::pollersForQueue($namespace, $taskQueue, $workerRegistrationModel),
57+
$now,
58+
$staleAfterSeconds,
59+
);
60+
}
61+
62+
/**
63+
* @param class-string<Model> $workerRegistrationModel
64+
* @return array<string, list<array<string, mixed>>>
65+
*/
66+
public static function pollersByQueue(string $namespace, string $workerRegistrationModel): array
67+
{
68+
$pollers = [];
69+
70+
foreach (
71+
self::workerQuery($workerRegistrationModel)
72+
->where('namespace', $namespace)
73+
->orderBy('task_queue')
74+
->orderByDesc('last_heartbeat_at')
75+
->orderBy('worker_id')
76+
->get() as $worker
77+
) {
78+
$taskQueue = self::stringValue(data_get($worker, 'task_queue'));
79+
80+
if ($taskQueue === null) {
81+
continue;
82+
}
83+
84+
$pollers[$taskQueue] ??= [];
85+
$pollers[$taskQueue][] = self::poller($worker);
86+
}
87+
88+
return $pollers;
89+
}
90+
91+
/**
92+
* @param class-string<Model> $workerRegistrationModel
93+
* @return list<array<string, mixed>>
94+
*/
95+
public static function pollersForQueue(
96+
string $namespace,
97+
string $taskQueue,
98+
string $workerRegistrationModel,
99+
): array {
100+
return self::workerQuery($workerRegistrationModel)
101+
->where('namespace', $namespace)
102+
->where('task_queue', $taskQueue)
103+
->orderByDesc('last_heartbeat_at')
104+
->orderBy('worker_id')
105+
->get()
106+
->map(static fn (Model $worker): array => self::poller($worker))
107+
->all();
108+
}
109+
110+
public static function staleAfterSeconds(
111+
?int $configuredStaleAfterSeconds = null,
112+
?int $pollingTimeoutSeconds = null,
113+
): int {
114+
if ($configuredStaleAfterSeconds !== null) {
115+
return max(1, $configuredStaleAfterSeconds);
116+
}
117+
118+
return max(($pollingTimeoutSeconds ?? 30) * 2, 60);
119+
}
120+
121+
public static function recordCompatibility(
122+
string $namespace,
123+
string $workerId,
124+
?string $taskQueue,
125+
?string $buildId,
126+
): void {
127+
WorkerCompatibilityFleet::recordForNamespace(
128+
$namespace,
129+
self::uniqueStrings([$buildId]),
130+
connection: null,
131+
queue: $taskQueue,
132+
workerId: $workerId,
133+
);
134+
}
135+
136+
/**
137+
* @return array{
138+
* namespace: string,
139+
* active_workers: int,
140+
* active_worker_scopes: int,
141+
* queues: list<string>,
142+
* build_ids: list<string>,
143+
* workers: list<array{
144+
* worker_id: string,
145+
* queues: list<string>,
146+
* build_ids: list<string>,
147+
* recorded_at: string|null,
148+
* expires_at: string|null
149+
* }>
150+
* }
151+
*/
152+
public static function fleetSummary(string $namespace): array
153+
{
154+
return WorkerCompatibilityFleet::summaryForNamespace($namespace);
155+
}
156+
157+
/**
158+
* @param class-string<Model> $workerRegistrationModel
159+
*/
160+
private static function workerQuery(string $workerRegistrationModel): Builder
161+
{
162+
if (! is_a($workerRegistrationModel, Model::class, true)) {
163+
throw new InvalidArgumentException(sprintf(
164+
'Worker registration model [%s] must extend %s.',
165+
$workerRegistrationModel,
166+
Model::class,
167+
));
168+
}
169+
170+
return $workerRegistrationModel::query();
171+
}
172+
173+
/**
174+
* @return array<string, mixed>
175+
*/
176+
private static function poller(Model $worker): array
177+
{
178+
return [
179+
'worker_id' => self::stringValue(data_get($worker, 'worker_id')),
180+
'runtime' => self::stringValue(data_get($worker, 'runtime')),
181+
'sdk_version' => self::stringValue(data_get($worker, 'sdk_version')),
182+
'build_id' => self::stringValue(data_get($worker, 'build_id')),
183+
'last_heartbeat_at' => self::carbon(data_get($worker, 'last_heartbeat_at')),
184+
'status' => self::stringValue(data_get($worker, 'status')) ?? 'active',
185+
'supported_workflow_types' => self::stringList(data_get($worker, 'supported_workflow_types')),
186+
'supported_activity_types' => self::stringList(data_get($worker, 'supported_activity_types')),
187+
'max_concurrent_workflow_tasks' => max(0, (int) data_get($worker, 'max_concurrent_workflow_tasks', 0)),
188+
'max_concurrent_activity_tasks' => max(0, (int) data_get($worker, 'max_concurrent_activity_tasks', 0)),
189+
];
190+
}
191+
192+
private static function stringValue(mixed $value): ?string
193+
{
194+
if (! is_string($value)) {
195+
return null;
196+
}
197+
198+
$value = trim($value);
199+
200+
return $value === '' ? null : $value;
201+
}
202+
203+
private static function carbon(mixed $value): CarbonInterface|string|null
204+
{
205+
if ($value instanceof CarbonInterface) {
206+
return $value;
207+
}
208+
209+
if ($value instanceof \DateTimeInterface) {
210+
return Carbon::instance($value);
211+
}
212+
213+
if (is_string($value) && trim($value) !== '') {
214+
return $value;
215+
}
216+
217+
return null;
218+
}
219+
220+
/**
221+
* @return list<string>
222+
*/
223+
private static function stringList(mixed $value): array
224+
{
225+
if (is_string($value)) {
226+
$decoded = json_decode($value, true);
227+
$value = is_array($decoded) ? $decoded : explode(',', $value);
228+
}
229+
230+
if (! is_array($value)) {
231+
return [];
232+
}
233+
234+
return self::uniqueStrings($value);
235+
}
236+
237+
/**
238+
* @param array<int, mixed> $values
239+
* @return list<string>
240+
*/
241+
private static function uniqueStrings(array $values): array
242+
{
243+
$strings = array_values(array_unique(array_filter(
244+
array_map(static fn (mixed $value): ?string => self::stringValue($value), $values),
245+
static fn (?string $value): bool => $value !== null,
246+
)));
247+
248+
sort($strings);
249+
250+
return $strings;
251+
}
252+
}

0 commit comments

Comments
 (0)