Skip to content

Commit 3372144

Browse files
Scope dashboard worker fleet metrics to the configured namespace
1 parent fec5b15 commit 3372144

2 files changed

Lines changed: 135 additions & 4 deletions

File tree

app/Repositories/Workflow/Infrastructure/V2WorkflowRepository.php

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
namespace Waterline\Repositories\Workflow\Infrastructure;
44

5+
use Carbon\CarbonInterface;
6+
use Waterline\Repositories\Workflow\Interfaces\WorkflowRepositoryInterface;
7+
use Waterline\Support\ActionabilityVisibilityFilters;
58
use Workflow\V2\Contracts\OperatorObservabilityRepository;
69
use Workflow\V2\Support\RunSummarySortKey;
710
use Workflow\V2\Support\SelectedRunLocator;
8-
use Waterline\Support\ActionabilityVisibilityFilters;
9-
use Waterline\Repositories\Workflow\Interfaces\WorkflowRepositoryInterface;
11+
use Workflow\V2\Support\WorkerCompatibility;
12+
use Workflow\V2\Support\WorkerCompatibilityFleet;
1013

1114
class V2WorkflowRepository implements WorkflowRepositoryInterface
1215
{
@@ -67,7 +70,15 @@ public function findFlowSelection(string $instanceId, ?string $runId = null)
6770

6871
public function dashboardStats(): array
6972
{
70-
return app(OperatorObservabilityRepository::class)->dashboardSummary(namespace: $this->namespace());
73+
$namespace = $this->namespace();
74+
$now = now();
75+
$summary = app(OperatorObservabilityRepository::class)->dashboardSummary($now, $namespace);
76+
$summary['operator_metrics']['workers'] = $this->scopedWorkerMetrics(
77+
$summary['operator_metrics']['workers'] ?? null,
78+
$namespace,
79+
);
80+
81+
return $summary;
7182
}
7283

7384
public function flowsPastHour(): int
@@ -139,7 +150,99 @@ public function totalFlows(): int
139150

140151
public function operatorMetrics()
141152
{
142-
return app(OperatorObservabilityRepository::class)->metrics(namespace: $this->namespace());
153+
$namespace = $this->namespace();
154+
$metrics = app(OperatorObservabilityRepository::class)->metrics(null, $namespace);
155+
$metrics['workers'] = $this->scopedWorkerMetrics($metrics['workers'] ?? null, $namespace);
156+
157+
return $metrics;
158+
}
159+
160+
/**
161+
* Keep Waterline compatible with older workflow alphas that still expose
162+
* a fleet-global workers snapshot even when the rest of the operator
163+
* metrics payload is namespace-scoped.
164+
*
165+
* @param mixed $workers
166+
* @return mixed
167+
*/
168+
private function scopedWorkerMetrics(mixed $workers, ?string $namespace): mixed
169+
{
170+
if (! is_array($workers) || $namespace === null) {
171+
return $workers;
172+
}
173+
174+
if (($workers['compatibility_namespace'] ?? null) === $namespace) {
175+
return $workers;
176+
}
177+
178+
$required = WorkerCompatibility::current();
179+
$snapshots = WorkerCompatibilityFleet::detailsForNamespace($namespace, $required);
180+
$workerIds = [];
181+
$supportingWorkerIds = [];
182+
$fleet = [];
183+
184+
foreach ($snapshots as $snapshot) {
185+
$workerId = is_string($snapshot['worker_id'] ?? null)
186+
? $snapshot['worker_id']
187+
: null;
188+
189+
if ($workerId === null) {
190+
continue;
191+
}
192+
193+
$workerIds[$workerId] = true;
194+
195+
if (($snapshot['supports_required'] ?? false) === true) {
196+
$supportingWorkerIds[$workerId] = true;
197+
}
198+
199+
$fleet[] = $this->fleetEntry($snapshot);
200+
}
201+
202+
$workers['compatibility_namespace'] = $namespace;
203+
$workers['required_compatibility'] = $required;
204+
$workers['active_workers'] = count($workerIds);
205+
$workers['active_worker_scopes'] = count($snapshots);
206+
$workers['active_workers_supporting_required'] = count($supportingWorkerIds);
207+
$workers['fleet'] = $fleet;
208+
209+
return $workers;
210+
}
211+
212+
/**
213+
* @param array<string, mixed> $snapshot
214+
* @return array<string, mixed>
215+
*/
216+
private function fleetEntry(array $snapshot): array
217+
{
218+
$recordedAt = $snapshot['recorded_at'] ?? null;
219+
$expiresAt = $snapshot['expires_at'] ?? null;
220+
$supported = is_array($snapshot['supported'] ?? null)
221+
? array_values(array_filter($snapshot['supported'], static fn ($value): bool => is_string($value)))
222+
: [];
223+
224+
return [
225+
'worker_id' => (string) ($snapshot['worker_id'] ?? ''),
226+
'namespace' => $this->stringOrNull($snapshot['namespace'] ?? null),
227+
'host' => $this->stringOrNull($snapshot['host'] ?? null),
228+
'process_id' => $this->stringOrNull($snapshot['process_id'] ?? null),
229+
'connection' => $this->stringOrNull($snapshot['connection'] ?? null),
230+
'queue' => $this->stringOrNull($snapshot['queue'] ?? null),
231+
'supported' => $supported,
232+
'supports_required' => ($snapshot['supports_required'] ?? false) === true,
233+
'recorded_at' => $recordedAt instanceof CarbonInterface ? $recordedAt->toJSON() : $this->stringOrNull(
234+
$recordedAt
235+
),
236+
'expires_at' => $expiresAt instanceof CarbonInterface ? $expiresAt->toJSON() : $this->stringOrNull(
237+
$expiresAt
238+
),
239+
'source' => is_string($snapshot['source'] ?? null) ? $snapshot['source'] : '',
240+
];
241+
}
242+
243+
private function stringOrNull(mixed $value): ?string
244+
{
245+
return is_string($value) ? $value : null;
143246
}
144247

145248
protected function orderedRunsQuery(?string $bucket = null)

tests/Feature/V2DashboardStatsControllerTest.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,34 @@ public function testIndexIncludesV2OperatorMetrics(): void
555555
->assertJsonPath('operator_metrics.repair_policy.failure_backoff_strategy', 'exponential_by_repair_count');
556556
}
557557

558+
public function testIndexScopesWorkerFleetMetricsToConfiguredNamespace(): void
559+
{
560+
config()->set('waterline.engine_source', 'v2');
561+
config()->set('waterline.namespace', 'billing');
562+
config()->set('workflows.v2.compatibility.current', 'build-a');
563+
config()->set('workflows.v2.compatibility.supported', ['build-a']);
564+
WorkerCompatibilityFleet::clear();
565+
$this->beforeApplicationDestroyed(static function (): void {
566+
WorkerCompatibilityFleet::clear();
567+
});
568+
569+
WorkerCompatibilityFleet::recordForNamespace('billing', ['build-a'], 'redis', 'default', 'worker-billing');
570+
WorkerCompatibilityFleet::recordForNamespace('shipping', ['build-a'], 'redis', 'default', 'worker-shipping');
571+
572+
$this->get('/waterline/api/stats')
573+
->assertOk()
574+
->assertJsonPath('operator_metrics.workers.compatibility_namespace', 'billing')
575+
->assertJsonPath('operator_metrics.workers.active_workers', 1)
576+
->assertJsonPath('operator_metrics.workers.active_worker_scopes', 1)
577+
->assertJsonPath('operator_metrics.workers.active_workers_supporting_required', 1)
578+
->assertJsonCount(1, 'operator_metrics.workers.fleet')
579+
->assertJsonPath('operator_metrics.workers.fleet.0.worker_id', 'worker-billing')
580+
->assertJsonPath('operator_metrics.workers.fleet.0.namespace', 'billing')
581+
->assertJsonPath('operator_metrics.workers.fleet.0.connection', 'redis')
582+
->assertJsonPath('operator_metrics.workers.fleet.0.queue', 'default')
583+
->assertJsonPath('operator_metrics.workers.fleet.0.supports_required', true);
584+
}
585+
558586
public function testIndexIncludesCommandContractBackfillMetrics(): void
559587
{
560588
config()->set('waterline.engine_source', 'v2');

0 commit comments

Comments
 (0)