Skip to content

Commit 6efc487

Browse files
Surface dispatch-overdue wake-latency age on operator metrics and task_transport health
Adds tasks.oldest_dispatch_overdue_since and tasks.max_dispatch_overdue_age_ms to OperatorMetrics::snapshot() so operators can answer "how long has the oldest ready-but-unclaimed task been waiting for a working dispatch wake?" from the metric alone, completing the coordination-health age set alongside tasks.oldest_lease_expired_at, tasks.oldest_ready_due_at, runs.oldest_wait_started_at, and backlog.oldest_compatibility_blocked_started_at. The age is the effective COALESCE(last_dispatched_at, created_at) — the timestamp the task has been waiting for a successful dispatch since (either the last attempted dispatch that didn't stick, or the task's creation time if it was never dispatched), computed across the dispatch_overdue subset only. Refactors the existing filter into a private dispatchOverdueQuery() helper so the count and the age use one authoritative query. Forwards the same trio on HealthCheck::taskTransportCheck() data as dispatch_overdue_tasks / oldest_dispatch_overdue_since / max_dispatch_overdue_age_ms so wake-latency is legible from /healthz without re-reading the metrics snapshot. The check's escalation predicate stays unchanged (it still escalates only on tasks.unhealthy, which already counts dispatch_overdue); the age data is observability so operators can tell "dispatch wake is sporadically slow" apart from "dispatch wake has stalled on this task for minutes". Pins the new keys in docs/architecture/rollout-safety.md frozen metric table and "Ready but unclaimed" stuck-detector entry, adds a dispatch-overdue-age row assertion in RolloutSafetyDocumentationTest, and extends the task_transport HealthCheckTest assertions to pin the new fields in the health check data. Extends V2OperatorMetricsTest with two focused tests proving the metric selects the earliest effective dispatch moment across mixed dispatched/never-dispatched/failed/healthy task fixtures and returns null/0 when no tasks are overdue, plus broad-snapshot and task_transport forward assertions on the existing mixed-state fixture.
1 parent db2cef8 commit 6efc487

6 files changed

Lines changed: 192 additions & 4 deletions

File tree

docs/architecture/rollout-safety.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ change.
411411
| `tasks` | `dispatch_overdue`, `lease_expired` | lease and dispatch timing |
412412
| `tasks` | `oldest_lease_expired_at`, `max_lease_expired_age_ms` | earliest `lease_expires_at` among leased tasks whose lease has expired at snapshot time and the largest expired-lease age in milliseconds, mirroring the `backlog.oldest_compatibility_blocked_started_at` / `max_compatibility_blocked_age_ms` shape so operators can answer "how long has the worst leased task been expired without redelivery?" (the primary stuck-lease duplicate-risk age indicator) from the metric alone |
413413
| `tasks` | `oldest_ready_due_at`, `max_ready_due_age_ms` | earliest "ready since" timestamp among ready-due tasks (the effective `COALESCE(available_at, created_at)``available_at` when the task was delayed, otherwise the creation time that made it immediately actionable) and the largest ready-age in milliseconds, mirroring the `oldest_lease_expired_at` / `max_lease_expired_age_ms` shape so operators can read queue latency ("how long has the oldest actionable task been waiting to dispatch?") from the metric alone without walking `workflow_tasks` |
414+
| `tasks` | `oldest_dispatch_overdue_since`, `max_dispatch_overdue_age_ms` | earliest `COALESCE(last_dispatched_at, created_at)` among dispatch-overdue tasks — the timestamp the worst-case ready-but-unclaimed task has been waiting for a successful dispatch wake since (either its last attempted dispatch that didn't stick or its creation time if it was never dispatched) — and the largest age in milliseconds, mirroring the `oldest_ready_due_at` / `max_ready_due_age_ms` shape so operators can read wake-latency ("how long has the oldest ready-but-unclaimed task been waiting for a working dispatch wake?") from the metric alone without walking `workflow_tasks` |
414415
| `tasks` | `unhealthy` | sum of transport failure and lease expiry counts (the primary duplicate-risk indicator) |
415416
| `backlog` | `runnable_tasks`, `delayed_tasks`, `leased_tasks` | authoritative backlog counts |
416417
| `backlog` | `unhealthy_tasks`, `repair_needed_runs`, `claim_failed_runs`, `compatibility_blocked_runs` | stuck/blocked roll-ups |
@@ -486,8 +487,16 @@ are authoritative and how they surface.
486487
authority for the redelivery decision.
487488
- **Ready but unclaimed.** A ready task that has sat past the
488489
repair window without being claimed is counted under
489-
`tasks.dispatch_overdue` and considered by
490-
`TaskRepairPolicy::readyTaskNeedsRedispatch()`.
490+
`tasks.dispatch_overdue`, its worst-case waiting-for-dispatch
491+
timestamp is surfaced through
492+
`tasks.oldest_dispatch_overdue_since` and
493+
`tasks.max_dispatch_overdue_age_ms`, and both keys are forwarded on
494+
the `task_transport` health check (`dispatch_overdue_tasks`,
495+
`oldest_dispatch_overdue_since`, `max_dispatch_overdue_age_ms`).
496+
`TaskRepairPolicy::readyTaskNeedsRedispatch()` is the authority for
497+
the redispatch decision; the age data is observability so operators
498+
can tell the difference between "dispatch wake is sporadically
499+
slow" and "dispatch wake has stalled on this task for minutes".
491500
- **Repair-needed runs.** Runs whose projected state shows
492501
`liveness_state = repair_needed` are counted under
493502
`runs.repair_needed` and surface through the

src/V2/Support/HealthCheck.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@ private static function taskTransportCheck(array $tasks, array $backlog): array
235235
? $tasks['oldest_ready_due_at']
236236
: null,
237237
'max_ready_due_age_ms' => self::integer($tasks['max_ready_due_age_ms'] ?? 0),
238+
'dispatch_overdue_tasks' => self::integer($tasks['dispatch_overdue'] ?? 0),
239+
'oldest_dispatch_overdue_since' => is_string($tasks['oldest_dispatch_overdue_since'] ?? null)
240+
? $tasks['oldest_dispatch_overdue_since']
241+
: null,
242+
'max_dispatch_overdue_age_ms' => self::integer($tasks['max_dispatch_overdue_age_ms'] ?? 0),
238243
'repair_needed_runs' => self::integer($backlog['repair_needed_runs'] ?? 0),
239244
'claim_failed_runs' => self::integer($backlog['claim_failed_runs'] ?? 0),
240245
'compatibility_blocked_runs' => self::integer($backlog['compatibility_blocked_runs'] ?? 0),

src/V2/Support/OperatorMetrics.php

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ private static function taskMetrics(CarbonInterface $now, ?string $namespace): a
121121
{
122122
$oldestLeaseExpiredAt = self::oldestLeaseExpiredAt($now, $namespace);
123123
$oldestReadyDueAt = self::oldestReadyDueAt($now, $namespace);
124+
$oldestDispatchOverdueSince = self::oldestDispatchOverdueSince($now, $namespace);
124125

125126
return [
126127
'open' => self::openTasks($namespace),
@@ -140,6 +141,10 @@ private static function taskMetrics(CarbonInterface $now, ?string $namespace): a
140141
'max_ready_due_age_ms' => $oldestReadyDueAt === null
141142
? 0
142143
: (int) $oldestReadyDueAt->diffInMilliseconds($now),
144+
'oldest_dispatch_overdue_since' => $oldestDispatchOverdueSince?->toJSON(),
145+
'max_dispatch_overdue_age_ms' => $oldestDispatchOverdueSince === null
146+
? 0
147+
: (int) $oldestDispatchOverdueSince->diffInMilliseconds($now),
143148
'unhealthy' => self::dispatchFailedTasks($namespace)
144149
+ self::claimFailedTasks($namespace)
145150
+ self::dispatchOverdueTasks($now, $namespace)
@@ -948,6 +953,36 @@ private static function claimFailedTasks(?string $namespace): int
948953
}
949954

950955
private static function dispatchOverdueTasks(CarbonInterface $now, ?string $namespace): int
956+
{
957+
return self::dispatchOverdueQuery($now, $namespace)->count();
958+
}
959+
960+
/**
961+
* Earliest "waiting-for-dispatch" moment among dispatch-overdue tasks —
962+
* the effective `COALESCE(last_dispatched_at, created_at)`, which is the
963+
* timestamp the task has been waiting for a successful dispatch since
964+
* (either the last attempted dispatch that didn't stick, or the task's
965+
* creation time for tasks that were never dispatched at all). Rollout-safety
966+
* surfaces this alongside `tasks.dispatch_overdue` so operators can read
967+
* wake-latency ("how long has the oldest ready-but-unclaimed task been
968+
* waiting for a working dispatch wake?") from the metric alone, mirroring
969+
* the existing `oldest_ready_due_at` / `max_ready_due_age_ms` shape.
970+
*/
971+
private static function oldestDispatchOverdueSince(CarbonInterface $now, ?string $namespace): ?CarbonInterface
972+
{
973+
/** @var WorkflowTask|null $task */
974+
$task = self::dispatchOverdueQuery($now, $namespace)
975+
->orderByRaw('COALESCE(last_dispatched_at, created_at) asc')
976+
->first();
977+
978+
if (! $task instanceof WorkflowTask) {
979+
return null;
980+
}
981+
982+
return $task->last_dispatched_at ?? $task->created_at;
983+
}
984+
985+
private static function dispatchOverdueQuery(CarbonInterface $now, ?string $namespace)
951986
{
952987
$cutoff = $now->copy()
953988
->subSeconds(TaskRepairPolicy::redispatchAfterSeconds());
@@ -972,8 +1007,7 @@ private static function dispatchOverdueTasks(CarbonInterface $now, ?string $name
9721007
$neverDispatched->whereNull('last_dispatched_at')
9731008
->where('created_at', '<=', $cutoff);
9741009
});
975-
})
976-
->count();
1010+
});
9771011
}
9781012

9791013
private static function dispatchFailedQuery(?string $namespace)

tests/Feature/V2/V2OperatorMetricsTest.php

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,13 @@ public function testSnapshotSummarizesDurableBacklogRepairCompatibilityAndWorker
233233
$snapshot['tasks']['oldest_ready_due_at'],
234234
);
235235
$this->assertSame(10 * 1000, $snapshot['tasks']['max_ready_due_age_ms']);
236+
$this->assertSame(
237+
Carbon::parse('2026-04-09 12:00:00')
238+
->subSeconds(10)
239+
->toJSON(),
240+
$snapshot['tasks']['oldest_dispatch_overdue_since'],
241+
);
242+
$this->assertSame(10 * 1000, $snapshot['tasks']['max_dispatch_overdue_age_ms']);
236243
$this->assertSame(4, $snapshot['tasks']['unhealthy']);
237244
$this->assertSame(4, $snapshot['backlog']['runnable_tasks']);
238245
$this->assertSame(1, $snapshot['backlog']['delayed_tasks']);
@@ -351,6 +358,14 @@ public function testSnapshotSummarizesDurableBacklogRepairCompatibilityAndWorker
351358
$taskTransport['data']['oldest_ready_due_at'],
352359
);
353360
$this->assertSame(10 * 1000, $taskTransport['data']['max_ready_due_age_ms']);
361+
$this->assertSame(1, $taskTransport['data']['dispatch_overdue_tasks']);
362+
$this->assertSame(
363+
Carbon::parse('2026-04-09 12:00:00')
364+
->subSeconds(10)
365+
->toJSON(),
366+
$taskTransport['data']['oldest_dispatch_overdue_since'],
367+
);
368+
$this->assertSame(10 * 1000, $taskTransport['data']['max_dispatch_overdue_age_ms']);
354369
}
355370

356371
public function testSnapshotCountsStaleRunSummaryProjectionRows(): void
@@ -1020,6 +1035,117 @@ public function testSnapshotSurfacesRunWaitAgeForEveryRunningWaitNotJustCompatib
10201035
$this->assertSame(7 * 60 * 1000, $resumePaths['data']['max_wait_age_ms']);
10211036
}
10221037

1038+
public function testSnapshotSurfacesDispatchOverdueAgeFromEarliestEffectiveDispatchMoment(): void
1039+
{
1040+
Carbon::setTestNow('2026-04-09 12:00:00');
1041+
$this->beforeApplicationDestroyed(static function (): void {
1042+
Carbon::setTestNow();
1043+
});
1044+
1045+
$now = Carbon::now();
1046+
1047+
$run = $this->createRunWithSummary(
1048+
instanceId: 'dispatch-overdue-age-instance',
1049+
runId: '01JDISPATCHOVDRUN0000000001',
1050+
status: 'running',
1051+
statusBucket: 'running',
1052+
livenessState: 'running',
1053+
);
1054+
1055+
// Worst-case: ready task whose last successful dispatch was 90s ago.
1056+
$this->createTask($run, '01JDISPATCHOVDTASK000000001', TaskStatus::Ready->value, [
1057+
'available_at' => $now->copy()
1058+
->subSeconds(120),
1059+
'last_dispatched_at' => $now->copy()
1060+
->subSeconds(90),
1061+
'created_at' => $now->copy()
1062+
->subSeconds(150),
1063+
]);
1064+
1065+
// Never-dispatched ready task created well past the cutoff — counted
1066+
// as overdue but its effective age (created_at = 30s ago) is newer
1067+
// than the 90s task above, so it must not win the "oldest since".
1068+
$this->createTask($run, '01JDISPATCHOVDTASK000000002', TaskStatus::Ready->value, [
1069+
'available_at' => $now->copy()
1070+
->subSeconds(30),
1071+
'created_at' => $now->copy()
1072+
->subSeconds(30),
1073+
]);
1074+
1075+
// Ready task dispatched well within the cutoff — healthy, NOT overdue.
1076+
$this->createTask($run, '01JDISPATCHOVDTASK000000003', TaskStatus::Ready->value, [
1077+
'available_at' => $now->copy()
1078+
->subSecond(),
1079+
'last_dispatched_at' => $now->copy()
1080+
->subSecond(),
1081+
'created_at' => $now->copy()
1082+
->subSecond(),
1083+
]);
1084+
1085+
// Dispatch-failed ready task — applyDispatchHealthy excludes it from
1086+
// the overdue set even though its created_at is well past cutoff.
1087+
$this->createTask($run, '01JDISPATCHOVDTASK000000004', TaskStatus::Ready->value, [
1088+
'available_at' => $now->copy()
1089+
->subSeconds(120),
1090+
'last_dispatch_attempt_at' => $now->copy()
1091+
->subSeconds(30),
1092+
'last_dispatch_error' => 'Queue transport unavailable.',
1093+
'created_at' => $now->copy()
1094+
->subSeconds(120),
1095+
]);
1096+
1097+
$snapshot = OperatorMetrics::snapshot($now);
1098+
1099+
$expectedOldestDispatchOverdueSince = $now->copy()
1100+
->subSeconds(90)
1101+
->toJSON();
1102+
1103+
$this->assertSame(2, $snapshot['tasks']['dispatch_overdue']);
1104+
$this->assertSame($expectedOldestDispatchOverdueSince, $snapshot['tasks']['oldest_dispatch_overdue_since']);
1105+
$this->assertSame(90 * 1000, $snapshot['tasks']['max_dispatch_overdue_age_ms']);
1106+
1107+
$healthSnapshot = HealthCheck::snapshot($now);
1108+
$taskTransport = collect($healthSnapshot['checks'])->firstWhere('name', 'task_transport');
1109+
$this->assertNotNull($taskTransport);
1110+
$this->assertSame(2, $taskTransport['data']['dispatch_overdue_tasks']);
1111+
$this->assertSame($expectedOldestDispatchOverdueSince, $taskTransport['data']['oldest_dispatch_overdue_since']);
1112+
$this->assertSame(90 * 1000, $taskTransport['data']['max_dispatch_overdue_age_ms']);
1113+
}
1114+
1115+
public function testSnapshotReportsDispatchOverdueAgeAsZeroWhenNoTasksAreOverdue(): void
1116+
{
1117+
Carbon::setTestNow('2026-04-09 12:00:00');
1118+
$this->beforeApplicationDestroyed(static function (): void {
1119+
Carbon::setTestNow();
1120+
});
1121+
1122+
$now = Carbon::now();
1123+
1124+
$run = $this->createRunWithSummary(
1125+
instanceId: 'dispatch-overdue-none-instance',
1126+
runId: '01JDISPATCHNONERUN0000000001',
1127+
status: 'running',
1128+
statusBucket: 'running',
1129+
livenessState: 'running',
1130+
);
1131+
1132+
// Fresh ready task — dispatched within the cutoff, not overdue.
1133+
$this->createTask($run, '01JDISPATCHNONETASK000000001', TaskStatus::Ready->value, [
1134+
'available_at' => $now->copy()
1135+
->subSecond(),
1136+
'last_dispatched_at' => $now->copy()
1137+
->subSecond(),
1138+
'created_at' => $now->copy()
1139+
->subSecond(),
1140+
]);
1141+
1142+
$snapshot = OperatorMetrics::snapshot($now);
1143+
1144+
$this->assertSame(0, $snapshot['tasks']['dispatch_overdue']);
1145+
$this->assertNull($snapshot['tasks']['oldest_dispatch_overdue_since']);
1146+
$this->assertSame(0, $snapshot['tasks']['max_dispatch_overdue_age_ms']);
1147+
}
1148+
10231149
public function testSnapshotReportsRunWaitAgeAsZeroWhenNoRunsAreWaiting(): void
10241150
{
10251151
Carbon::setTestNow('2026-04-09 12:00:00');

tests/Unit/V2/HealthCheckTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,9 @@ public function testSnapshotWarnsWhenOpenRunHasNoDurableResumePath(): void
552552
$this->assertSame(200, HealthCheck::httpStatus($snapshot));
553553
$this->assertSame('ok', $taskTransport['status']);
554554
$this->assertSame(0, $taskTransport['data']['unhealthy_tasks']);
555+
$this->assertSame(0, $taskTransport['data']['dispatch_overdue_tasks']);
556+
$this->assertNull($taskTransport['data']['oldest_dispatch_overdue_since']);
557+
$this->assertSame(0, $taskTransport['data']['max_dispatch_overdue_age_ms']);
555558
$this->assertSame(1, $taskTransport['data']['repair_needed_runs']);
556559
$this->assertSame('warning', $resumePaths['status']);
557560
$this->assertSame(1, $resumePaths['data']['repair_needed_runs']);

tests/Unit/V2/RolloutSafetyDocumentationTest.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,17 @@ public function testContractDocumentFreezesRunWaitAgeRow(): void
361361
);
362362
}
363363

364+
public function testContractDocumentFreezesDispatchOverdueAgeRow(): void
365+
{
366+
$contents = $this->documentContents();
367+
368+
$this->assertMatchesRegularExpression(
369+
'/\|\s*`tasks`\s*\|[^|]*`oldest_dispatch_overdue_since`[^|]*`max_dispatch_overdue_age_ms`/',
370+
$contents,
371+
'Rollout safety contract must pin the tasks dispatch-overdue age row so operators can read wake-latency ("how long has the oldest ready-but-unclaimed task been waiting for a working dispatch wake?") from OperatorMetrics::snapshot() without walking workflow_tasks.',
372+
);
373+
}
374+
364375
public function testContractDocumentFreezesHealthCheckNames(): void
365376
{
366377
$contents = $this->documentContents();

0 commit comments

Comments
 (0)