Skip to content

Commit 028efdb

Browse files
Durable Workflow Builder (durable-workflow-ops)
authored and committed
Scope operator observability snapshots by namespace
1 parent bc32e08 commit 028efdb

6 files changed

Lines changed: 468 additions & 269 deletions

src/V2/Contracts/OperatorObservabilityRepository.php

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -46,10 +46,10 @@ public function runHistoryExport(
4646
/**
4747
* @return array<string, mixed>
4848
*/
49-
public function dashboardSummary(?CarbonInterface $now = null): array;
49+
public function dashboardSummary(?CarbonInterface $now = null, ?string $namespace = null): array;
5050

5151
/**
5252
* @return array<string, mixed>
5353
*/
54-
public function metrics(?CarbonInterface $now = null): array;
54+
public function metrics(?CarbonInterface $now = null, ?string $namespace = null): array;
5555
}

src/V2/Support/DefaultOperatorObservabilityRepository.php

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -42,16 +42,16 @@ public function runHistoryExport(
4242
/**
4343
* @return array<string, mixed>
4444
*/
45-
public function dashboardSummary(?CarbonInterface $now = null): array
45+
public function dashboardSummary(?CarbonInterface $now = null, ?string $namespace = null): array
4646
{
47-
return OperatorDashboardSummary::snapshot($now);
47+
return OperatorDashboardSummary::snapshot($now, $namespace);
4848
}
4949

5050
/**
5151
* @return array<string, mixed>
5252
*/
53-
public function metrics(?CarbonInterface $now = null): array
53+
public function metrics(?CarbonInterface $now = null, ?string $namespace = null): array
5454
{
55-
return OperatorMetrics::snapshot($now);
55+
return OperatorMetrics::snapshot($now, $namespace);
5656
}
5757
}

src/V2/Support/OperatorDashboardSummary.php

Lines changed: 74 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -16,25 +16,26 @@ final class OperatorDashboardSummary
1616
/**
1717
* @return array<string, mixed>
1818
*/
19-
public static function snapshot(?CarbonInterface $now = null): array
19+
public static function snapshot(?CarbonInterface $now = null, ?string $namespace = null): array
2020
{
2121
$now ??= now();
22-
$flowsPastHour = self::flowsPastHour($now);
22+
$namespace = self::normalizeNamespace($namespace);
23+
$flowsPastHour = self::flowsPastHour($now, $namespace);
2324

2425
return [
25-
'flows' => self::totalFlows(),
26+
'flows' => self::totalFlows($namespace),
2627
'flows_per_minute' => $flowsPastHour / 60,
2728
'flows_past_hour' => $flowsPastHour,
28-
'exceptions_past_hour' => self::exceptionsPastHour($now),
29-
'failed_flows_past_week' => self::failedFlowsPastWeek($now),
30-
'max_wait_time_workflow' => self::modelArray(self::maxWaitTimeWorkflow()),
31-
'max_duration_workflow' => self::modelArray(self::maxDurationWorkflow()),
32-
'max_exceptions_workflow' => self::modelArray(self::maxExceptionsWorkflow()),
33-
'fleet_overview' => self::fleetOverview($now),
34-
'workflow_type_health' => self::workflowTypeHealth($now),
35-
'needs_attention' => self::needsAttention($now),
36-
'fleet_trends_series' => self::fleetTrendsSeries($now),
37-
'operator_metrics' => OperatorMetrics::snapshot($now),
29+
'exceptions_past_hour' => self::exceptionsPastHour($now, $namespace),
30+
'failed_flows_past_week' => self::failedFlowsPastWeek($now, $namespace),
31+
'max_wait_time_workflow' => self::modelArray(self::maxWaitTimeWorkflow($namespace)),
32+
'max_duration_workflow' => self::modelArray(self::maxDurationWorkflow($namespace)),
33+
'max_exceptions_workflow' => self::modelArray(self::maxExceptionsWorkflow($namespace)),
34+
'fleet_overview' => self::fleetOverview($now, $namespace),
35+
'workflow_type_health' => self::workflowTypeHealth($now, $namespace),
36+
'needs_attention' => self::needsAttention($now, $namespace),
37+
'fleet_trends_series' => self::fleetTrendsSeries($now, $namespace),
38+
'operator_metrics' => OperatorMetrics::snapshot($now, $namespace),
3839
];
3940
}
4041

@@ -44,15 +45,16 @@ public static function snapshot(?CarbonInterface $now = null): array
4445
*
4546
* @return array<string, mixed>
4647
*/
47-
public static function fleetTrendsSeries(?CarbonInterface $now = null): array
48+
public static function fleetTrendsSeries(?CarbonInterface $now = null, ?string $namespace = null): array
4849
{
4950
$now ??= now();
51+
$namespace = self::normalizeNamespace($namespace);
5052
$weekAgo = $now->copy()
5153
->subWeek();
5254

5355
// Fetch terminal-run rows in the last 7 days and bucket them hourly in PHP
5456
// to avoid DB-specific date formatting (DATE_FORMAT is MySQL-only).
55-
$rows = self::summaryModel()::query()
57+
$rows = self::summaryQuery($namespace)
5658
->select('closed_at', 'status_bucket')
5759
->whereNotNull('closed_at')
5860
->where('closed_at', '>=', $weekAgo)
@@ -98,7 +100,7 @@ public static function fleetTrendsSeries(?CarbonInterface $now = null): array
98100
*
99101
* @return array<string, mixed>
100102
*/
101-
private static function fleetOverview(CarbonInterface $now): array
103+
private static function fleetOverview(CarbonInterface $now, ?string $namespace): array
102104
{
103105
$hourAgo = $now->copy()
104106
->subHour();
@@ -108,15 +110,15 @@ private static function fleetOverview(CarbonInterface $now): array
108110
->subWeek();
109111

110112
// Current counts by status bucket
111-
$currentCounts = self::summaryModel()::query()
113+
$currentCounts = self::summaryQuery($namespace)
112114
->select('status_bucket', DB::raw('COUNT(*) as count'))
113115
->where('status_bucket', '!=', 'completed') // Only active statuses
114116
->groupBy('status_bucket')
115117
->pluck('count', 'status_bucket')
116118
->toArray();
117119

118120
// Trend over last hour (completed in last hour)
119-
$hourTrend = self::summaryModel()::query()
121+
$hourTrend = self::summaryQuery($namespace)
120122
->select('status_bucket', DB::raw('COUNT(*) as count'))
121123
->where('closed_at', '>=', $hourAgo)
122124
->whereIn('status_bucket', ['completed', 'failed'])
@@ -125,7 +127,7 @@ private static function fleetOverview(CarbonInterface $now): array
125127
->toArray();
126128

127129
// Trend over last day
128-
$dayTrend = self::summaryModel()::query()
130+
$dayTrend = self::summaryQuery($namespace)
129131
->select('status_bucket', DB::raw('COUNT(*) as count'))
130132
->where('closed_at', '>=', $dayAgo)
131133
->whereIn('status_bucket', ['completed', 'failed'])
@@ -134,7 +136,7 @@ private static function fleetOverview(CarbonInterface $now): array
134136
->toArray();
135137

136138
// Trend over last week
137-
$weekTrend = self::summaryModel()::query()
139+
$weekTrend = self::summaryQuery($namespace)
138140
->select('status_bucket', DB::raw('COUNT(*) as count'))
139141
->where('closed_at', '>=', $weekAgo)
140142
->whereIn('status_bucket', ['completed', 'failed'])
@@ -169,13 +171,13 @@ private static function fleetOverview(CarbonInterface $now): array
169171
*
170172
* @return array<int, array<string, mixed>>
171173
*/
172-
private static function workflowTypeHealth(CarbonInterface $now): array
174+
private static function workflowTypeHealth(CarbonInterface $now, ?string $namespace): array
173175
{
174176
$weekAgo = $now->copy()
175177
->subWeek();
176178

177179
// Get top 10 workflow types by volume in the last week
178-
$types = self::summaryModel()::query()
180+
$types = self::summaryQuery($namespace)
179181
->select('workflow_type', DB::raw('COUNT(*) as total_runs'))
180182
->where('created_at', '>=', $weekAgo)
181183
->groupBy('workflow_type')
@@ -188,7 +190,7 @@ private static function workflowTypeHealth(CarbonInterface $now): array
188190

189191
foreach ($types as $workflowType => $totalRuns) {
190192
// Get status breakdown for this type
191-
$statusCounts = self::summaryModel()::query()
193+
$statusCounts = self::summaryQuery($namespace)
192194
->select('status', DB::raw('COUNT(*) as count'))
193195
->where('workflow_type', $workflowType)
194196
->where('created_at', '>=', $weekAgo)
@@ -205,7 +207,7 @@ private static function workflowTypeHealth(CarbonInterface $now): array
205207
$passRate = $terminalRuns > 0 ? ($completed / $terminalRuns) * 100 : 0;
206208

207209
// Get median duration for completed runs
208-
$durations = self::summaryModel()::query()
210+
$durations = self::summaryQuery($namespace)
209211
->where('workflow_type', $workflowType)
210212
->where('status', RunStatus::Completed->value)
211213
->where('created_at', '>=', $weekAgo)
@@ -245,7 +247,7 @@ private static function workflowTypeHealth(CarbonInterface $now): array
245247
*
246248
* @return array<string, mixed>
247249
*/
248-
private static function needsAttention(CarbonInterface $now): array
250+
private static function needsAttention(CarbonInterface $now, ?string $namespace): array
249251
{
250252
$alerts = [];
251253

@@ -270,7 +272,7 @@ private static function needsAttention(CarbonInterface $now): array
270272
// 2. Long-running workflows (running > 1 hour without wait)
271273
$longRunningThreshold = $now->copy()
272274
->subHour();
273-
$longRunners = self::summaryModel()::query()
275+
$longRunners = self::summaryQuery($namespace)
274276
->where('status_bucket', 'running')
275277
->where('started_at', '<', $longRunningThreshold)
276278
->whereNull('wait_started_at') // Not waiting
@@ -289,7 +291,7 @@ private static function needsAttention(CarbonInterface $now): array
289291
// 3. Retry storms (workflows with 10+ exceptions in last hour)
290292
$hourAgo = $now->copy()
291293
->subHour();
292-
$retryStorms = self::summaryModel()::query()
294+
$retryStorms = self::summaryQuery($namespace)
293295
->where('status_bucket', 'running')
294296
->where('updated_at', '>=', $hourAgo)
295297
->where('exception_count', '>=', 10)
@@ -306,12 +308,12 @@ private static function needsAttention(CarbonInterface $now): array
306308
}
307309

308310
// 4. High failure rate in last hour
309-
$recentFailed = self::summaryModel()::query()
311+
$recentFailed = self::summaryQuery($namespace)
310312
->where('status', RunStatus::Failed->value)
311313
->where('closed_at', '>=', $hourAgo)
312314
->count();
313315

314-
$recentCompleted = self::summaryModel()::query()
316+
$recentCompleted = self::summaryQuery($namespace)
315317
->where('status', RunStatus::Completed->value)
316318
->where('closed_at', '>=', $hourAgo)
317319
->count();
@@ -338,7 +340,7 @@ private static function needsAttention(CarbonInterface $now): array
338340
// 5. Workflows waiting too long (wait > 30 minutes)
339341
$longWaitThreshold = $now->copy()
340342
->subMinutes(30);
341-
$longWaits = self::summaryModel()::query()
343+
$longWaits = self::summaryQuery($namespace)
342344
->where('status_bucket', 'running')
343345
->whereNotNull('wait_started_at')
344346
->where('wait_started_at', '<', $longWaitThreshold)
@@ -362,17 +364,17 @@ private static function needsAttention(CarbonInterface $now): array
362364
];
363365
}
364366

365-
private static function totalFlows(): int
367+
private static function totalFlows(?string $namespace): int
366368
{
367-
return self::summaryModel()::query()->count();
369+
return self::summaryQuery($namespace)->count();
368370
}
369371

370-
private static function flowsPastHour(CarbonInterface $now): int
372+
private static function flowsPastHour(CarbonInterface $now, ?string $namespace): int
371373
{
372374
$cutoff = $now->copy()
373375
->subHour();
374376

375-
return self::summaryModel()::query()
377+
return self::summaryQuery($namespace)
376378
->where(static function ($query) use ($cutoff): void {
377379
$query->where('sort_timestamp', '>=', $cutoff)
378380
->orWhere(static function ($fallback) use ($cutoff): void {
@@ -383,24 +385,30 @@ private static function flowsPastHour(CarbonInterface $now): int
383385
->count();
384386
}
385387

386-
private static function exceptionsPastHour(CarbonInterface $now): int
388+
private static function exceptionsPastHour(CarbonInterface $now, ?string $namespace): int
387389
{
388-
return self::failureModel()::query()
390+
$query = self::failureModel()::query()
389391
->where('created_at', '>=', $now->copy()->subHour())
390-
->count();
392+
->whereHas('run', static function ($run) use ($namespace): void {
393+
if ($namespace !== null) {
394+
$run->where('namespace', $namespace);
395+
}
396+
});
397+
398+
return $query->count();
391399
}
392400

393-
private static function failedFlowsPastWeek(CarbonInterface $now): int
401+
private static function failedFlowsPastWeek(CarbonInterface $now, ?string $namespace): int
394402
{
395-
return self::summaryModel()::query()
403+
return self::summaryQuery($namespace)
396404
->where('status', RunStatus::Failed->value)
397405
->where('updated_at', '>=', $now->copy()->subDays(7))
398406
->count();
399407
}
400408

401-
private static function maxWaitTimeWorkflow(): ?WorkflowRunSummary
409+
private static function maxWaitTimeWorkflow(?string $namespace): ?WorkflowRunSummary
402410
{
403-
return self::summaryModel()::query()
411+
return self::summaryQuery($namespace)
404412
->where('status_bucket', 'running')
405413
->whereNotNull('wait_started_at')
406414
->orderBy('wait_started_at')
@@ -410,9 +418,9 @@ private static function maxWaitTimeWorkflow(): ?WorkflowRunSummary
410418
->first();
411419
}
412420

413-
private static function maxDurationWorkflow(): ?WorkflowRunSummary
421+
private static function maxDurationWorkflow(?string $namespace): ?WorkflowRunSummary
414422
{
415-
return self::summaryModel()::query()
423+
return self::summaryQuery($namespace)
416424
->whereNotNull('duration_ms')
417425
->orderByDesc('duration_ms')
418426
->orderByDesc('sort_timestamp')
@@ -421,9 +429,9 @@ private static function maxDurationWorkflow(): ?WorkflowRunSummary
421429
->first();
422430
}
423431

424-
private static function maxExceptionsWorkflow(): ?WorkflowRunSummary
432+
private static function maxExceptionsWorkflow(?string $namespace): ?WorkflowRunSummary
425433
{
426-
return self::summaryModel()::query()
434+
return self::summaryQuery($namespace)
427435
->where('exception_count', '>', 0)
428436
->orderByDesc('exception_count')
429437
->orderByDesc('sort_timestamp')
@@ -451,6 +459,27 @@ private static function summaryModel(): string
451459
return $model;
452460
}
453461

462+
private static function summaryQuery(?string $namespace)
463+
{
464+
$model = self::summaryModel();
465+
$query = $model::query();
466+
467+
if ($namespace !== null) {
468+
$query->where((new $model())->getTable() . '.namespace', $namespace);
469+
}
470+
471+
return $query;
472+
}
473+
474+
private static function normalizeNamespace(?string $namespace): ?string
475+
{
476+
if ($namespace === null || trim($namespace) === '') {
477+
return null;
478+
}
479+
480+
return trim($namespace);
481+
}
482+
454483
/**
455484
* @return class-string<WorkflowFailure>
456485
*/

0 commit comments

Comments
 (0)