Commit c2bb956

Surface dispatch-failed age on operator metrics and task_transport health
Freezes `operator_metrics.tasks.oldest_dispatch_failed_at` (ISO-8601 or null) and `operator_metrics.tasks.max_dispatch_failed_age_ms` (integer ms) on `OperatorMetrics::snapshot()`. The pair mirrors the existing `oldest_claim_failed_at` / `max_claim_failed_age_ms` shape on the claim path, but for the dispatch path, so operators can read "how long has the worst-case task been sitting with an uncleared dispatch error?" (the primary transport-failure age indicator on the dispatch path) from the metric alone without walking `workflow_tasks`.

Forwards the pair plus `dispatch_failed_tasks` on `HealthCheck::taskTransportCheck()` data so the dispatch-failed shape sits next to claim-failed, dispatch-overdue, ready-due, and lease-expired on the same task_transport check.

The dispatch-failed predicate matches `applyDispatchFailed()` exactly: Ready tasks whose most recent `last_dispatch_attempt_at` recorded a non-empty `last_dispatch_error` that has not been superseded by a later successful `last_dispatched_at`. Tasks whose dispatch error has been cleared, whose last dispatch attempt has been superseded by a successful dispatch, or whose status has moved past Ready (Leased, etc.) are excluded so the signal isolates the active dispatch-failure cohort from healthy and progressing work.

Pins the row on `docs/architecture/rollout-safety.md` and adds a dispatch-failed-age bullet, guarded by `RolloutSafetyDocumentationTest::testContractDocumentFreezesDispatchFailedAgeRow`.
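The predicate and the age pair described above can be sketched in plain PHP. This is an illustration only, not the code in `OperatorMetrics`: the function names `isDispatchFailed` and `dispatchFailedAge`, the array-shaped task rows, the integer-millisecond timestamps, and the `oldest_dispatch_failed_at_ms` key are all hypothetical stand-ins (the real metric reports an ISO-8601 string). How the real `applyDispatchFailed()` treats a successful dispatch recorded at exactly the same instant as the failed attempt is not stated in this commit; the sketch assumes it counts as superseding.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical model of the dispatch-failed predicate: a Ready task whose
 * latest dispatch attempt left a non-empty error that no later successful
 * dispatch has superseded. Timestamps are integer milliseconds.
 */
function isDispatchFailed(array $task): bool
{
    $error = $task['last_dispatch_error'] ?? null;
    $attemptAt = $task['last_dispatch_attempt_at'] ?? null;
    $dispatchedAt = $task['last_dispatched_at'] ?? null;

    // Leased (or otherwise progressed) tasks are outside the cohort.
    if (($task['status'] ?? null) !== 'ready') {
        return false;
    }

    // A cleared error (null or empty string) or a missing attempt excludes the task.
    if ($error === null || $error === '' || $attemptAt === null) {
        return false;
    }

    // Assumption: a successful dispatch at or after the failed attempt supersedes it.
    return $dispatchedAt === null || $dispatchedAt < $attemptAt;
}

/**
 * Hypothetical aggregate: cohort size, earliest failed attempt, and the age
 * of that attempt relative to $nowMs (0 when the cohort is empty).
 */
function dispatchFailedAge(array $tasks, int $nowMs): array
{
    $attempts = array_map(
        static fn (array $task): int => $task['last_dispatch_attempt_at'],
        array_values(array_filter($tasks, 'isDispatchFailed')),
    );

    $oldest = $attempts === [] ? null : min($attempts);

    return [
        'dispatch_failed' => count($attempts),
        'oldest_dispatch_failed_at_ms' => $oldest,
        'max_dispatch_failed_age_ms' => $oldest === null ? 0 : $nowMs - $oldest,
    ];
}
```

Because the age is derived from the single oldest qualifying attempt, an empty cohort yields a null timestamp and an age of 0, which matches the shape the zero-case test in this commit asserts.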
1 parent 4451698 commit c2bb956

6 files changed

Lines changed: 208 additions & 0 deletions

docs/architecture/rollout-safety.md

Lines changed: 1 addition & 0 deletions
@@ -413,6 +413,7 @@ change.
 | `tasks` | `oldest_ready_due_at`, `max_ready_due_age_ms` | earliest "ready since" timestamp among ready-due tasks (the effective `COALESCE(available_at, created_at)`: `available_at` when the task was delayed, otherwise the creation time that made it immediately actionable) and the largest ready-age in milliseconds, mirroring the `oldest_lease_expired_at` / `max_lease_expired_age_ms` shape so operators can read queue latency ("how long has the oldest actionable task been waiting to dispatch?") from the metric alone without walking `workflow_tasks` |
 | `tasks` | `oldest_dispatch_overdue_since`, `max_dispatch_overdue_age_ms` | earliest `COALESCE(last_dispatched_at, created_at)` among dispatch-overdue tasks — the timestamp the worst-case ready-but-unclaimed task has been waiting for a successful dispatch wake since (either its last attempted dispatch that didn't stick or its creation time if it was never dispatched) — and the largest age in milliseconds, mirroring the `oldest_ready_due_at` / `max_ready_due_age_ms` shape so operators can read wake-latency ("how long has the oldest ready-but-unclaimed task been waiting for a working dispatch wake?") from the metric alone without walking `workflow_tasks` |
 | `tasks` | `oldest_claim_failed_at`, `max_claim_failed_age_ms` | earliest `last_claim_failed_at` among claim-failed tasks (Ready tasks whose most recent claim attempt recorded an uncleared `last_claim_error`) and the largest claim-failed age in milliseconds, mirroring the `oldest_dispatch_overdue_since` / `max_dispatch_overdue_age_ms` shape for the dispatch path so operators can read "how long has the worst-case task been sitting with an uncleared claim error?" — the primary lease-conflict and duplicate-risk age indicator for the claim path — from the metric alone without walking `workflow_tasks` |
+| `tasks` | `oldest_dispatch_failed_at`, `max_dispatch_failed_age_ms` | earliest `last_dispatch_attempt_at` among dispatch-failed tasks (Ready tasks whose most recent dispatch attempt recorded an uncleared `last_dispatch_error` that has not been superseded by a later successful dispatch) and the largest dispatch-failed age in milliseconds, mirroring the `oldest_claim_failed_at` / `max_claim_failed_age_ms` shape for the claim path so operators can read "how long has the worst-case task been sitting with an uncleared dispatch error?" — the primary transport-failure age indicator on the dispatch path — from the metric alone without walking `workflow_tasks` |
 | `tasks` | `unhealthy` | sum of transport failure and lease expiry counts (the primary duplicate-risk indicator) |
 | `activities` | `retrying`, `oldest_retrying_started_at`, `max_retrying_age_ms` | activity executions currently in the retry window (Pending status with `attempt_count > 0`), the earliest `started_at` among them, and the largest retrying age in milliseconds, mirroring the `tasks.oldest_lease_expired_at` / `max_lease_expired_age_ms` shape on the task path so operators can answer "how long has the worst-case activity been chewing retries?" — the primary retry-rate age indicator on the activity path — from the metric alone without walking `activity_executions` |
 | `backlog` | `runnable_tasks`, `delayed_tasks`, `leased_tasks` | authoritative backlog counts |

src/V2/Support/HealthCheck.php

Lines changed: 5 additions & 0 deletions
@@ -245,6 +245,11 @@ private static function taskTransportCheck(array $tasks, array $backlog): array
                     ? $tasks['oldest_claim_failed_at']
                     : null,
                 'max_claim_failed_age_ms' => self::integer($tasks['max_claim_failed_age_ms'] ?? 0),
+                'dispatch_failed_tasks' => self::integer($tasks['dispatch_failed'] ?? 0),
+                'oldest_dispatch_failed_at' => is_string($tasks['oldest_dispatch_failed_at'] ?? null)
+                    ? $tasks['oldest_dispatch_failed_at']
+                    : null,
+                'max_dispatch_failed_age_ms' => self::integer($tasks['max_dispatch_failed_age_ms'] ?? 0),
                 'repair_needed_runs' => self::integer($backlog['repair_needed_runs'] ?? 0),
                 'claim_failed_runs' => self::integer($backlog['claim_failed_runs'] ?? 0),
                 'compatibility_blocked_runs' => self::integer($backlog['compatibility_blocked_runs'] ?? 0),

src/V2/Support/OperatorMetrics.php

Lines changed: 31 additions & 0 deletions
@@ -123,6 +123,7 @@ private static function taskMetrics(CarbonInterface $now, ?string $namespace): a
         $oldestReadyDueAt = self::oldestReadyDueAt($now, $namespace);
         $oldestDispatchOverdueSince = self::oldestDispatchOverdueSince($now, $namespace);
         $oldestClaimFailedAt = self::oldestClaimFailedAt($namespace);
+        $oldestDispatchFailedAt = self::oldestDispatchFailedAt($namespace);

         return [
             'open' => self::openTasks($namespace),
@@ -150,6 +151,10 @@ private static function taskMetrics(CarbonInterface $now, ?string $namespace): a
             'max_claim_failed_age_ms' => $oldestClaimFailedAt === null
                 ? 0
                 : (int) $oldestClaimFailedAt->diffInMilliseconds($now),
+            'oldest_dispatch_failed_at' => $oldestDispatchFailedAt?->toJSON(),
+            'max_dispatch_failed_age_ms' => $oldestDispatchFailedAt === null
+                ? 0
+                : (int) $oldestDispatchFailedAt->diffInMilliseconds($now),
             'unhealthy' => self::dispatchFailedTasks($namespace)
                 + self::claimFailedTasks($namespace)
                 + self::dispatchOverdueTasks($now, $namespace)
@@ -1046,6 +1051,32 @@ private static function oldestClaimFailedAt(?string $namespace): ?CarbonInterfac
         return $task->last_claim_failed_at;
     }

+    /**
+     * Earliest `last_dispatch_attempt_at` among tasks currently counted by
+     * `tasks.dispatch_failed` (Ready tasks whose most recent dispatch
+     * attempt recorded an uncleared `last_dispatch_error` that has not
+     * been superseded by a later successful dispatch). Rollout-safety
+     * surfaces this alongside `tasks.dispatch_failed` so operators can
+     * read "how long has the worst-case task been sitting with an
+     * uncleared dispatch error?" — the primary transport-failure age
+     * indicator on the dispatch path — from the metric alone, mirroring
+     * the existing `oldest_claim_failed_at` / `max_claim_failed_age_ms`
+     * shape for the claim path.
+     */
+    private static function oldestDispatchFailedAt(?string $namespace): ?CarbonInterface
+    {
+        /** @var WorkflowTask|null $task */
+        $task = self::dispatchFailedQuery($namespace)
+            ->orderBy('last_dispatch_attempt_at')
+            ->first();
+
+        if (! $task instanceof WorkflowTask) {
+            return null;
+        }
+
+        return $task->last_dispatch_attempt_at;
+    }
+
     private static function dispatchOverdueQuery(CarbonInterface $now, ?string $namespace)
     {
         $cutoff = $now->copy()

tests/Feature/V2/V2OperatorMetricsTest.php

Lines changed: 155 additions & 0 deletions
@@ -1298,6 +1298,161 @@ public function testSnapshotReportsClaimFailedAgeAsZeroWhenNoTasksFailedToClaim(
         $this->assertSame(0, $taskTransport['data']['max_claim_failed_age_ms']);
     }

+    public function testSnapshotSurfacesDispatchFailedAgeFromOldestDispatchFailure(): void
+    {
+        Carbon::setTestNow('2026-04-09 12:00:00');
+        $this->beforeApplicationDestroyed(static function (): void {
+            Carbon::setTestNow();
+        });
+
+        $now = Carbon::now();
+
+        $run = $this->createRunWithSummary(
+            instanceId: 'dispatch-failed-age-instance',
+            runId: '01JDSPFAILRUN0000000000001',
+            status: 'running',
+            statusBucket: 'running',
+            livenessState: 'running',
+        );
+
+        // Worst-case: ready task whose last dispatch attempt failed 90s ago
+        // and has not been superseded by a successful dispatch.
+        $this->createTask($run, '01JDSPFAILTASK000000000001', TaskStatus::Ready->value, [
+            'available_at' => $now->copy()
+                ->subSeconds(120),
+            'last_dispatched_at' => null,
+            'last_dispatch_attempt_at' => $now->copy()
+                ->subSeconds(90),
+            'last_dispatch_error' => 'Workflow v2 backend capabilities are unsupported: [queue_sync_unsupported] sync.',
+            'created_at' => $now->copy()
+                ->subSeconds(150),
+        ]);
+
+        // Newer dispatch failure — counted but must not win the "oldest at".
+        $this->createTask($run, '01JDSPFAILTASK000000000002', TaskStatus::Ready->value, [
+            'available_at' => $now->copy()
+                ->subSeconds(30),
+            'last_dispatched_at' => null,
+            'last_dispatch_attempt_at' => $now->copy()
+                ->subSeconds(15),
+            'last_dispatch_error' => 'Connection refused while broadcasting workflow task wake.',
+            'created_at' => $now->copy()
+                ->subSeconds(30),
+        ]);
+
+        // Healthy ready task — not counted, and its created_at must not win.
+        $this->createTask($run, '01JDSPFAILTASK000000000003', TaskStatus::Ready->value, [
+            'available_at' => $now->copy()
+                ->subSecond(),
+            'last_dispatched_at' => $now->copy()
+                ->subSecond(),
+            'created_at' => $now->copy()
+                ->subSeconds(200),
+        ]);
+
+        // Older dispatch error superseded by a later successful dispatch —
+        // excluded because applyDispatchFailed requires the failed attempt
+        // to have happened after the most recent successful dispatch.
+        $this->createTask($run, '01JDSPFAILTASK000000000004', TaskStatus::Ready->value, [
+            'available_at' => $now->copy()
+                ->subSeconds(360),
+            'last_dispatch_attempt_at' => $now->copy()
+                ->subSeconds(300),
+            'last_dispatch_error' => 'Earlier dispatch attempt failed before redelivery.',
+            'last_dispatched_at' => $now->copy()
+                ->subSeconds(100),
+            'created_at' => $now->copy()
+                ->subSeconds(360),
+        ]);
+
+        // Dispatch error cleared (empty string) — excluded by applyDispatchFailed.
+        $this->createTask($run, '01JDSPFAILTASK000000000005', TaskStatus::Ready->value, [
+            'available_at' => $now->copy()
+                ->subSeconds(60),
+            'last_dispatched_at' => null,
+            'last_dispatch_attempt_at' => $now->copy()
+                ->subSeconds(300),
+            'last_dispatch_error' => '',
+            'created_at' => $now->copy()
+                ->subSeconds(60),
+        ]);
+
+        // Leased task with an older last_dispatch_attempt_at — excluded
+        // because applyDispatchFailed requires status=Ready.
+        $this->createTask($run, '01JDSPFAILTASK000000000006', TaskStatus::Leased->value, [
+            'available_at' => $now->copy()
+                ->subSeconds(60),
+            'leased_at' => $now->copy()
+                ->subSeconds(5),
+            'lease_owner' => 'worker-leased',
+            'lease_expires_at' => $now->copy()
+                ->addSeconds(10),
+            'last_dispatch_attempt_at' => $now->copy()
+                ->subSeconds(400),
+            'last_dispatch_error' => 'Previous dispatch attempt failed before lease grant.',
+            'created_at' => $now->copy()
+                ->subSeconds(60),
+        ]);
+
+        $snapshot = OperatorMetrics::snapshot($now);
+
+        $expectedOldestDispatchFailedAt = $now->copy()
+            ->subSeconds(90)
+            ->toJSON();
+
+        $this->assertSame(2, $snapshot['tasks']['dispatch_failed']);
+        $this->assertSame($expectedOldestDispatchFailedAt, $snapshot['tasks']['oldest_dispatch_failed_at']);
+        $this->assertSame(90 * 1000, $snapshot['tasks']['max_dispatch_failed_age_ms']);
+
+        $healthSnapshot = HealthCheck::snapshot($now);
+        $taskTransport = collect($healthSnapshot['checks'])->firstWhere('name', 'task_transport');
+        $this->assertNotNull($taskTransport);
+        $this->assertSame(2, $taskTransport['data']['dispatch_failed_tasks']);
+        $this->assertSame($expectedOldestDispatchFailedAt, $taskTransport['data']['oldest_dispatch_failed_at']);
+        $this->assertSame(90 * 1000, $taskTransport['data']['max_dispatch_failed_age_ms']);
+    }
+
+    public function testSnapshotReportsDispatchFailedAgeAsZeroWhenNoTasksFailedToDispatch(): void
+    {
+        Carbon::setTestNow('2026-04-09 12:00:00');
+        $this->beforeApplicationDestroyed(static function (): void {
+            Carbon::setTestNow();
+        });
+
+        $now = Carbon::now();
+
+        $run = $this->createRunWithSummary(
+            instanceId: 'dispatch-failed-none-instance',
+            runId: '01JDSPFNONRUN0000000000001',
+            status: 'running',
+            statusBucket: 'running',
+            livenessState: 'running',
+        );
+
+        // Fresh healthy ready task — never failed to dispatch.
+        $this->createTask($run, '01JDSPFNONTASK000000000001', TaskStatus::Ready->value, [
+            'available_at' => $now->copy()
+                ->subSecond(),
+            'last_dispatched_at' => $now->copy()
+                ->subSecond(),
+            'created_at' => $now->copy()
+                ->subSecond(),
+        ]);
+
+        $snapshot = OperatorMetrics::snapshot($now);
+
+        $this->assertSame(0, $snapshot['tasks']['dispatch_failed']);
+        $this->assertNull($snapshot['tasks']['oldest_dispatch_failed_at']);
+        $this->assertSame(0, $snapshot['tasks']['max_dispatch_failed_age_ms']);
+
+        $healthSnapshot = HealthCheck::snapshot($now);
+        $taskTransport = collect($healthSnapshot['checks'])->firstWhere('name', 'task_transport');
+        $this->assertNotNull($taskTransport);
+        $this->assertSame(0, $taskTransport['data']['dispatch_failed_tasks']);
+        $this->assertNull($taskTransport['data']['oldest_dispatch_failed_at']);
+        $this->assertSame(0, $taskTransport['data']['max_dispatch_failed_age_ms']);
+    }
+
     public function testSnapshotReportsRunWaitAgeAsZeroWhenNoRunsAreWaiting(): void
     {
         Carbon::setTestNow('2026-04-09 12:00:00');

tests/Unit/V2/HealthCheckTest.php

Lines changed: 3 additions & 0 deletions
@@ -558,6 +558,9 @@ public function testSnapshotWarnsWhenOpenRunHasNoDurableResumePath(): void
         $this->assertSame(0, $taskTransport['data']['claim_failed_tasks']);
         $this->assertNull($taskTransport['data']['oldest_claim_failed_at']);
         $this->assertSame(0, $taskTransport['data']['max_claim_failed_age_ms']);
+        $this->assertSame(0, $taskTransport['data']['dispatch_failed_tasks']);
+        $this->assertNull($taskTransport['data']['oldest_dispatch_failed_at']);
+        $this->assertSame(0, $taskTransport['data']['max_dispatch_failed_age_ms']);
         $this->assertSame(1, $taskTransport['data']['repair_needed_runs']);
         $this->assertSame('warning', $resumePaths['status']);
         $this->assertSame(1, $resumePaths['data']['repair_needed_runs']);

tests/Unit/V2/RolloutSafetyDocumentationTest.php

Lines changed: 13 additions & 0 deletions
@@ -130,6 +130,8 @@ final class RolloutSafetyDocumentationTest extends TestCase
         'max_ready_due_age_ms',
         'oldest_claim_failed_at',
         'max_claim_failed_age_ms',
+        'oldest_dispatch_failed_at',
+        'max_dispatch_failed_age_ms',
         'oldest_retrying_started_at',
         'max_retrying_age_ms',
         'unhealthy',
@@ -387,6 +389,17 @@ public function testContractDocumentFreezesClaimFailedAgeRow(): void
         );
     }

+    public function testContractDocumentFreezesDispatchFailedAgeRow(): void
+    {
+        $contents = $this->documentContents();
+
+        $this->assertMatchesRegularExpression(
+            '/\|\s*`tasks`\s*\|[^|]*`oldest_dispatch_failed_at`[^|]*`max_dispatch_failed_age_ms`/',
+            $contents,
+            'Rollout safety contract must pin the tasks dispatch-failed age row so operators can read "how long has the worst-case task been sitting with an uncleared dispatch error?" — the primary transport-failure age indicator on the dispatch path — from OperatorMetrics::snapshot() without walking workflow_tasks.',
+        );
+    }
+
     public function testContractDocumentFreezesRetryingActivityAgeRow(): void
     {
         $contents = $this->documentContents();
