Skip to content

Commit 9e4b0b1

Browse files
committed
feat 优化channel的信号响应方案
1 parent 9ebd814 commit 9e4b0b1

4 files changed

Lines changed: 97 additions & 11 deletions

File tree

README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,4 +273,27 @@
273273
Cache::ChRemoveListener('test', '1', false);
274274
```
275275
276+
- **实验性功能:信号通知**
277+
- 由于共享内存无法使用事件监听,所以底层使用Timer定时器进行轮询,实验性功能可以开启使用系统信号来监听数据的变化
278+
```php
279+
// 设置信号
280+
// 因为event等事件循环库是对标准信号的监听,所以不能使用自定实时信号SIGRTMIN ~ SIGRTMAX
281+
// 默认暂时使用SIGPOLL,异步IO监听信号,可能影响异步文件IO相关的触发
282+
Future::$signal = \SIGPOLL;
283+
// 开启信号监听,这时候开启的监听会触发之前的回调和通道回调,不会影响之前的回调
284+
Cache::channelUseSignalEnable(true)
285+
```
286+
- 当使用的监听信号存在已注册的回调产生回调冲突时,可以手动设置回调事件共享
287+
```php
288+
// 设置信号
289+
// 因为event等事件循环库是对标准信号的监听,所以不能使用自定实时信号SIGRTMIN ~ SIGRTMAX
290+
// 默认暂时使用SIGPOLL
291+
Future::$signal = \SIGPOLL;
292+
// 假设\SIGPOLL存在一个已注册的回调,YourEventLoop::getCallback(\SIGPOLL)可以获取该事件在当前进程注册的回调响应
293+
// 设置回调
294+
Future::setSignalCallback(YourEventLoop::getCallback(\SIGPOLL));
295+
// 开启信号监听,这时候开启的监听会触发之前的回调和通道回调,不会影响之前的回调
296+
Cache::channelUseSignalEnable(true)
297+
```
298+
> 通道信号监听维系了一个事件队列,多次触发信号时,回调只会根据事件队列是否存在事件消费标记而执行事件回调
276299
### 其他功能具体可以参看代码注释和测试用例

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
"webman/console": "^1.0 | ^2.0"
2626
},
2727
"suggest": {
28-
"webman/console": "Webman-CLI support. "
28+
"webman/console": "Webman-CLI support. ",
29+
"ext-posix": "Channel signal support. "
2930
},
3031
"autoload": {
3132
"psr-4": {

src/Future.php

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,20 @@ class Future
3333
*/
3434
protected static array $_futures = [];
3535

36+
/**
37+
* @var Closure|null
38+
*/
39+
protected static ?Closure $_signalCallback = null;
40+
41+
/**
42+
* @param Closure|null $func
43+
* @return void
44+
*/
45+
public static function setSignalCallback(?Closure $func): void
46+
{
47+
self::$_signalCallback = $func;
48+
}
49+
3650
/**
3751
* @param Closure $func
3852
* @param array $args
@@ -63,13 +77,18 @@ public static function add(Closure $func, array $args = [], float|int $interval
6377
break;
6478
# 信号
6579
case self::DRIVER_SIGNAL:
66-
$id = false;
80+
$func = function () use ($func, $args) {
81+
// 触发信号原回调
82+
if (self::$_signalCallback) {
83+
call_user_func(self::$_signalCallback);
84+
}
85+
// 触发信号通道回调
86+
call_user_func($func, $args);
87+
};
6788
if (method_exists(Worker::$globalEvent, 'onSignal')) {
68-
Worker::$globalEvent->onSignal(self::$signal, function () use ($func, $args) {
69-
call_user_func($func, $args);
70-
});
89+
Worker::$globalEvent->onSignal(self::$signal, $func);
7190
} else {
72-
Worker::$globalEvent->add(self::$signal, EventInterface::EV_SIGNAL, $func, $args);
91+
Worker::$globalEvent->add(self::$signal, EventInterface::EV_SIGNAL, $func);
7392
}
7493
self::$_futures[$id = 0] = $func;
7594
break;
@@ -120,10 +139,19 @@ public static function del(int|null $id = null): void
120139
break;
121140
case self::DRIVER_SIGNAL:
122141
if ($id === 0) {
123-
if (method_exists(Worker::$globalEvent, 'offSignal')) {
124-
Worker::$globalEvent->offSignal(self::$signal);
142+
// 如果有信号回调,则恢复信号回调
143+
if (self::$_signalCallback) {
144+
if (method_exists(Worker::$globalEvent, 'onSignal')) {
145+
Worker::$globalEvent->onSignal(self::$signal, self::$_signalCallback);
146+
} else {
147+
Worker::$globalEvent->add(self::$signal, EventInterface::EV_SIGNAL, self::$_signalCallback);
148+
}
125149
} else {
126-
Worker::$globalEvent->del(self::$signal, EventInterface::EV_SIGNAL);
150+
if (method_exists(Worker::$globalEvent, 'offSignal')) {
151+
Worker::$globalEvent->offSignal(self::$signal);
152+
} else {
153+
Worker::$globalEvent->del(self::$signal, EventInterface::EV_SIGNAL);
154+
}
127155
}
128156
}
129157
break;

src/Traits/ChannelMethods.php

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ trait ChannelMethods
2323
/** @var string 通道pid列表 */
2424
protected static string $_CHANNEL_PID_LIST = '#ChannelPidList#';
2525

26+
/** @var string 通道事件列表 */
27+
protected static string $_CHANNEL_EVENT_LIST = '#ChannelEventList#';
28+
2629
/**
2730
* @var array = [channelKey => futureId]
2831
*/
@@ -152,7 +155,14 @@ protected static function _ChPublish(string $key, mixed $message, bool $store =
152155
if (self::isChannelUseSignal()) {
153156
$list = self::_Get(self::$_CHANNEL_PID_LIST, []);
154157
foreach ($list as $pid) {
155-
@posix_kill($pid, Future::$signal);
158+
self::_Atomic(self::$_CHANNEL_EVENT_LIST, function () use ($pid) {
159+
// 设置通道事件标记
160+
$channelEventList = self::_Get(self::$_CHANNEL_EVENT_LIST, []);
161+
$channelEventList[$pid][] = 1;
162+
self::_Set(self::$_CHANNEL_EVENT_LIST, $channelEventList);
163+
// 发送信号通知进程
164+
@posix_kill($pid, Future::$signal);
165+
});
156166
}
157167
}
158168
return [
@@ -202,9 +212,25 @@ protected static function _ChCreateListener(string $key, string|int $workerId, C
202212
* ]
203213
*/
204214
$channel = self::_Get($channelName = self::GetChannelKey($key), []);
215+
// 监听器回调函数
205216
$callback = function () use ($key, $workerId, $listener) {
206217
// 原子性执行
207218
self::_Atomic($key, function () use ($key, $workerId, $listener) {
219+
// 信号监听
220+
if (self::isChannelUseSignal()) {
221+
$pid = posix_getpid();
222+
// 获取通道事件标记列表
223+
$channelEventList = self::_Get(self::$_CHANNEL_EVENT_LIST, []);
224+
$events = $channelEventList[$pid] ?? [];
225+
// 如果没有事件标记则跳过
226+
if (!array_pop($events)) {
227+
return;
228+
}
229+
// 更新通道事件标记
230+
$channelEventList[$pid] = $events;
231+
self::_Set(self::$_CHANNEL_EVENT_LIST, $channelEventList);
232+
}
233+
// 数据回调
208234
$channel = self::_Get($channelName = self::GetChannelKey($key), []);
209235
if ((!empty($value = $channel[$workerId]['value'] ?? []))) {
210236
// 先进先出
@@ -257,11 +283,19 @@ protected static function _ChRemoveListener(string $key, string|int $workerId, b
257283
Future::del($id);
258284
// 信号监听则注册pid
259285
if (self::isChannelUseSignal()) {
286+
$pid = posix_getpid();
287+
// 移除pid
260288
$channelPidList = self::_Get(self::$_CHANNEL_PID_LIST, []);
261-
if ($channelPidList[$pid = posix_getpid()] ?? null) {
289+
if ($channelPidList[$pid] ?? null) {
262290
unset($channelPidList[$pid]);
263291
self::_Set(self::$_CHANNEL_PID_LIST, $channelPidList);
264292
}
293+
// 移除事件标记
294+
$channelEventList = self::_Get(self::$_CHANNEL_EVENT_LIST, []);
295+
if ($channelEventList[$pid] ?? null) {
296+
unset($channelEventList[$pid]);
297+
self::_Set(self::$_CHANNEL_EVENT_LIST, $channelEventList);
298+
}
265299
}
266300
if ($remove) {
267301
/**

0 commit comments

Comments
 (0)