-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path07-targeted-manager-control.php
More file actions
99 lines (79 loc) · 3.05 KB
/
07-targeted-manager-control.php
File metadata and controls
99 lines (79 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
<?php
declare(strict_types=1);
/**
* This file is part of fast-forward/fork.
*
* This source file is subject to the license bundled
* with this source code in the file LICENSE.
*
* @copyright Copyright (c) 2026 Felipe Sayão Lobato Abreu <github@mentordosnerds.com>
* @license https://opensource.org/licenses/MIT MIT License
*
* @see https://github.com/php-fast-forward/fork
* @see https://github.com/php-fast-forward
* @see https://datatracker.ietf.org/doc/html/rfc2119
*/
use FastForward\Fork\Examples\Support\ExampleConsole;
use FastForward\Fork\Examples\Support\ExampleRuntime;
use FastForward\Fork\Manager\ForkManager;
use FastForward\Fork\Signal\Signal;
use FastForward\Fork\Worker\WorkerInterface;
require __DIR__ . '/bootstrap.php';
// Banner for the example: a short title plus a one-line summary of what the
// script demonstrates.
$console = new ExampleConsole();

$exampleTitle = '07 Targeted manager control';
$exampleSummary = 'Use $manager->kill() and $manager->wait() with individual workers and worker groups in the same call.';

$console->title($exampleTitle, $exampleSummary);
// A single manager forks both groups, so later calls can address either group
// (or individual workers from them) through the same object.
$manager = new ForkManager();

// "api" group. The trailing `2` is presumably the number of workers to fork --
// confirm against ForkManager::fork().
$apiGroup = $manager->fork(
    static function (WorkerInterface $worker): int {
        $pid = $worker->getPid();
        printf("api worker %d ready\n", $pid);

        // Intentionally endless: the callback never returns on its own and
        // prints a heartbeat line between 140 ms sleeps.
        for (;;) {
            printf("api worker %d heartbeat\n", $pid);
            usleep(140_000);
        }
    },
    2,
);

// "queue" group: same shape as the api group, only the label differs.
$queueGroup = $manager->fork(
    static function (WorkerInterface $worker): int {
        $pid = $worker->getPid();
        printf("queue worker %d ready\n", $pid);

        // Intentionally endless, see the api worker above in this block.
        for (;;) {
            printf("queue worker %d heartbeat\n", $pid);
            usleep(140_000);
        }
    },
    2,
);
// Readiness gate: the predicate is true only once the output of every worker
// in both groups contains the text "ready". ExampleRuntime::waitUntil()
// presumably re-evaluates it until it passes or the 1.2 s timeout elapses --
// confirm against the helper.
$allReady = ExampleRuntime::waitUntil(
    static function () use ($apiGroup, $queueGroup): bool {
        foreach ([$apiGroup, $queueGroup] as $group) {
            foreach ($group->all() as $worker) {
                if (! str_contains($worker->getOutput(), 'ready')) {
                    return false;
                }
            }
        }

        return true;
    },
    timeoutSeconds: 1.2,
);

if (! $allReady) {
    // Every worker callback loops forever, so aborting here without stopping
    // them would leave the forked processes running after this script fails.
    // Terminate and reap both groups first, using the same targeted calls the
    // rest of the example relies on.
    $manager->kill(Signal::Terminate, $apiGroup, $queueGroup);
    $manager->wait($apiGroup, $queueGroup);

    throw new RuntimeException('Expected both groups to become ready before targeted control.');
}
// Split the queue group into one hand-picked worker (the first) and the rest.
$otherQueueWorkers = array_values($queueGroup->all());
$pickedQueueWorker = array_shift($otherQueueWorkers);

if ($pickedQueueWorker === null) {
    throw new RuntimeException('Expected a queue worker to select.');
}

// First pass: a whole group and a single worker are passed as targets to the
// same kill() and wait() calls.
$announcement = sprintf(
    'Stopping the whole API group plus queue worker %d with $manager->kill().',
    $pickedQueueWorker->getPid(),
);
$console->line($announcement);

$firstPassTargets = [$apiGroup, $pickedQueueWorker];
$manager->kill(Signal::Terminate, ...$firstPassTargets);
$manager->wait(...$firstPassTargets);

// Report the state after the first pass.
$console->printGroup('API group after targeted shutdown', $apiGroup);
$console->printWorker('Individually stopped queue worker', $pickedQueueWorker);
$console->printWorkers('Queue workers still running', $queueGroup->getRunning());

// Second pass: the leftover queue workers are passed one by one as targets.
$console->line('Stopping the remaining queue workers with targeted manager calls.');
$manager->kill(Signal::Terminate, ...$otherQueueWorkers);
$manager->wait(...$otherQueueWorkers);

$console->printGroup('Queue group after all targeted operations', $queueGroup);