Skip to content

Commit 9f99c01

Browse files
committed
feat(watch): fix SIGCHLD watcher management and callback signature; improve fork rebuild flow
1 parent 5a92cc4 commit 9f99c01

2 files changed

Lines changed: 127 additions & 41 deletions

File tree

src/Process.php

Lines changed: 69 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@
2222

2323
use function Co\wait;
2424
use function pcntl_fork;
25-
use function extension_loaded;
2625
use function call_user_func;
2726
use function pcntl_wait;
28-
use function spl_object_hash;
27+
use function spl_object_id;
2928
use function pcntl_wexitstatus;
3029
use function pcntl_wifexited;
3130
use function pcntl_wifsignaled;
@@ -45,46 +44,42 @@
4544
class Process
4645
{
4746
/**
48-
* 子进程 PID 列表
49-
* @var int[]
47+
*fork后的回调函数列表
48+
* @var Closure[]
5049
*/
51-
private static array $children = [];
50+
private static array $forked = [];
5251

5352
/**
54-
* fork 后的回调函数列表
55-
* @var Closure[]
53+
* 等待子进程的协程列表
54+
* @var array<int, Coroutine[]>
5655
*/
57-
private static array $forked = [];
56+
private static array $watchers = [];
5857

5958
/**
60-
* 信号监听器 ID
61-
* @var int|null
59+
* 退出码缓存
60+
* @var array<int,int>
6261
*/
63-
private static ?int $watchId = null;
62+
private static array $exited = [];
6463

6564
/**
66-
* 等待子进程的协程列表
67-
* @var array<int, Coroutine[]>
65+
* 子进程信号监听器
66+
* @var int|null
6867
*/
69-
private static array $watchers = [];
68+
private static ?int $watchId = null;
7069

7170
/**
7271
* 创建子进程
7372
* 在子进程中会:
7473
* - 清理调度器
7574
* - 重置事件监听器
76-
* - 执行 fork 回调
75+
* - 执行fork回调
7776
* - 执行用户回调
7877
*
7978
* @param Closure $callback 子进程执行的回调函数
8079
* @return int 返回子进程PID
8180
*/
8281
public static function fork(Closure $callback): int
8382
{
84-
if (!extension_loaded('pcntl')) {
85-
return -1;
86-
}
87-
8883
$owner = \Co\current();
8984
if ($owner instanceof MainCoroutine) {
9085
return self::spawn($callback);
@@ -111,21 +106,39 @@ private static function spawn(Closure $callback): int
111106
$pid = pcntl_fork();
112107

113108
if ($pid === -1 || $pid > 0) {
109+
while (true) {
110+
$childId = pcntl_wait($status, WNOHANG | WUNTRACED);
111+
if ($childId <= 0) {
112+
break;
113+
}
114+
115+
if (pcntl_wifexited($status)) {
116+
$exitCode = pcntl_wexitstatus($status);
117+
self::dispatchExit($childId, $exitCode);
118+
} elseif (pcntl_wifsignaled($status)) {
119+
$signal = pcntl_wtermsig($status);
120+
self::dispatchExit($childId, -$signal);
121+
}
122+
}
123+
114124
return $pid;
115125
}
116126

117127
// 清理调度器和事件监听器
118128
Scheduler::clear();
119129
Runtime::watcher()->forked();
120130

121-
// 保存并清空 fork 回调列表
131+
// 保存并清空fork回调列表
122132
$forked = self::$forked;
123133
self::$forked = [];
124-
self::$children = [];
125134

126-
// 执行所有 fork 回调
135+
// 执行所有fork回调
127136
foreach ($forked as $forkedCallback) {
128-
call_user_func($forkedCallback);
137+
try {
138+
call_user_func($forkedCallback);
139+
} catch (Throwable $e) {
140+
Stdin::println($e->getMessage());
141+
}
129142
}
130143

131144
// 执行用户回调
@@ -148,7 +161,6 @@ private static function spawn(Closure $callback): int
148161
*/
149162
public static function wait(int $pid): int
150163
{
151-
// 设置信号监听器(如果还没有设置)
152164
if (!self::$watchId) {
153165
self::$watchId = Event::watchSignal(SIGCHLD, static function () {
154166
while (true) {
@@ -171,36 +183,53 @@ public static function wait(int $pid): int
171183
});
172184
}
173185

174-
$co = \Co\current();
186+
if (isset(self::$exited[$pid])) {
187+
$code = self::$exited[$pid];
188+
unset(self::$exited[$pid]);
189+
return $code;
190+
}
191+
192+
$owner = \Co\current();
175193

176194
// 注册等待该子进程的协程
177195
if (!isset(self::$watchers[$pid])) {
178196
self::$watchers[$pid] = [];
179197
}
180198

181-
self::$watchers[$pid][spl_object_hash($co)] = $co;
199+
self::$watchers[$pid][spl_object_id($owner)] = $owner;
182200

183201
try {
184-
return $co->suspend();
202+
$result = $owner->suspend();
203+
// 清理协程注册
204+
unset(self::$watchers[$pid][spl_object_id($owner)]);
205+
if (empty(self::$watchers[$pid])) {
206+
unset(self::$watchers[$pid]);
207+
}
208+
209+
if (empty(self::$watchers)) {
210+
Event::unwatch(self::$watchId);
211+
self::$watchId = null;
212+
}
213+
214+
return $result;
185215
} catch (Throwable $e) {
186-
throw new RuntimeException('Child process error: ' . $e->getMessage());
187-
} finally {
188216
// 清理协程注册
189-
unset(self::$watchers[$pid][spl_object_hash($co)]);
217+
unset(self::$watchers[$pid][spl_object_id($owner)]);
190218
if (empty(self::$watchers[$pid])) {
191219
unset(self::$watchers[$pid]);
192220
}
193221

194-
// 如果没有等待的子进程, 清理信号监听器
195222
if (empty(self::$watchers)) {
196223
Event::unwatch(self::$watchId);
197224
self::$watchId = null;
198225
}
226+
227+
throw new RuntimeException('Child process error: ' . $e->getMessage());
199228
}
200229
}
201230

202231
/**
203-
* 注册 fork 后的回调函数
232+
* 注册fork后的回调函数
204233
* 这些回调会在子进程中执行
205234
*
206235
* @param Closure $callback 回调函数
@@ -230,8 +259,14 @@ public static function signal(int $pid, int $signal): bool
230259
*/
231260
private static function dispatchExit(int $pid, int $exitCode): void
232261
{
233-
foreach (self::$watchers[$pid] ?? [] as $coroutine) {
234-
Scheduler::resume($coroutine, $exitCode)->resolve(CoroutineStateException::class);
262+
$hasSubscribers = !empty(self::$watchers[$pid]);
263+
if ($hasSubscribers) {
264+
foreach (self::$watchers[$pid] as $coroutine) {
265+
Scheduler::resume($coroutine, $exitCode)->resolve(CoroutineStateException::class);
266+
}
267+
unset(self::$watchers[$pid]);
268+
} else {
269+
self::$exited[$pid] = $exitCode;
235270
}
236271
}
237272
}

src/Watch/ExtEventWatcher.php

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
use function extension_loaded;
2626

27+
use const SIGCHLD;
28+
2729
if (!extension_loaded('event')) {
2830
return;
2931
}
@@ -46,6 +48,18 @@ final class ExtEventWatcher extends WatchAbstract implements WatcherInterface, E
4648
*/
4749
private array $watchers = [];
4850

51+
/**
52+
* 内建SIGCHLD
53+
* @var Event|null
54+
*/
55+
private ?Event $internalSigchld = null;
56+
57+
/**
58+
* SIGCHLD
59+
* @var array<int, Closure>
60+
*/
61+
private array $sigchldWatchers = [];
62+
4963
/**
5064
* 下一个监听器ID
5165
* @var int
@@ -61,6 +75,7 @@ public function __construct()
6175
throw new RuntimeException('ext-event extension is required for EventWatcher');
6276
}
6377
$this->base = new EventBase();
78+
// $this->ensureInternalSigchld();
6479
}
6580

6681
/**
@@ -84,14 +99,15 @@ public function isActive(): bool
8499
*/
85100
public function forked(): void
86101
{
87-
foreach ($this->watchers as $watcher) {
88-
$watcher->free();
89-
}
102+
$this->base->reInit();
90103
$this->watchers = [];
91104
$this->nextWatchId = 1;
92105

93-
$this->base->stop();
94106
$this->base = new EventBase();
107+
108+
$this->sigchldWatchers = [];
109+
$this->internalSigchld = null;
110+
// $this->ensureInternalSigchld();
95111
}
96112

97113
/**
@@ -103,6 +119,10 @@ public function stop(): void
103119
$watcher->free();
104120
}
105121
$this->watchers = [];
122+
if ($this->internalSigchld) {
123+
$this->internalSigchld->free();
124+
$this->internalSigchld = null;
125+
}
106126
$this->base->exit();
107127
}
108128

@@ -153,6 +173,11 @@ public function watchSignal(int $signal, Closure $callback): int
153173
{
154174
$watchId = $this->nextWatchId++;
155175

176+
// if ($signal === SIGCHLD) {
177+
// $this->sigchldWatchers[$watchId] = $callback;
178+
// return $watchId;
179+
// }
180+
156181
$watcher = new Event($this->base, $signal, Event::SIGNAL | Event::PERSIST, function ($fd, $what, $arg) use ($callback, $watchId, $signal) {
157182
try {
158183
$callback($signal, $watchId);
@@ -203,11 +228,37 @@ public function timer(float $after, float $repeat, Closure $callback): int
203228
*/
204229
public function unwatch(int $watchId): void
205230
{
206-
if (!$watcher = $this->watchers[$watchId] ?? null) {
231+
if ($watcher = $this->watchers[$watchId] ?? null) {
232+
$watcher->free();
233+
unset($this->watchers[$watchId]);
234+
return;
235+
}
236+
237+
if (isset($this->sigchldWatchers[$watchId])) {
238+
unset($this->sigchldWatchers[$watchId]);
239+
}
240+
}
241+
242+
/**
243+
* @return void
244+
*/
245+
private function ensureInternalSigchld(): void
246+
{
247+
if ($this->internalSigchld) {
207248
return;
208249
}
209250

210-
$watcher->free();
211-
unset($this->watchers[$watchId]);
251+
$watcher = new Event($this->base, SIGCHLD, Event::SIGNAL | Event::PERSIST, function () {
252+
foreach ($this->sigchldWatchers as $watchId => $callback) {
253+
try {
254+
$callback(SIGCHLD, $watchId);
255+
} catch (Throwable $exception) {
256+
Stdin::println($exception->getMessage());
257+
}
258+
}
259+
});
260+
261+
$this->internalSigchld = $watcher;
262+
$watcher->add();
212263
}
213264
}

0 commit comments

Comments
 (0)