Skip to content

Commit ef22bea

Browse files
committed
feat(watch): fix SIGCHLD watcher management
1 parent 5a92cc4 commit ef22bea

3 files changed

Lines changed: 95 additions & 51 deletions

File tree

src/Process.php

Lines changed: 69 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@
2222

2323
use function Co\wait;
2424
use function pcntl_fork;
25-
use function extension_loaded;
2625
use function call_user_func;
2726
use function pcntl_wait;
28-
use function spl_object_hash;
27+
use function spl_object_id;
2928
use function pcntl_wexitstatus;
3029
use function pcntl_wifexited;
3130
use function pcntl_wifsignaled;
@@ -45,46 +44,42 @@
4544
class Process
4645
{
4746
/**
48-
* 子进程 PID 列表
49-
* @var int[]
47+
*fork后的回调函数列表
48+
* @var Closure[]
5049
*/
51-
private static array $children = [];
50+
private static array $forked = [];
5251

5352
/**
54-
* fork 后的回调函数列表
55-
* @var Closure[]
53+
* 等待子进程的协程列表
54+
* @var array<int, Coroutine[]>
5655
*/
57-
private static array $forked = [];
56+
private static array $watchers = [];
5857

5958
/**
60-
* 信号监听器 ID
61-
* @var int|null
59+
* 退出码缓存
60+
* @var array<int,int>
6261
*/
63-
private static ?int $watchId = null;
62+
private static array $exited = [];
6463

6564
/**
66-
* 等待子进程的协程列表
67-
* @var array<int, Coroutine[]>
65+
* 子进程信号监听器
66+
* @var int|null
6867
*/
69-
private static array $watchers = [];
68+
private static ?int $watchId = null;
7069

7170
/**
7271
* 创建子进程
7372
* 在子进程中会:
7473
* - 清理调度器
7574
* - 重置事件监听器
76-
* - 执行 fork 回调
75+
* - 执行fork回调
7776
* - 执行用户回调
7877
*
7978
* @param Closure $callback 子进程执行的回调函数
8079
* @return int 返回子进程PID
8180
*/
8281
public static function fork(Closure $callback): int
8382
{
84-
if (!extension_loaded('pcntl')) {
85-
return -1;
86-
}
87-
8883
$owner = \Co\current();
8984
if ($owner instanceof MainCoroutine) {
9085
return self::spawn($callback);
@@ -111,21 +106,39 @@ private static function spawn(Closure $callback): int
111106
$pid = pcntl_fork();
112107

113108
if ($pid === -1 || $pid > 0) {
109+
while (true) {
110+
$childId = pcntl_wait($status, WNOHANG | WUNTRACED);
111+
if ($childId <= 0) {
112+
break;
113+
}
114+
115+
if (pcntl_wifexited($status)) {
116+
$exitCode = pcntl_wexitstatus($status);
117+
self::dispatchExit($childId, $exitCode);
118+
} elseif (pcntl_wifsignaled($status)) {
119+
$signal = pcntl_wtermsig($status);
120+
self::dispatchExit($childId, -$signal);
121+
}
122+
}
123+
114124
return $pid;
115125
}
116126

117127
// 清理调度器和事件监听器
118128
Scheduler::clear();
119129
Runtime::watcher()->forked();
120130

121-
// 保存并清空 fork 回调列表
131+
// 保存并清空fork回调列表
122132
$forked = self::$forked;
123133
self::$forked = [];
124-
self::$children = [];
125134

126-
// 执行所有 fork 回调
135+
// 执行所有fork回调
127136
foreach ($forked as $forkedCallback) {
128-
call_user_func($forkedCallback);
137+
try {
138+
call_user_func($forkedCallback);
139+
} catch (Throwable $e) {
140+
Stdin::println($e->getMessage());
141+
}
129142
}
130143

131144
// 执行用户回调
@@ -148,7 +161,6 @@ private static function spawn(Closure $callback): int
148161
*/
149162
public static function wait(int $pid): int
150163
{
151-
// 设置信号监听器(如果还没有设置)
152164
if (!self::$watchId) {
153165
self::$watchId = Event::watchSignal(SIGCHLD, static function () {
154166
while (true) {
@@ -171,36 +183,53 @@ public static function wait(int $pid): int
171183
});
172184
}
173185

174-
$co = \Co\current();
186+
if (isset(self::$exited[$pid])) {
187+
$code = self::$exited[$pid];
188+
unset(self::$exited[$pid]);
189+
return $code;
190+
}
191+
192+
$owner = \Co\current();
175193

176194
// 注册等待该子进程的协程
177195
if (!isset(self::$watchers[$pid])) {
178196
self::$watchers[$pid] = [];
179197
}
180198

181-
self::$watchers[$pid][spl_object_hash($co)] = $co;
199+
self::$watchers[$pid][spl_object_id($owner)] = $owner;
182200

183201
try {
184-
return $co->suspend();
202+
$result = $owner->suspend();
203+
// 清理协程注册
204+
unset(self::$watchers[$pid][spl_object_id($owner)]);
205+
if (empty(self::$watchers[$pid])) {
206+
unset(self::$watchers[$pid]);
207+
}
208+
209+
if (empty(self::$watchers)) {
210+
Event::unwatch(self::$watchId);
211+
self::$watchId = null;
212+
}
213+
214+
return $result;
185215
} catch (Throwable $e) {
186-
throw new RuntimeException('Child process error: ' . $e->getMessage());
187-
} finally {
188216
// 清理协程注册
189-
unset(self::$watchers[$pid][spl_object_hash($co)]);
217+
unset(self::$watchers[$pid][spl_object_id($owner)]);
190218
if (empty(self::$watchers[$pid])) {
191219
unset(self::$watchers[$pid]);
192220
}
193221

194-
// 如果没有等待的子进程, 清理信号监听器
195222
if (empty(self::$watchers)) {
196223
Event::unwatch(self::$watchId);
197224
self::$watchId = null;
198225
}
226+
227+
throw new RuntimeException('Child process error: ' . $e->getMessage());
199228
}
200229
}
201230

202231
/**
203-
* 注册 fork 后的回调函数
232+
* 注册fork后的回调函数
204233
* 这些回调会在子进程中执行
205234
*
206235
* @param Closure $callback 回调函数
@@ -230,8 +259,14 @@ public static function signal(int $pid, int $signal): bool
230259
*/
231260
private static function dispatchExit(int $pid, int $exitCode): void
232261
{
233-
foreach (self::$watchers[$pid] ?? [] as $coroutine) {
234-
Scheduler::resume($coroutine, $exitCode)->resolve(CoroutineStateException::class);
262+
$hasSubscribers = !empty(self::$watchers[$pid]);
263+
if ($hasSubscribers) {
264+
foreach (self::$watchers[$pid] as $coroutine) {
265+
Scheduler::resume($coroutine, $exitCode)->resolve(CoroutineStateException::class);
266+
}
267+
unset(self::$watchers[$pid]);
268+
} else {
269+
self::$exited[$pid] = $exitCode;
235270
}
236271
}
237272
}

src/Watch/ExtEvWatcher.php

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,6 @@
2222
use EvLoop;
2323
use Throwable;
2424

25-
use function extension_loaded;
26-
27-
if (!extension_loaded('ev')) {
28-
return;
29-
}
30-
3125
/**
3226
* ExtEv扩展驱动
3327
*/

src/Watch/ExtEventWatcher.php

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@
2424

2525
use function extension_loaded;
2626

27-
if (!extension_loaded('event')) {
28-
return;
29-
}
30-
3127
/**
3228
* 基于 ext-event 的事件驱动
3329
*/
@@ -46,6 +42,18 @@ final class ExtEventWatcher extends WatchAbstract implements WatcherInterface, E
4642
*/
4743
private array $watchers = [];
4844

45+
/**
46+
* 内建SIGCHLD
47+
* @var Event|null
48+
*/
49+
private ?Event $internalSigchld = null;
50+
51+
/**
52+
* SIGCHLD
53+
* @var array<int, Closure>
54+
*/
55+
private array $sigchldWatchers = [];
56+
4957
/**
5058
* 下一个监听器ID
5159
* @var int
@@ -84,14 +92,14 @@ public function isActive(): bool
8492
*/
8593
public function forked(): void
8694
{
87-
foreach ($this->watchers as $watcher) {
88-
$watcher->free();
89-
}
95+
$this->base->reInit();
9096
$this->watchers = [];
9197
$this->nextWatchId = 1;
9298

93-
$this->base->stop();
9499
$this->base = new EventBase();
100+
101+
$this->sigchldWatchers = [];
102+
$this->internalSigchld = null;
95103
}
96104

97105
/**
@@ -103,6 +111,10 @@ public function stop(): void
103111
$watcher->free();
104112
}
105113
$this->watchers = [];
114+
if ($this->internalSigchld) {
115+
$this->internalSigchld->free();
116+
$this->internalSigchld = null;
117+
}
106118
$this->base->exit();
107119
}
108120

@@ -203,11 +215,14 @@ public function timer(float $after, float $repeat, Closure $callback): int
203215
*/
204216
public function unwatch(int $watchId): void
205217
{
206-
if (!$watcher = $this->watchers[$watchId] ?? null) {
218+
if ($watcher = $this->watchers[$watchId] ?? null) {
219+
$watcher->free();
220+
unset($this->watchers[$watchId]);
207221
return;
208222
}
209223

210-
$watcher->free();
211-
unset($this->watchers[$watchId]);
224+
if (isset($this->sigchldWatchers[$watchId])) {
225+
unset($this->sigchldWatchers[$watchId]);
226+
}
212227
}
213228
}

0 commit comments

Comments
 (0)