Skip to content

Commit db2cef8

Browse files
Surface run wait age on operator metrics and durable_resume_paths health
Adds runs.waiting, runs.oldest_wait_started_at, and runs.max_wait_age_ms to OperatorMetrics::snapshot() so operators can answer "how long has the worst-case run been waiting at a durable resume point?" from the metric alone, mirroring the existing tasks.oldest_lease_expired_at, tasks.oldest_ready_due_at, and backlog.oldest_compatibility_blocked_started_at shapes. The signal counts every kind of wait — signal, update, timer, and compatibility-blocked — because each is a durable resume point the system is parked on; consumers that want to isolate the non-compatibility share can subtract runs.compatibility_blocked. Forwards the same trio on HealthCheck::durableResumePathCheck() data as waiting_runs / oldest_wait_started_at / max_wait_age_ms so the wait age is legible from /healthz without re-reading the metrics snapshot. The check's escalation predicate stays unchanged (it still escalates only on repair_needed_runs); the wait-age data is observability, since a long-parked wait is not by itself a stuck condition the system can repair without application-level action. Pins the new keys in docs/architecture/rollout-safety.md frozen metric table and adds a wait-row regression assertion in RolloutSafetyDocumentationTest. Extends V2OperatorMetricsTest with two focused tests proving the metric counts non-compatibility waits and returns null/0 when no runs are waiting, plus an existing-fixture assertion in the broad metrics test. Extends HealthCheckTest's testSnapshotWarnsWhenRunSummaryProjectionSchemaIsOutdated assertion to pin the new health-check fields.
1 parent f28486a commit db2cef8

6 files changed

Lines changed: 209 additions & 5 deletions

File tree

docs/architecture/rollout-safety.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ change.
405405
| `runs` | `repair_needed` | open runs with `liveness_state = repair_needed` |
406406
| `runs` | `claim_failed` | runs whose most recent task claim failed |
407407
| `runs` | `compatibility_blocked` | runs blocked by compatibility mismatch |
408+
| `runs` | `waiting`, `oldest_wait_started_at`, `max_wait_age_ms` | running runs currently parked at a durable resume point (`status_bucket = 'running'` and `wait_started_at IS NOT NULL`), the earliest `wait_started_at` among them, and the largest wait age in milliseconds. Mirrors the `backlog.oldest_compatibility_blocked_started_at` / `max_compatibility_blocked_age_ms` and `tasks.oldest_lease_expired_at` / `max_lease_expired_age_ms` shapes so operators can answer "how long has the worst-case run been waiting at a signal, update, timer, or compatible-worker arrival?" from the metric alone. The signal is unconditional and includes compatibility-blocked waits; consumers that want the non-compatibility share can subtract `runs.compatibility_blocked` and `backlog.oldest_compatibility_blocked_started_at`. |
408409
| `tasks` | `ready`, `ready_due`, `delayed`, `leased` | queue depth by phase |
409410
| `tasks` | `dispatch_failed`, `claim_failed` | transport failure counts |
410411
| `tasks` | `dispatch_overdue`, `lease_expired` | lease and dispatch timing |
@@ -494,6 +495,18 @@ are authoritative and how they surface.
494495
- **Stale projection.** A projection behind the authoritative
495496
history surfaces through the `run_summary_projection` and
496497
`selected_run_projections` checks on `HealthCheck::snapshot()`.
498+
- **Long-parked wait.** A running run whose projector has recorded
499+
a `wait_started_at` is counted under `runs.waiting`, and its
500+
worst-case wait age is surfaced through `runs.oldest_wait_started_at`
501+
and `runs.max_wait_age_ms`, both forwarded on the
502+
`durable_resume_paths` health check (`waiting_runs`,
503+
`oldest_wait_started_at`, `max_wait_age_ms`). The signal includes
504+
every kind of wait — signal, update, timer, and compatibility-blocked
505+
wait — because each is a durable resume point the system is
506+
parked on. The check itself escalates only on `repair_needed_runs`;
507+
the wait-age data is observability so operators can decide whether
508+
the worst-case wait reflects healthy long-running work or a lost
509+
signal/update that the application must resend.
497510

498511
Guarantees:
499512

src/V2/Support/HealthCheck.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ public static function snapshot(?CarbonInterface $now = null): array
2828
self::historyRetentionInvariantCheck($metrics['history'] ?? []),
2929
self::commandContractCheck($metrics['command_contracts'] ?? []),
3030
self::taskTransportCheck($metrics['tasks'] ?? [], $metrics['backlog'] ?? []),
31-
self::durableResumePathCheck($metrics['backlog'] ?? [], $metrics['repair'] ?? []),
31+
self::durableResumePathCheck(
32+
$metrics['backlog'] ?? [],
33+
$metrics['repair'] ?? [],
34+
$metrics['runs'] ?? [],
35+
),
3236
self::workerCompatibilityCheck($metrics['workers'] ?? []),
3337
self::schedulerRoleCheck($metrics['schedules'] ?? []),
3438
self::longPollWakeAccelerationCheck(),
@@ -247,9 +251,10 @@ private static function taskTransportCheck(array $tasks, array $backlog): array
247251
/**
248252
* @param array<string, mixed> $backlog
249253
* @param array<string, mixed> $repair
254+
* @param array<string, mixed> $runs
250255
* @return array<string, mixed>
251256
*/
252-
private static function durableResumePathCheck(array $backlog, array $repair): array
257+
private static function durableResumePathCheck(array $backlog, array $repair, array $runs): array
253258
{
254259
$repairNeededRuns = self::integer($backlog['repair_needed_runs'] ?? 0);
255260

@@ -268,6 +273,11 @@ private static function durableResumePathCheck(array $backlog, array $repair): a
268273
? $repair['oldest_missing_run_started_at']
269274
: null,
270275
'max_missing_run_age_ms' => self::integer($repair['max_missing_run_age_ms'] ?? 0),
276+
'waiting_runs' => self::integer($runs['waiting'] ?? 0),
277+
'oldest_wait_started_at' => is_string($runs['oldest_wait_started_at'] ?? null)
278+
? $runs['oldest_wait_started_at']
279+
: null,
280+
'max_wait_age_ms' => self::integer($runs['max_wait_age_ms'] ?? 0),
271281
],
272282
);
273283
}

src/V2/Support/OperatorMetrics.php

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public static function snapshot(?CarbonInterface $now = null, ?string $namespace
4040

4141
return [
4242
'generated_at' => $now->toJSON(),
43-
'runs' => self::runMetrics($namespace),
43+
'runs' => self::runMetrics($now, $namespace),
4444
'tasks' => self::taskMetrics($now, $namespace),
4545
'activities' => self::activityMetrics($namespace),
4646
'backlog' => self::backlogMetrics($now, $namespace),
@@ -88,10 +88,12 @@ private static function matchingRoleSnapshot(): array
8888
}
8989

9090
/**
91-
* @return array<string, int>
91+
* @return array<string, int|string|null>
9292
*/
93-
private static function runMetrics(?string $namespace): array
93+
private static function runMetrics(CarbonInterface $now, ?string $namespace): array
9494
{
95+
$oldestWaitStartedAt = self::oldestRunWaitStartedAt($namespace);
96+
9597
return [
9698
'total' => self::summaryQuery($namespace)->count(),
9799
'current' => self::summaryQuery($namespace)->where('is_current_run', true)->count(),
@@ -104,6 +106,11 @@ private static function runMetrics(?string $namespace): array
104106
'repair_needed' => self::summaryQuery($namespace)->where('liveness_state', 'repair_needed')->count(),
105107
'claim_failed' => self::claimFailedRuns($namespace),
106108
'compatibility_blocked' => self::compatibilityBlockedRuns($namespace),
109+
'waiting' => self::waitingRuns($namespace),
110+
'oldest_wait_started_at' => $oldestWaitStartedAt?->toJSON(),
111+
'max_wait_age_ms' => $oldestWaitStartedAt === null
112+
? 0
113+
: (int) $oldestWaitStartedAt->diffInMilliseconds($now),
107114
];
108115
}
109116

@@ -711,6 +718,50 @@ private static function claimFailedRuns(?string $namespace): int
711718
->count();
712719
}
713720

721+
/**
722+
* Open runs that are currently parked at a wait point — running runs whose
723+
* `RunSummaryProjector` has recorded a `wait_started_at` because they are
724+
* blocked on a signal, update, timer, or compatible-worker arrival.
725+
*
726+
* Counted unconditionally so the worst-case wait age is legible regardless
727+
* of what kind of wait it is; consumers that want to exclude
728+
* compatibility-blocked waits can subtract `runs.compatibility_blocked`.
729+
*/
730+
private static function waitingRuns(?string $namespace): int
731+
{
732+
return self::summaryQuery($namespace)
733+
->where('status_bucket', 'running')
734+
->whereNotNull('wait_started_at')
735+
->count();
736+
}
737+
738+
/**
739+
* Earliest `wait_started_at` timestamp across runs currently parked at a
740+
* wait point. Rollout-safety surfaces this alongside `runs.waiting` so
741+
* operators can answer "how long has the worst-case run been waiting at
742+
* a durable resume point?" from the metric alone, mirroring the existing
743+
* `backlog.oldest_compatibility_blocked_started_at` and
744+
* `tasks.oldest_lease_expired_at` shapes. The signal includes
745+
* compatibility-blocked waits because they too are durable-resume points
746+
* the system is waiting on; consumers that need to isolate the
747+
* non-compatibility share can use `backlog.oldest_compatibility_blocked_started_at`.
748+
*/
749+
private static function oldestRunWaitStartedAt(?string $namespace): ?CarbonInterface
750+
{
751+
/** @var WorkflowRunSummary|null $summary */
752+
$summary = self::summaryQuery($namespace)
753+
->where('status_bucket', 'running')
754+
->whereNotNull('wait_started_at')
755+
->orderBy('wait_started_at')
756+
->first();
757+
758+
if (! $summary instanceof WorkflowRunSummary) {
759+
return null;
760+
}
761+
762+
return $summary->wait_started_at;
763+
}
764+
714765
private static function retryingActivities(?string $namespace): int
715766
{
716767
return self::scopedRunModelQuery(self::activityExecutionModel(), $namespace)

tests/Feature/V2/V2OperatorMetricsTest.php

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,14 @@ public function testSnapshotSummarizesDurableBacklogRepairCompatibilityAndWorker
202202
$this->assertSame(1, $snapshot['runs']['repair_needed']);
203203
$this->assertSame(1, $snapshot['runs']['claim_failed']);
204204
$this->assertSame(1, $snapshot['runs']['compatibility_blocked']);
205+
$this->assertSame(1, $snapshot['runs']['waiting']);
206+
$this->assertSame(
207+
Carbon::parse('2026-04-09 12:00:00')
208+
->subMinutes(4)
209+
->toJSON(),
210+
$snapshot['runs']['oldest_wait_started_at'],
211+
);
212+
$this->assertSame(4 * 60 * 1000, $snapshot['runs']['max_wait_age_ms']);
205213
$this->assertSame(7, $snapshot['tasks']['open']);
206214
$this->assertSame(5, $snapshot['tasks']['ready']);
207215
$this->assertSame(4, $snapshot['tasks']['ready_due']);
@@ -931,6 +939,111 @@ public function testSnapshotSurfacesSchedulerRoleHealth(): void
931939
$this->assertSame(2, $snapshot['schedules']['failures_total']);
932940
}
933941

942+
public function testSnapshotSurfacesRunWaitAgeForEveryRunningWaitNotJustCompatibilityBlocked(): void
943+
{
944+
Carbon::setTestNow('2026-04-09 12:00:00');
945+
$this->beforeApplicationDestroyed(static function (): void {
946+
Carbon::setTestNow();
947+
});
948+
949+
$now = Carbon::now();
950+
951+
// Signal-wait run parked the longest — earliest wait_started_at wins.
952+
$signalWait = $this->createRunWithSummary(
953+
instanceId: 'wait-age-instance-signal',
954+
runId: '01JWAITSIGNALRUN0000000001',
955+
status: 'waiting',
956+
statusBucket: 'running',
957+
livenessState: 'workflow_task_waiting_for_signal',
958+
);
959+
WorkflowRunSummary::query()
960+
->whereKey($signalWait->id)
961+
->update([
962+
'wait_started_at' => $now->copy()
963+
->subMinutes(7),
964+
]);
965+
966+
// Compatibility-blocked wait — counted under runs.waiting too.
967+
$compatibilityWait = $this->createRunWithSummary(
968+
instanceId: 'wait-age-instance-compat',
969+
runId: '01JWAITCOMPATRUN0000000002',
970+
status: 'waiting',
971+
statusBucket: 'running',
972+
livenessState: 'workflow_task_waiting_for_compatible_worker',
973+
);
974+
WorkflowRunSummary::query()
975+
->whereKey($compatibilityWait->id)
976+
->update([
977+
'wait_started_at' => $now->copy()
978+
->subMinutes(2),
979+
]);
980+
981+
// Running run that is actively executing — must not count.
982+
$this->createRunWithSummary(
983+
instanceId: 'wait-age-instance-active',
984+
runId: '01JWAITACTIVERUN0000000003',
985+
status: 'running',
986+
statusBucket: 'running',
987+
livenessState: 'running',
988+
);
989+
990+
// Closed run that previously had a wait — must not count.
991+
$closedWait = $this->createRunWithSummary(
992+
instanceId: 'wait-age-instance-closed',
993+
runId: '01JWAITCLOSEDRUN0000000004',
994+
status: 'completed',
995+
statusBucket: 'completed',
996+
livenessState: 'closed',
997+
);
998+
WorkflowRunSummary::query()
999+
->whereKey($closedWait->id)
1000+
->update([
1001+
'wait_started_at' => $now->copy()
1002+
->subHours(2),
1003+
]);
1004+
1005+
$snapshot = OperatorMetrics::snapshot($now);
1006+
1007+
$expectedOldestWaitAt = $now->copy()
1008+
->subMinutes(7)
1009+
->toJSON();
1010+
1011+
$this->assertSame(2, $snapshot['runs']['waiting']);
1012+
$this->assertSame($expectedOldestWaitAt, $snapshot['runs']['oldest_wait_started_at']);
1013+
$this->assertSame(7 * 60 * 1000, $snapshot['runs']['max_wait_age_ms']);
1014+
1015+
$healthSnapshot = HealthCheck::snapshot($now);
1016+
$resumePaths = collect($healthSnapshot['checks'])->firstWhere('name', 'durable_resume_paths');
1017+
$this->assertNotNull($resumePaths);
1018+
$this->assertSame(2, $resumePaths['data']['waiting_runs']);
1019+
$this->assertSame($expectedOldestWaitAt, $resumePaths['data']['oldest_wait_started_at']);
1020+
$this->assertSame(7 * 60 * 1000, $resumePaths['data']['max_wait_age_ms']);
1021+
}
1022+
1023+
public function testSnapshotReportsRunWaitAgeAsZeroWhenNoRunsAreWaiting(): void
1024+
{
1025+
Carbon::setTestNow('2026-04-09 12:00:00');
1026+
$this->beforeApplicationDestroyed(static function (): void {
1027+
Carbon::setTestNow();
1028+
});
1029+
1030+
$now = Carbon::now();
1031+
1032+
$this->createRunWithSummary(
1033+
instanceId: 'no-wait-instance-active',
1034+
runId: '01JWAITNONERUN00000000001',
1035+
status: 'running',
1036+
statusBucket: 'running',
1037+
livenessState: 'running',
1038+
);
1039+
1040+
$snapshot = OperatorMetrics::snapshot($now);
1041+
1042+
$this->assertSame(0, $snapshot['runs']['waiting']);
1043+
$this->assertNull($snapshot['runs']['oldest_wait_started_at']);
1044+
$this->assertSame(0, $snapshot['runs']['max_wait_age_ms']);
1045+
}
1046+
9341047
public function testSnapshotReportsInWorkerMatchingRoleShapeByDefault(): void
9351048
{
9361049
config()->set('workflows.v2.matching_role.queue_wake_enabled', true);

tests/Unit/V2/HealthCheckTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,9 @@ public function testSnapshotWarnsWhenOpenRunHasNoDurableResumePath(): void
559559
$this->assertSame(1, $resumePaths['data']['selected_missing_task_candidates']);
560560
$this->assertSame('2026-04-09T11:55:00.000000Z', $resumePaths['data']['oldest_missing_run_started_at']);
561561
$this->assertSame(300000, $resumePaths['data']['max_missing_run_age_ms']);
562+
$this->assertSame(1, $resumePaths['data']['waiting_runs']);
563+
$this->assertSame('2026-04-09T11:55:00.000000Z', $resumePaths['data']['oldest_wait_started_at']);
564+
$this->assertSame(300000, $resumePaths['data']['max_wait_age_ms']);
562565
}
563566

564567
public function testSnapshotWarnsWhenRunSummaryProjectionSchemaIsOutdated(): void

tests/Unit/V2/RolloutSafetyDocumentationTest.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ final class RolloutSafetyDocumentationTest extends TestCase
138138
'compatibility_blocked_runs',
139139
'oldest_compatibility_blocked_started_at',
140140
'max_compatibility_blocked_age_ms',
141+
'waiting',
142+
'oldest_wait_started_at',
143+
'max_wait_age_ms',
141144
'missing_task_candidates',
142145
'selected_missing_task_candidates',
143146
'oldest_missing_run_started_at',
@@ -347,6 +350,17 @@ public function testContractDocumentFreezesReadyDueAgeRow(): void
347350
);
348351
}
349352

353+
public function testContractDocumentFreezesRunWaitAgeRow(): void
354+
{
355+
$contents = $this->documentContents();
356+
357+
$this->assertMatchesRegularExpression(
358+
'/\|\s*`runs`\s*\|[^|]*`waiting`[^|]*`oldest_wait_started_at`[^|]*`max_wait_age_ms`/',
359+
$contents,
360+
'Rollout safety contract must pin the runs wait-age row so operators can read "how long has the worst-case run been waiting at a durable resume point?" from OperatorMetrics::snapshot() without scanning workflow_run_summaries.',
361+
);
362+
}
363+
350364
public function testContractDocumentFreezesHealthCheckNames(): void
351365
{
352366
$contents = $this->documentContents();

0 commit comments

Comments
 (0)