Skip to content

Commit b677ac1

Browse files
Surface routing-health evidence on Waterline workers
Surface routing-health evidence on Waterline workers
1 parent 333f37f commit b677ac1

5 files changed

Lines changed: 256 additions & 2 deletions

File tree

app/Http/Controllers/V2HealthController.php

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ private function healthCheckAlertDetails(string $key, array $facts): ?string
146146
{
147147
return match ($key) {
148148
'task_transport' => $this->taskTransportAlertDetails($facts),
149+
'routing_health' => $this->routingHealthAlertDetails($facts),
149150
'durable_resume_paths' => $this->durableResumePathAlertDetails($facts),
150151
'worker_compatibility' => $this->workerCompatibilityAlertDetails($facts),
151152
'command_contract_snapshots' => $this->commandContractAlertDetails($facts),
@@ -204,6 +205,76 @@ private function taskTransportAlertDetails(array $facts): ?string
204205
return $parts !== [] ? ucfirst(implode('; ', $parts)).'.' : null;
205206
}
206207

208+
/**
209+
* @param array<string, mixed> $facts
210+
*/
211+
private function routingHealthAlertDetails(array $facts): ?string
212+
{
213+
$compatibilityBlockedRuns = $this->integerValue($facts['compatibility_blocked_runs'] ?? 0);
214+
$dispatchOverdueTasks = $this->integerValue($facts['dispatch_overdue_tasks'] ?? 0);
215+
$claimFailedTasks = $this->integerValue($facts['claim_failed_tasks'] ?? 0);
216+
$activeWorkerScopes = $this->integerValue($facts['active_worker_scopes'] ?? 0);
217+
$matchingShape = is_string($facts['matching_shape'] ?? null)
218+
? trim((string) $facts['matching_shape'])
219+
: 'in_worker';
220+
$taskDispatchMode = is_string($facts['task_dispatch_mode'] ?? null)
221+
? trim((string) $facts['task_dispatch_mode'])
222+
: 'queue';
223+
$queueWakeEnabled = ($facts['queue_wake_enabled'] ?? false) === true;
224+
$maxAgeMs = max([
225+
$this->integerValue($facts['max_compatibility_blocked_age_ms'] ?? 0),
226+
$this->integerValue($facts['max_dispatch_overdue_age_ms'] ?? 0),
227+
$this->integerValue($facts['max_claim_failed_age_ms'] ?? 0),
228+
]);
229+
230+
$signals = [];
231+
232+
if ($compatibilityBlockedRuns > 0) {
233+
$signals[] = sprintf(
234+
'%d compatibility-blocked run%s',
235+
$compatibilityBlockedRuns,
236+
$compatibilityBlockedRuns === 1 ? '' : 's',
237+
);
238+
}
239+
240+
if ($dispatchOverdueTasks > 0) {
241+
$signals[] = sprintf(
242+
'%d dispatch-overdue task%s',
243+
$dispatchOverdueTasks,
244+
$dispatchOverdueTasks === 1 ? '' : 's',
245+
);
246+
}
247+
248+
if ($claimFailedTasks > 0) {
249+
$signals[] = sprintf(
250+
'%d claim-failed task%s',
251+
$claimFailedTasks,
252+
$claimFailedTasks === 1 ? '' : 's',
253+
);
254+
}
255+
256+
$parts = [];
257+
258+
if ($signals !== []) {
259+
$parts[] = ucfirst(implode(', ', $signals));
260+
}
261+
262+
$parts[] = sprintf(
263+
'matching role %s in %s mode with queue wake %s across %d active worker scope%s',
264+
$matchingShape,
265+
$taskDispatchMode,
266+
$queueWakeEnabled ? 'enabled' : 'disabled',
267+
$activeWorkerScopes,
268+
$activeWorkerScopes === 1 ? '' : 's',
269+
);
270+
271+
if ($maxAgeMs > 0) {
272+
$parts[] = sprintf('worst-case age %s', $this->formatDurationMilliseconds($maxAgeMs));
273+
}
274+
275+
return $parts !== [] ? implode('; ', $parts).'.' : null;
276+
}
277+
207278
/**
208279
* @param array<string, mixed> $facts
209280
*/

public/app.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

public/mix-manifest.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"/app.js": "/app.js?id=40d63a20f5d313235d41d339a0ec6d16",
2+
"/app.js": "/app.js?id=c3adba4ee13203405ba01be50b407d25",
33
"/app-dark.css": "/app-dark.css?id=8b1b08a71c8e9860d0a9030d902c30d0",
44
"/app.css": "/app.css?id=4d346e04fa466f5227cee3c313fbea25",
55
"/img/favicon.png": "/img/favicon.png?id=7c006241b093796d6abfa3049df93a59",

resources/js/components/WorkerHealth.vue

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,10 @@ export default {
10121012
},
10131013
10141014
coordinationAlertFacts(alert) {
1015+
if (alert?.source === 'health_check' && alert?.key === 'routing_health') {
1016+
return this.routingHealthAlertFacts(alert);
1017+
}
1018+
10151019
const facts = [];
10161020
const queueCount = Number(alert?.queue_count);
10171021
const backlogCount = Number(alert?.backlog_count);
@@ -1042,6 +1046,56 @@ export default {
10421046
return facts;
10431047
},
10441048
1049+
routingHealthAlertFacts(alert) {
1050+
const facts = [];
1051+
const healthFacts = alert?.facts && typeof alert.facts === 'object'
1052+
? alert.facts
1053+
: {};
1054+
const compatibilityBlockedRuns = Number(healthFacts.compatibility_blocked_runs || 0);
1055+
const dispatchOverdueTasks = Number(healthFacts.dispatch_overdue_tasks || 0);
1056+
const claimFailedTasks = Number(healthFacts.claim_failed_tasks || 0);
1057+
const maxAgeMs = Math.max(
1058+
Number(healthFacts.max_compatibility_blocked_age_ms || 0),
1059+
Number(healthFacts.max_dispatch_overdue_age_ms || 0),
1060+
Number(healthFacts.max_claim_failed_age_ms || 0)
1061+
);
1062+
const activeWorkerScopes = Number(healthFacts.active_worker_scopes || 0);
1063+
1064+
if (Number.isFinite(compatibilityBlockedRuns) && compatibilityBlockedRuns > 0) {
1065+
facts.push(`compat blocked ${compatibilityBlockedRuns.toLocaleString()}`);
1066+
}
1067+
1068+
if (Number.isFinite(dispatchOverdueTasks) && dispatchOverdueTasks > 0) {
1069+
facts.push(`dispatch overdue ${dispatchOverdueTasks.toLocaleString()}`);
1070+
}
1071+
1072+
if (Number.isFinite(claimFailedTasks) && claimFailedTasks > 0) {
1073+
facts.push(`claim failed ${claimFailedTasks.toLocaleString()}`);
1074+
}
1075+
1076+
if (typeof healthFacts.matching_shape === 'string' && healthFacts.matching_shape !== '') {
1077+
facts.push(`matching ${healthFacts.matching_shape}`);
1078+
}
1079+
1080+
if (typeof healthFacts.task_dispatch_mode === 'string' && healthFacts.task_dispatch_mode !== '') {
1081+
facts.push(`dispatch ${healthFacts.task_dispatch_mode}`);
1082+
}
1083+
1084+
if (typeof healthFacts.queue_wake_enabled === 'boolean') {
1085+
facts.push(healthFacts.queue_wake_enabled ? 'wake enabled' : 'wake disabled');
1086+
}
1087+
1088+
if (Number.isFinite(activeWorkerScopes) && activeWorkerScopes > 0) {
1089+
facts.push(`worker scopes ${activeWorkerScopes.toLocaleString()}`);
1090+
}
1091+
1092+
if (Number.isFinite(maxAgeMs) && maxAgeMs > 0) {
1093+
facts.push(`max age ${this.durationMillisecondsLabel(maxAgeMs)}`);
1094+
}
1095+
1096+
return facts;
1097+
},
1098+
10451099
integerLabel(value) {
10461100
const number = Number(value || 0);
10471101

tests/Feature/V2HealthControllerTest.php

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,135 @@ public function testHealthEndpointPublishesCompatibilityAlertFactsWhenFailClosed
339339
$this->assertStringContainsString('0 supporting workers', (string) ($alert['details'] ?? ''));
340340
}
341341

342+
public function testHealthEndpointPublishesRoutingHealthAlertFactsForWorkersPanel(): void
343+
{
344+
Carbon::setTestNow('2026-04-09 12:00:00');
345+
$this->beforeApplicationDestroyed(static function (): void {
346+
Carbon::setTestNow();
347+
});
348+
349+
config()->set('queue.default', 'redis');
350+
config()->set('queue.connections.redis.driver', 'redis');
351+
config()->set('cache.default', 'file');
352+
config()->set('waterline.namespace', 'waterline-routing-health');
353+
config()->set('workflows.v2.matching_role.queue_wake_enabled', false);
354+
config()->set('workflows.v2.task_dispatch_mode', 'poll');
355+
356+
$this->createWorkerRegistrationsTable();
357+
358+
WorkerRegistration::create([
359+
'worker_id' => 'routing-health-poller',
360+
'namespace' => 'waterline-routing-health',
361+
'task_queue' => 'default',
362+
'runtime' => 'php',
363+
'sdk_version' => '1.0.0',
364+
'build_id' => 'build-routing-health',
365+
'supported_workflow_types' => ['workflow.test'],
366+
'supported_activity_types' => ['activity.test'],
367+
'max_concurrent_workflow_tasks' => 8,
368+
'max_concurrent_activity_tasks' => 4,
369+
'last_heartbeat_at' => now()->subSeconds(15),
370+
'status' => 'active',
371+
]);
372+
373+
$instance = WorkflowInstance::query()->create([
374+
'id' => 'waterline-routing-health-instance',
375+
'workflow_class' => 'WorkflowClass',
376+
'workflow_type' => 'workflow.test',
377+
'run_count' => 1,
378+
]);
379+
380+
$run = WorkflowRun::query()->create([
381+
'id' => '01JHWATERLINEROUTINGRUN01',
382+
'workflow_instance_id' => $instance->id,
383+
'run_number' => 1,
384+
'namespace' => 'waterline-routing-health',
385+
'workflow_class' => 'WorkflowClass',
386+
'workflow_type' => 'workflow.test',
387+
'status' => 'running',
388+
'started_at' => now()->subMinutes(12),
389+
'last_progress_at' => now()->subMinute(),
390+
]);
391+
392+
$instance->forceFill([
393+
'current_run_id' => $run->id,
394+
])->save();
395+
396+
WorkflowRunSummary::query()->create([
397+
'id' => $run->id,
398+
'workflow_instance_id' => $instance->id,
399+
'run_number' => 1,
400+
'is_current_run' => true,
401+
'engine_source' => 'v2',
402+
'class' => 'WorkflowClass',
403+
'workflow_type' => 'workflow.test',
404+
'status' => 'running',
405+
'status_bucket' => 'running',
406+
'namespace' => 'waterline-routing-health',
407+
'started_at' => now()->subMinutes(12),
408+
'next_task_at' => now()->subMinutes(7),
409+
'liveness_state' => 'workflow_task_waiting_for_compatible_worker',
410+
'liveness_reason' => 'No active worker heartbeat advertises the required compatibility marker.',
411+
'created_at' => now()->subMinutes(12),
412+
'updated_at' => now(),
413+
]);
414+
415+
WorkflowTask::query()->create([
416+
'id' => '01JHWATERLINEROUTINGTASK0001',
417+
'workflow_run_id' => $run->id,
418+
'namespace' => 'waterline-routing-health',
419+
'task_type' => TaskType::Workflow->value,
420+
'status' => TaskStatus::Ready->value,
421+
'connection' => 'redis',
422+
'queue' => 'default',
423+
'available_at' => now()->subMinutes(4),
424+
'last_dispatched_at' => now()->subMinutes(4),
425+
'created_at' => now()->subMinutes(4),
426+
'updated_at' => now(),
427+
]);
428+
429+
WorkflowTask::query()->create([
430+
'id' => '01JHWATERLINEROUTINGTASK0002',
431+
'workflow_run_id' => $run->id,
432+
'namespace' => 'waterline-routing-health',
433+
'task_type' => TaskType::Workflow->value,
434+
'status' => TaskStatus::Ready->value,
435+
'connection' => 'redis',
436+
'queue' => 'default',
437+
'available_at' => now()->subSeconds(90),
438+
'last_claim_failed_at' => now()->subSeconds(90),
439+
'last_claim_error' => 'Previous claim attempt failed before lease grant.',
440+
'created_at' => now()->subSeconds(90),
441+
'updated_at' => now(),
442+
]);
443+
444+
$payload = $this->get('/waterline/api/v2/health')
445+
->assertStatus(200)
446+
->json();
447+
448+
$alert = $this->coordinationAlertByKey($payload, 'routing_health');
449+
$this->assertNotNull($alert);
450+
$this->assertSame('health_check', $alert['source']);
451+
$this->assertSame('warning', $alert['status']);
452+
$this->assertSame('correctness', $alert['category']);
453+
$this->assertSame(1, $alert['facts']['compatibility_blocked_runs'] ?? null);
454+
$this->assertSame(1, $alert['facts']['dispatch_overdue_tasks'] ?? null);
455+
$this->assertSame(1, $alert['facts']['claim_failed_tasks'] ?? null);
456+
$this->assertSame(7 * 60 * 1000, $alert['facts']['max_compatibility_blocked_age_ms'] ?? null);
457+
$this->assertSame(4 * 60 * 1000, $alert['facts']['max_dispatch_overdue_age_ms'] ?? null);
458+
$this->assertSame(90 * 1000, $alert['facts']['max_claim_failed_age_ms'] ?? null);
459+
$this->assertFalse($alert['facts']['queue_wake_enabled'] ?? true);
460+
$this->assertSame('dedicated', $alert['facts']['matching_shape'] ?? null);
461+
$this->assertSame('poll', $alert['facts']['task_dispatch_mode'] ?? null);
462+
$this->assertSame(0, $alert['facts']['active_worker_scopes'] ?? null);
463+
$this->assertStringContainsString('compatibility-blocked run', (string) ($alert['details'] ?? ''));
464+
$this->assertStringContainsString('dispatch-overdue task', (string) ($alert['details'] ?? ''));
465+
$this->assertStringContainsString('claim-failed task', (string) ($alert['details'] ?? ''));
466+
$this->assertStringContainsString('matching role dedicated in poll mode', (string) ($alert['details'] ?? ''));
467+
$this->assertStringContainsString('queue wake disabled', (string) ($alert['details'] ?? ''));
468+
$this->assertStringContainsString('worst-case age 7m00s', (string) ($alert['details'] ?? ''));
469+
}
470+
342471
public function testHealthEndpointReturnsUnavailableForBlockingBackendIssues(): void
343472
{
344473
config()->set('queue.default', 'sync');

0 commit comments

Comments
 (0)