Skip to content

Commit bcefcf4

Browse files
committed
update: opz the main co state
1 parent b7e93ad commit bcefcf4

8 files changed

Lines changed: 99 additions & 60 deletions

File tree

examples/08-process.php

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
<?php declare(strict_types=1);
2+
23
/**
34
* Copyright © 2024 cclilshy
45
* Email: jingnigg@gmail.com
@@ -13,6 +14,8 @@
1314
use Ripple\Process;
1415
use Ripple\Time;
1516

17+
use function Co\go;
18+
1619
require_once __DIR__ . '/../vendor/autoload.php';
1720

1821

@@ -22,8 +25,24 @@
2225

2326
$child = Process::fork(function () {
2427
\var_dump('is child');
25-
Time::sleep(10);
28+
Time::sleep(1);
2629
exit(127);
2730
});
2831

32+
echo "child pid > {$child} \n";
33+
2934
\var_dump(Process::wait($child));
35+
36+
go(function () {
37+
$child = Process::fork(function () {
38+
\var_dump('is child');
39+
Time::sleep(1);
40+
exit(127);
41+
});
42+
43+
echo "child pid > {$child} \n";
44+
45+
\var_dump(Process::wait($child));
46+
});
47+
48+
\Co\wait();

src/Coroutine.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ abstract public function start(): mixed;
101101
* 暂停协程执行
102102
* @param mixed $value 暂停时传递的值
103103
* @return mixed 恢复时传入的值
104-
* @throws Throwable 当协程暂停过程中发生异常时抛出
105104
*/
106105
abstract public function suspend(mixed $value = null): mixed;
107106

src/Process.php

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use Closure;
1616
use Ripple\Runtime\Exception\CoroutineStateException;
17+
use Ripple\Runtime\MainCoroutine;
1718
use Ripple\Runtime\Scheduler;
1819
use Ripple\Runtime\Support\Stdin;
1920
use Throwable;
@@ -85,46 +86,58 @@ public static function fork(Closure $callback): int
8586
}
8687

8788
$owner = \Co\current();
89+
if ($owner instanceof MainCoroutine) {
90+
return self::spawn($callback);
91+
}
92+
8893
Scheduler::nextTick(static function () use ($callback, $owner) {
89-
// 子进程执行
90-
$pid = pcntl_fork();
94+
Scheduler::resume($owner, self::spawn($callback));
95+
});
9196

92-
if ($pid === -1 || $pid > 0) {
93-
Scheduler::resume($owner, $pid);
94-
return;
95-
}
97+
try {
98+
return $owner->suspend();
99+
} catch (Throwable $e) {
100+
throw new RuntimeException($e->getMessage(), $e->getCode(), $e);
101+
}
102+
}
96103

97-
// 清理调度器和事件监听器
98-
Scheduler::clear();
99-
Runtime::watcher()->forked();
104+
/**
105+
* @param Closure $callback
106+
* @return int
107+
*/
108+
private static function spawn(Closure $callback): int
109+
{
110+
// 子进程执行
111+
$pid = pcntl_fork();
100112

101-
// 保存并清空 fork 回调列表
102-
$forked = self::$forked;
103-
self::$forked = [];
104-
self::$children = [];
113+
if ($pid === -1 || $pid > 0) {
114+
return $pid;
115+
}
105116

106-
// 执行所有 fork 回调
107-
foreach ($forked as $forkedCallback) {
108-
call_user_func($forkedCallback);
109-
}
117+
// 清理调度器和事件监听器
118+
Scheduler::clear();
119+
Runtime::watcher()->forked();
110120

111-
// 执行用户回调
112-
try {
113-
$callback();
114-
} catch (Throwable $e) {
115-
Stdin::println($e->getMessage());
116-
}
121+
// 保存并清空 fork 回调列表
122+
$forked = self::$forked;
123+
self::$forked = [];
124+
self::$children = [];
117125

118-
// 等待所有协程完成并退出
119-
wait();
120-
exit(0);
121-
});
126+
// 执行所有 fork 回调
127+
foreach ($forked as $forkedCallback) {
128+
call_user_func($forkedCallback);
129+
}
122130

131+
// 执行用户回调
123132
try {
124-
return $owner->suspend();
133+
$callback();
125134
} catch (Throwable $e) {
126-
throw new RuntimeException($e->getMessage(), $e->getCode(), $e);
135+
Stdin::println($e->getMessage());
127136
}
137+
138+
// 等待所有协程完成并退出
139+
wait();
140+
exit(0);
128141
}
129142

130143
/**

src/Runtime.php

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,6 @@ public static function watcher(): WatchAbstract
9393
return self::$watcher;
9494
}
9595

96-
/**
97-
* 获取事件桥
98-
* @return Bridge
99-
*/
100-
public static function bridge(): Bridge
101-
{
102-
return self::$bridge;
103-
}
104-
10596
/**
10697
* 运行回调并进入主循环
10798
* @param Closure|null $callback

src/Runtime/MainCoroutine.php

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
namespace Ripple\Runtime;
1414

1515
use Closure;
16-
use Ripple\Runtime\Exception\CoroutineStateException;
1716
use Ripple\Coroutine;
1817
use Ripple\Watch\Interface\WatchAbstract;
1918
use Throwable;
@@ -37,6 +36,11 @@ final class MainCoroutine extends Coroutine
3736
*/
3837
private bool $hasMessage = false;
3938

39+
/**
40+
* @var mixed
41+
*/
42+
private mixed $suspendValue = null;
43+
4044
/**
4145
* 设置主协程为可运行状态
4246
* @return void
@@ -52,6 +56,7 @@ public function runnable(): void
5256
*/
5357
public function start(): bool
5458
{
59+
$this->setState(Coroutine::STATE_WAITING);
5560
call_user_func($this->callback);
5661
return true;
5762
}
@@ -64,45 +69,60 @@ public function start(): bool
6469
*/
6570
public function suspend(mixed $value = null): mixed
6671
{
67-
if ($this->state === Coroutine::STATE_CREATED) {
68-
Scheduler::start($this);
69-
return $this->result;
70-
}
71-
72-
if ($this->state === Coroutine::STATE_RUNNABLE) {
73-
Scheduler::tick();
74-
return $this->result;
75-
}
72+
$this->suspendValue = $value;
73+
try {
74+
if ($this->state === Coroutine::STATE_CREATED) {
75+
Scheduler::start($this);
76+
$this->suspendValue = null;
77+
return $this->result;
78+
}
7679

77-
if ($this->state === Coroutine::STATE_RUNNING) {
80+
$this->setState(Coroutine::STATE_WAITING);
7881
Scheduler::tick();
82+
$this->suspendValue = null;
7983
return $this->result;
84+
} finally {
85+
$this->setState(Coroutine::STATE_RUNNING);
8086
}
81-
82-
throw new CoroutineStateException('Cannot suspend a main coroutine in the current state.');
8387
}
8488

8589
/**
8690
* 恢复主协程执行
8791
* @param ?mixed $value 恢复时传入的值
8892
* @return bool 恢复是否成功
93+
* @throws Throwable
8994
*/
90-
public function resume(mixed $value = null): bool
95+
public function resume(mixed $value = null): mixed
9196
{
9297
$this->hasMessage = true;
9398
$this->result = $value;
99+
100+
$owner = \Co\current();
101+
if ($owner instanceof FiberCoroutine) {
102+
Scheduler::nextTick(fn () => Scheduler::resume($owner, $this->suspendValue));
103+
return $owner->suspend();
104+
}
105+
94106
return true;
95107
}
96108

97109
/**
98110
* 向主协程抛出异常
99111
* @param Throwable $exception 要抛出的异常对象
100112
* @return bool 异常设置是否成功
113+
* @throws Throwable
101114
*/
102-
public function throw(Throwable $exception): bool
115+
public function throw(Throwable $exception): mixed
103116
{
104117
$this->hasMessage = true;
105118
$this->hasExcept = $exception;
119+
120+
$owner = \Co\current();
121+
if ($owner instanceof FiberCoroutine) {
122+
Scheduler::nextTick(fn () => Scheduler::resume($owner, $this->suspendValue));
123+
return $owner->suspend();
124+
}
125+
106126
return true;
107127
}
108128

src/Stream.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public function flush(float|int|null $timeout = null): void
253253
$timer = null;
254254
if ($timeout && $timeout >= 0) {
255255
$timer = Time::afterFunc($timeout, function () use ($timeout) {
256-
$this->wco->isSuspended() && Scheduler::throw($this->wco, new ConnectionException('Write timeout'));
256+
Scheduler::throw($this->wco, new ConnectionException('Write timeout'));
257257
});
258258
}
259259

src/Sync/Channel.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ public function receive(): mixed
185185
return $owner->suspend();
186186
} catch (Throwable $e) {
187187
throw new RuntimeException($e->getMessage(), $e->getCode(), $e);
188-
189188
}
190189
}
191190

tests/Sync/ChannelTest.php

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,6 @@ public function testMultipleSendersReceivers(): void
255255
*/
256256
public function testChannelFIFO(): void
257257
{
258-
259-
260258
$channel = new Channel(3);
261259
$received = [];
262260

@@ -273,7 +271,7 @@ public function testChannelFIFO(): void
273271
}
274272
});
275273

276-
Time::sleep(0.1);
274+
\Co\wait();
277275

278276
$this->assertCount(5, $received);
279277
$this->assertEquals("Message 1", $received[0]);

0 commit comments

Comments
 (0)