Skip to content

Commit 0ddd5ab

Browse files
Surface routing drain cohorts in Waterline health
Surface routing drain cohorts on Waterline health
1 parent 803b410 commit 0ddd5ab

6 files changed

Lines changed: 386 additions & 6 deletions

File tree

app/Http/Controllers/V2HealthController.php

Lines changed: 268 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Carbon\CarbonInterface;
66
use Illuminate\Support\Facades\Schema;
7+
use Waterline\Models\WorkerBuildIdRollout;
78
use Waterline\Models\WorkerRegistration;
89
use Waterline\Support\WorkflowEngineSourceResolver;
910
use Workflow\V2\Enums\TaskStatus;
@@ -19,6 +20,7 @@ public function show()
1920
{
2021
$engineSource = WorkflowEngineSourceResolver::status();
2122
$namespace = $this->namespace();
23+
$routingDrains = $this->routingDrains($namespace);
2224

2325
if (($engineSource['uses_v2'] ?? false) !== true) {
2426
$payload = [
@@ -27,6 +29,7 @@ public function show()
2729
$namespace,
2830
'Queue visibility is unavailable until Waterline uses the v2 operator bridge.',
2931
),
32+
'routing_drains' => $routingDrains,
3033
'generated_at' => now()->toJSON(),
3134
'status' => 'error',
3235
'healthy' => false,
@@ -50,6 +53,7 @@ public function show()
5053
$payload['coordination_alerts'] = $this->coordinationAlerts(
5154
$payload['checks'],
5255
$payload['queue_visibility'],
56+
$routingDrains,
5357
);
5458

5559
return response()->json($payload, 503);
@@ -70,11 +74,13 @@ public function show()
7074
]);
7175
$snapshot['namespace'] = $namespace;
7276
$snapshot['queue_visibility'] = $this->queueVisibility($namespace);
77+
$snapshot['routing_drains'] = $routingDrains;
7378
$snapshot['engine_source'] = $engineSource;
7479
$snapshot['readiness_contract'] = $engineSource['readiness_contract'] ?? null;
7580
$snapshot['coordination_alerts'] = $this->coordinationAlerts(
7681
$snapshot['checks'],
7782
$snapshot['queue_visibility'],
83+
$routingDrains,
7884
);
7985

8086
return response()->json($snapshot, HealthCheck::httpStatus($snapshot));
@@ -83,21 +89,23 @@ public function show()
8389
/**
8490
* @param array<int, array<string, mixed>> $checks
8591
* @param array<string, mixed> $queueVisibility
92+
* @param array<string, mixed> $routingDrains
8693
* @return array<int, array<string, mixed>>
8794
*/
88-
private function coordinationAlerts(array $checks, array $queueVisibility): array
95+
private function coordinationAlerts(array $checks, array $queueVisibility, array $routingDrains): array
8996
{
9097
return array_values(array_merge(
91-
$this->healthCheckAlerts($checks),
98+
$this->healthCheckAlerts($checks, $routingDrains),
9299
$this->queueVisibilityAlerts($queueVisibility),
93100
));
94101
}
95102

96103
/**
97104
* @param array<int, array<string, mixed>> $checks
105+
* @param array<string, mixed> $routingDrains
98106
* @return array<int, array<string, mixed>>
99107
*/
100-
private function healthCheckAlerts(array $checks): array
108+
private function healthCheckAlerts(array $checks, array $routingDrains): array
101109
{
102110
$alerts = [];
103111

@@ -119,6 +127,16 @@ private function healthCheckAlerts(array $checks): array
119127
? trim((string) $check['message'])
120128
: sprintf('%s reported %s.', $title, $status);
121129
$facts = is_array($check['data'] ?? null) ? $check['data'] : [];
130+
if ($key === 'routing_health') {
131+
$facts = array_merge($facts, [
132+
'queues_with_drains' => $this->integerValue($routingDrains['queues_with_drains'] ?? 0),
133+
'draining_build_id_count' => $this->integerValue($routingDrains['draining_build_id_count'] ?? 0),
134+
'active_worker_count' => $this->integerValue($routingDrains['active_worker_count'] ?? 0),
135+
'draining_worker_count' => $this->integerValue($routingDrains['draining_worker_count'] ?? 0),
136+
'stale_worker_count' => $this->integerValue($routingDrains['stale_worker_count'] ?? 0),
137+
'routing_drains' => $routingDrains,
138+
]);
139+
}
122140

123141
$alerts[] = [
124142
'key' => $key,
@@ -270,9 +288,256 @@ private function routingHealthAlertDetails(array $facts): ?string
270288
$parts[] = sprintf('worst-case age %s', $this->formatDurationMilliseconds($maxAgeMs));
271289
}
272290

291+
$routingDrains = is_array($facts['routing_drains'] ?? null) ? $facts['routing_drains'] : [];
292+
$drainQueues = is_array($routingDrains['queues'] ?? null) ? $routingDrains['queues'] : [];
293+
$drainQueueLabels = [];
294+
295+
foreach (array_slice($drainQueues, 0, 3) as $queue) {
296+
if (! is_array($queue)) {
297+
continue;
298+
}
299+
300+
$taskQueue = is_string($queue['task_queue'] ?? null) && trim((string) $queue['task_queue']) !== ''
301+
? trim((string) $queue['task_queue'])
302+
: 'default';
303+
$buildIds = is_array($queue['build_ids'] ?? null) ? $queue['build_ids'] : [];
304+
$buildLabels = [];
305+
306+
foreach ($buildIds as $build) {
307+
if (! is_array($build)) {
308+
continue;
309+
}
310+
311+
$buildLabel = is_string($build['build_id'] ?? null) && trim((string) $build['build_id']) !== ''
312+
? trim((string) $build['build_id'])
313+
: 'unversioned';
314+
$buildLabels[] = $buildLabel;
315+
}
316+
317+
$buildLabels = array_values(array_unique($buildLabels));
318+
319+
if ($buildLabels === []) {
320+
$drainQueueLabels[] = $taskQueue;
321+
continue;
322+
}
323+
324+
$drainQueueLabels[] = sprintf('%s (%s)', $taskQueue, implode(', ', array_slice($buildLabels, 0, 2)));
325+
}
326+
327+
if ($drainQueueLabels !== []) {
328+
$queueSuffix = count($drainQueues) > count($drainQueueLabels)
329+
? sprintf(' and %d more queue%s', count($drainQueues) - count($drainQueueLabels), count($drainQueues) - count($drainQueueLabels) === 1 ? '' : 's')
330+
: '';
331+
$parts[] = sprintf('draining cohorts %s%s', implode('; ', $drainQueueLabels), $queueSuffix);
332+
}
333+
273334
return $parts !== [] ? implode('; ', $parts).'.' : null;
274335
}
275336

337+
/**
338+
* @return array{
339+
* queues_with_drains: int,
340+
* draining_build_id_count: int,
341+
* active_worker_count: int,
342+
* draining_worker_count: int,
343+
* stale_worker_count: int,
344+
* queues: array<int, array<string, mixed>>
345+
* }
346+
*/
347+
private function routingDrains(?string $namespace): array
348+
{
349+
if ($namespace === null) {
350+
return $this->emptyRoutingDrains();
351+
}
352+
353+
if (! Schema::hasTable((new WorkerRegistration())->getTable())
354+
|| ! Schema::hasTable((new WorkerBuildIdRollout())->getTable())) {
355+
return $this->emptyRoutingDrains();
356+
}
357+
358+
$rollouts = WorkerBuildIdRollout::query()
359+
->where('namespace', $namespace)
360+
->where('drain_intent', WorkerBuildIdRollout::DRAIN_INTENT_DRAINING)
361+
->orderBy('task_queue')
362+
->orderBy('build_id')
363+
->get();
364+
365+
if ($rollouts->isEmpty()) {
366+
return $this->emptyRoutingDrains();
367+
}
368+
369+
$now = now();
370+
$staleAfterSeconds = StandaloneWorkerVisibility::staleAfterSeconds();
371+
$workers = WorkerRegistration::query()
372+
->where('namespace', $namespace)
373+
->orderBy('task_queue')
374+
->orderByDesc('last_heartbeat_at')
375+
->orderBy('worker_id')
376+
->get();
377+
378+
$workersByQueue = [];
379+
foreach ($workers as $worker) {
380+
$queueKey = $this->routingDrainQueueKey($worker->task_queue);
381+
$workersByQueue[$queueKey] ??= [];
382+
$workersByQueue[$queueKey][] = $worker;
383+
}
384+
385+
$queues = [];
386+
387+
foreach ($rollouts as $rollout) {
388+
$queueKey = $this->routingDrainQueueKey($rollout->task_queue);
389+
$queueWorkers = $workersByQueue[$queueKey] ?? [];
390+
391+
if (! isset($queues[$queueKey])) {
392+
$queueCounts = $this->routingDrainWorkerCounts($queueWorkers, $now, $staleAfterSeconds);
393+
$queues[$queueKey] = [
394+
'namespace' => $namespace,
395+
'task_queue' => $queueKey,
396+
'draining_build_id_count' => 0,
397+
'active_worker_count' => $queueCounts['active_worker_count'],
398+
'draining_worker_count' => $queueCounts['draining_worker_count'],
399+
'stale_worker_count' => $queueCounts['stale_worker_count'],
400+
'build_ids' => [],
401+
];
402+
}
403+
404+
$buildWorkers = array_values(array_filter(
405+
$queueWorkers,
406+
fn (WorkerRegistration $worker): bool => $this->routingDrainBuildIdKey($worker->build_id) === (string) $rollout->build_id,
407+
));
408+
$buildCounts = $this->routingDrainWorkerCounts($buildWorkers, $now, $staleAfterSeconds);
409+
410+
$queues[$queueKey]['draining_build_id_count']++;
411+
$queues[$queueKey]['build_ids'][] = [
412+
'build_id' => $rollout->publicBuildId(),
413+
'drain_intent' => (string) $rollout->drain_intent,
414+
'drained_at' => $rollout->drained_at?->toJSON(),
415+
'active_worker_count' => $buildCounts['active_worker_count'],
416+
'draining_worker_count' => $buildCounts['draining_worker_count'],
417+
'stale_worker_count' => $buildCounts['stale_worker_count'],
418+
'total_worker_count' => $buildCounts['total_worker_count'],
419+
];
420+
}
421+
422+
ksort($queues);
423+
424+
$queueSummaries = array_values(array_map(function (array $queue): array {
425+
usort($queue['build_ids'], function (array $left, array $right): int {
426+
return strcmp(
427+
$left['build_id'] ?? '',
428+
$right['build_id'] ?? '',
429+
);
430+
});
431+
432+
return $queue;
433+
}, $queues));
434+
435+
return [
436+
'queues_with_drains' => count($queueSummaries),
437+
'draining_build_id_count' => array_sum(array_map(
438+
fn (array $queue): int => $this->integerValue($queue['draining_build_id_count'] ?? 0),
439+
$queueSummaries,
440+
)),
441+
'active_worker_count' => array_sum(array_map(
442+
fn (array $queue): int => $this->integerValue($queue['active_worker_count'] ?? 0),
443+
$queueSummaries,
444+
)),
445+
'draining_worker_count' => array_sum(array_map(
446+
fn (array $queue): int => $this->integerValue($queue['draining_worker_count'] ?? 0),
447+
$queueSummaries,
448+
)),
449+
'stale_worker_count' => array_sum(array_map(
450+
fn (array $queue): int => $this->integerValue($queue['stale_worker_count'] ?? 0),
451+
$queueSummaries,
452+
)),
453+
'queues' => $queueSummaries,
454+
];
455+
}
456+
457+
/**
458+
* @param array<int, WorkerRegistration> $workers
459+
* @return array{
460+
* active_worker_count: int,
461+
* draining_worker_count: int,
462+
* stale_worker_count: int,
463+
* total_worker_count: int
464+
* }
465+
*/
466+
private function routingDrainWorkerCounts(array $workers, CarbonInterface $now, int $staleAfterSeconds): array
467+
{
468+
$counts = [
469+
'active_worker_count' => 0,
470+
'draining_worker_count' => 0,
471+
'stale_worker_count' => 0,
472+
'total_worker_count' => count($workers),
473+
];
474+
475+
foreach ($workers as $worker) {
476+
$status = $this->routingDrainWorkerStatus($worker, $now, $staleAfterSeconds);
477+
478+
if ($status === 'stale') {
479+
$counts['stale_worker_count']++;
480+
} elseif ($status === WorkerBuildIdRollout::DRAIN_INTENT_DRAINING) {
481+
$counts['draining_worker_count']++;
482+
} else {
483+
$counts['active_worker_count']++;
484+
}
485+
}
486+
487+
return $counts;
488+
}
489+
490+
private function routingDrainWorkerStatus(
491+
WorkerRegistration $worker,
492+
CarbonInterface $now,
493+
int $staleAfterSeconds,
494+
): string {
495+
$heartbeat = $worker->last_heartbeat_at;
496+
497+
if ($heartbeat instanceof CarbonInterface
498+
&& $heartbeat->lt($now->copy()->subSeconds($staleAfterSeconds))) {
499+
return 'stale';
500+
}
501+
502+
return is_string($worker->status) && trim($worker->status) !== ''
503+
? trim($worker->status)
504+
: WorkerBuildIdRollout::DRAIN_INTENT_ACTIVE;
505+
}
506+
507+
private function routingDrainQueueKey(mixed $taskQueue): string
508+
{
509+
return is_string($taskQueue) && trim($taskQueue) !== ''
510+
? trim($taskQueue)
511+
: 'default';
512+
}
513+
514+
private function routingDrainBuildIdKey(mixed $buildId): string
515+
{
516+
return WorkerBuildIdRollout::buildIdKey(is_string($buildId) ? $buildId : null);
517+
}
518+
519+
/**
520+
* @return array{
521+
* queues_with_drains: int,
522+
* draining_build_id_count: int,
523+
* active_worker_count: int,
524+
* draining_worker_count: int,
525+
* stale_worker_count: int,
526+
* queues: array<int, array<string, mixed>>
527+
* }
528+
*/
529+
private function emptyRoutingDrains(): array
530+
{
531+
return [
532+
'queues_with_drains' => 0,
533+
'draining_build_id_count' => 0,
534+
'active_worker_count' => 0,
535+
'draining_worker_count' => 0,
536+
'stale_worker_count' => 0,
537+
'queues' => [],
538+
];
539+
}
540+
276541
/**
277542
* @param array<string, mixed> $facts
278543
*/
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Waterline\Models;
6+
7+
use Illuminate\Database\Eloquent\Model;
8+
9+
class WorkerBuildIdRollout extends Model
10+
{
11+
public const DRAIN_INTENT_ACTIVE = 'active';
12+
13+
public const DRAIN_INTENT_DRAINING = 'draining';
14+
15+
public const UNVERSIONED_KEY = '';
16+
17+
protected $table = 'workflow_worker_build_id_rollouts';
18+
19+
protected $fillable = [
20+
'namespace',
21+
'task_queue',
22+
'build_id',
23+
'drain_intent',
24+
'drained_at',
25+
];
26+
27+
protected $casts = [
28+
'drained_at' => 'datetime',
29+
];
30+
31+
public static function buildIdKey(?string $buildId): string
32+
{
33+
if (! is_string($buildId)) {
34+
return self::UNVERSIONED_KEY;
35+
}
36+
37+
$trimmed = trim($buildId);
38+
39+
return $trimmed === '' ? self::UNVERSIONED_KEY : $trimmed;
40+
}
41+
42+
public function publicBuildId(): ?string
43+
{
44+
$buildId = (string) $this->build_id;
45+
46+
return $buildId === self::UNVERSIONED_KEY ? null : $buildId;
47+
}
48+
}

public/app.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)