Skip to content

Commit 7c11f29

Browse files
[cross-repo from server#137] Worker Heartbeats / Status surface: every SDK emits periodic heartbeat; CLI+UI list workers per task queue (Temporal-parity) (#539)
1 parent 168a616 commit 7c11f29

3 files changed

Lines changed: 291 additions & 0 deletions

File tree

src/V2/Support/StandaloneWorkerVisibility.php

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,74 @@ private static function poller(Model $worker): array
186186
'supported_activity_types' => self::stringList(data_get($worker, 'supported_activity_types')),
187187
'max_concurrent_workflow_tasks' => max(0, (int) data_get($worker, 'max_concurrent_workflow_tasks', 0)),
188188
'max_concurrent_activity_tasks' => max(0, (int) data_get($worker, 'max_concurrent_activity_tasks', 0)),
189+
'available_workflow_slots' => self::nonNegativeIntOrNull(data_get($worker, 'available_workflow_slots')),
190+
'available_activity_slots' => self::nonNegativeIntOrNull(data_get($worker, 'available_activity_slots')),
191+
'available_session_slots' => self::nonNegativeIntOrNull(data_get($worker, 'available_session_slots')),
192+
'process_metrics' => self::processMetrics(data_get($worker, 'process_metrics')),
189193
];
190194
}
191195

196+
private static function nonNegativeIntOrNull(mixed $value): ?int
197+
{
198+
if ($value === null) {
199+
return null;
200+
}
201+
202+
if (is_int($value)) {
203+
return max(0, $value);
204+
}
205+
206+
if (is_string($value) && ctype_digit($value)) {
207+
return max(0, (int) $value);
208+
}
209+
210+
return null;
211+
}
212+
213+
/**
214+
* @return array<string, float|int|string>|null
215+
*/
216+
private static function processMetrics(mixed $value): ?array
217+
{
218+
if (is_string($value) && trim($value) !== '') {
219+
$decoded = json_decode($value, true);
220+
$value = is_array($decoded) ? $decoded : null;
221+
}
222+
223+
if (! is_array($value) || $value === []) {
224+
return null;
225+
}
226+
227+
$allowed = ['cpu_percent', 'memory_bytes', 'process_uptime_seconds', 'process_id', 'host'];
228+
$clean = [];
229+
230+
foreach ($allowed as $key) {
231+
if (! array_key_exists($key, $value) || $value[$key] === null) {
232+
continue;
233+
}
234+
235+
if ($key === 'host' && is_string($value[$key]) && trim($value[$key]) !== '') {
236+
$clean[$key] = mb_substr(trim($value[$key]), 0, 255);
237+
238+
continue;
239+
}
240+
241+
if ($key === 'cpu_percent' && (is_int($value[$key]) || is_float($value[$key]))) {
242+
$clean[$key] = max(0.0, (float) $value[$key]);
243+
244+
continue;
245+
}
246+
247+
if (is_int($value[$key])) {
248+
$clean[$key] = max(0, $value[$key]);
249+
} elseif (is_string($value[$key]) && ctype_digit($value[$key])) {
250+
$clean[$key] = max(0, (int) $value[$key]);
251+
}
252+
}
253+
254+
return $clean === [] ? null : $clean;
255+
}
256+
192257
private static function stringValue(mixed $value): ?string
193258
{
194259
if (! is_string($value)) {
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Support;
6+
7+
/**
8+
* Builds the worker-fleet telemetry payload (task-slot availability + basic
9+
* process-level metrics) that every official SDK ships with periodic worker
10+
* heartbeats. The payload feeds the worker management API, the CLI worker
11+
* listing, and the operator Worker Status view so operators can answer
12+
* "what workers are polling task queue X right now, what's their slot
13+
* capacity, when did each last check in" without writing custom monitoring.
14+
*
15+
* SDKs are not required to populate every key — anything they don't have
16+
* cheap access to in the runtime is simply omitted, and the server records
17+
* only what the SDK reports. The shape is shared between PHP and Python
18+
* SDKs (and future runtimes) so the operator surface stays identical
19+
* regardless of which SDK emitted the heartbeat.
20+
*
21+
* @api Stable surface intended for SDK integrators that drive the
22+
* worker-protocol heartbeat directly. Adding new optional keys to the
23+
* returned arrays is a minor change; renaming or removing keys is a
24+
* breaking change.
25+
*/
26+
final class WorkerHeartbeatTelemetry
27+
{
28+
/**
29+
* Build the `task_slots` entry for a heartbeat payload.
30+
*
31+
* Each *_inflight argument is the number of slots currently consumed by
32+
* in-flight tasks of that family; the available count is derived as
33+
* max(0, capacity - inflight). When a SDK does not track in-flight count
34+
* for a slot family, callers should pass null for that family and the
35+
* key will be omitted from the resulting payload.
36+
*
37+
* @return array<string, int>
38+
*/
39+
public static function taskSlots(
40+
?int $workflowCapacity = null,
41+
?int $workflowInflight = null,
42+
?int $activityCapacity = null,
43+
?int $activityInflight = null,
44+
?int $sessionCapacity = null,
45+
?int $sessionInflight = null,
46+
): array {
47+
$slots = [];
48+
49+
if ($workflowCapacity !== null && $workflowInflight !== null) {
50+
$slots['workflow_available'] = max(0, $workflowCapacity - $workflowInflight);
51+
}
52+
53+
if ($activityCapacity !== null && $activityInflight !== null) {
54+
$slots['activity_available'] = max(0, $activityCapacity - $activityInflight);
55+
}
56+
57+
if ($sessionCapacity !== null && $sessionInflight !== null) {
58+
$slots['session_available'] = max(0, $sessionCapacity - $sessionInflight);
59+
}
60+
61+
return $slots;
62+
}
63+
64+
/**
65+
* Build the `process_metrics` entry for a heartbeat payload using
66+
* runtime APIs that PHP exposes everywhere (no extension required).
67+
*
68+
* The optional `$startedAt` argument is a Unix timestamp captured at
69+
* worker boot; it is used to derive `process_uptime_seconds`. When
70+
* `$startedAt` is null the uptime entry is omitted.
71+
*
72+
* @return array<string, float|int|string>
73+
*/
74+
public static function processMetrics(?int $startedAt = null): array
75+
{
76+
$metrics = [
77+
'memory_bytes' => self::memoryBytes(),
78+
'process_id' => self::processId(),
79+
];
80+
81+
$cpuPercent = self::cpuPercent();
82+
if ($cpuPercent !== null) {
83+
$metrics['cpu_percent'] = $cpuPercent;
84+
}
85+
86+
if ($startedAt !== null) {
87+
$metrics['process_uptime_seconds'] = max(0, time() - $startedAt);
88+
}
89+
90+
$host = self::host();
91+
if ($host !== null) {
92+
$metrics['host'] = $host;
93+
}
94+
95+
return $metrics;
96+
}
97+
98+
private static function memoryBytes(): int
99+
{
100+
return max(0, (int) memory_get_usage(true));
101+
}
102+
103+
private static function processId(): int
104+
{
105+
return max(0, (int) getmypid());
106+
}
107+
108+
/**
109+
* Approximate CPU percent for the current process across the runtime
110+
* since the process started. Returns null when the runtime does not
111+
* expose `getrusage()` on this platform.
112+
*/
113+
private static function cpuPercent(): ?float
114+
{
115+
if (! function_exists('getrusage')) {
116+
return null;
117+
}
118+
119+
$usage = getrusage();
120+
if (! is_array($usage)) {
121+
return null;
122+
}
123+
124+
$userSeconds = (int) ($usage['ru_utime.tv_sec'] ?? 0)
125+
+ ((int) ($usage['ru_utime.tv_usec'] ?? 0)) / 1_000_000;
126+
$systemSeconds = (int) ($usage['ru_stime.tv_sec'] ?? 0)
127+
+ ((int) ($usage['ru_stime.tv_usec'] ?? 0)) / 1_000_000;
128+
$cpuSeconds = $userSeconds + $systemSeconds;
129+
130+
$wallSeconds = max(0.001, microtime(true) - (float) self::processStart());
131+
132+
return round(min(100.0, max(0.0, ($cpuSeconds / $wallSeconds) * 100.0)), 2);
133+
}
134+
135+
private static function host(): ?string
136+
{
137+
$host = gethostname();
138+
139+
return is_string($host) && $host !== '' ? $host : null;
140+
}
141+
142+
/**
143+
* Best-effort estimate of the process start time. Returns the
144+
* server-time at first call, which underestimates the true start
145+
* for long-running workers but is monotonic and sufficient for the
146+
* CPU-percent ratio.
147+
*/
148+
private static function processStart(): float
149+
{
150+
static $startedAt = null;
151+
152+
if ($startedAt === null) {
153+
$startedAt = microtime(true);
154+
}
155+
156+
return $startedAt;
157+
}
158+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Unit\V2;
6+
7+
use Tests\TestCase;
8+
use Workflow\V2\Support\WorkerHeartbeatTelemetry;
9+
10+
final class WorkerHeartbeatTelemetryTest extends TestCase
11+
{
12+
public function test_task_slots_derives_available_from_capacity_minus_inflight(): void
13+
{
14+
$slots = WorkerHeartbeatTelemetry::taskSlots(
15+
workflowCapacity: 10,
16+
workflowInflight: 3,
17+
activityCapacity: 8,
18+
activityInflight: 8,
19+
sessionCapacity: 4,
20+
sessionInflight: 0,
21+
);
22+
23+
self::assertSame(7, $slots['workflow_available']);
24+
self::assertSame(0, $slots['activity_available']);
25+
self::assertSame(4, $slots['session_available']);
26+
}
27+
28+
public function test_task_slots_floors_at_zero_when_inflight_exceeds_capacity(): void
29+
{
30+
$slots = WorkerHeartbeatTelemetry::taskSlots(
31+
workflowCapacity: 4,
32+
workflowInflight: 9,
33+
);
34+
35+
self::assertSame(0, $slots['workflow_available']);
36+
}
37+
38+
public function test_task_slots_omits_keys_when_inputs_are_unknown(): void
39+
{
40+
$slots = WorkerHeartbeatTelemetry::taskSlots(
41+
activityCapacity: 5,
42+
activityInflight: 1,
43+
);
44+
45+
self::assertSame(['activity_available' => 4], $slots);
46+
}
47+
48+
public function test_process_metrics_include_memory_pid_and_optional_uptime(): void
49+
{
50+
$metrics = WorkerHeartbeatTelemetry::processMetrics(startedAt: time() - 30);
51+
52+
self::assertArrayHasKey('memory_bytes', $metrics);
53+
self::assertArrayHasKey('process_id', $metrics);
54+
self::assertGreaterThan(0, $metrics['memory_bytes']);
55+
self::assertGreaterThan(0, $metrics['process_id']);
56+
57+
self::assertArrayHasKey('process_uptime_seconds', $metrics);
58+
self::assertGreaterThanOrEqual(0, $metrics['process_uptime_seconds']);
59+
self::assertLessThan(120, $metrics['process_uptime_seconds']);
60+
}
61+
62+
public function test_process_metrics_omit_uptime_when_started_at_is_null(): void
63+
{
64+
$metrics = WorkerHeartbeatTelemetry::processMetrics();
65+
66+
self::assertArrayNotHasKey('process_uptime_seconds', $metrics);
67+
}
68+
}

0 commit comments

Comments
 (0)