Skip to content

Commit eaf4516

Browse files
committed
Add suspension listeners
1 parent fc50add commit eaf4516

File tree

3 files changed

+324
-12
lines changed

3 files changed

+324
-12
lines changed

src/EventLoop/Listener.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace Revolt\EventLoop;
4+
5+
interface Listener
6+
{
7+
/**
8+
* Called when a Suspension is suspended.
9+
*
10+
* @param int $id The object ID of the Suspension.
11+
*/
12+
public function onSuspend(int $id): void;
13+
14+
/**
15+
* Called when a Suspension is resumed.
16+
*
17+
* @param int $id The object ID of the Suspension.
18+
*/
19+
public function onResume(int $id): void;
20+
21+
/**
22+
* Called when a suspension is destroyed.
23+
*
24+
* @param int $id The object ID of the Suspension.
25+
*/
26+
public function onDestruct(int $id): void;
27+
}

src/EventLoop/Suspension.php

Lines changed: 86 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* **Example**
1111
*
1212
* ```php
13-
* $suspension = Scheduler::createSuspension();
13+
* $suspension = EventLoop::createSuspension();
1414
*
1515
* $promise->then(fn ($value) => $suspension->resume($value), fn ($throwable) => $suspension->throw($throwable));
1616
*
@@ -19,13 +19,21 @@
1919
*/
2020
final class Suspension
2121
{
22+
/** @var string Next listener ID. */
23+
private static string $nextId = 'a';
24+
25+
/** @var Listener[] */
26+
private static array $listeners = [];
27+
28+
private static bool $invokingListeners = false;
29+
2230
private ?\Fiber $fiber;
2331
private \Fiber $scheduler;
2432
private Driver $driver;
2533
private bool $pending = false;
2634

2735
/**
28-
* Suspension constructor.
36+
* Use {@see EventLoop::createSuspension()} to create Suspensions.
2937
*
3038
* @param Driver $driver
3139
* @param \Fiber $scheduler
@@ -48,12 +56,23 @@ public function __construct(Driver $driver, \Fiber $scheduler)
4856
$this->scheduler = $scheduler;
4957
}
5058

59+
public function __destruct()
60+
{
61+
if (!empty(self::$listeners)) {
62+
$this->invokeListeners('onDestruct');
63+
}
64+
}
65+
5166
public function throw(\Throwable $throwable): void
5267
{
5368
if (!$this->pending) {
5469
throw new \Error('Must call throw() before calling resume()');
5570
}
5671

72+
if (self::$invokingListeners) {
73+
throw new \Error('Cannot call throw() within a suspension listener');
74+
}
75+
5776
$this->pending = false;
5877

5978
if ($this->fiber) {
@@ -70,6 +89,10 @@ public function resume(mixed $value): void
7089
throw new \Error('Must call suspend() before calling resume()');
7190
}
7291

92+
if (self::$invokingListeners) {
93+
throw new \Error('Cannot call throw() within a suspension listener');
94+
}
95+
7396
$this->pending = false;
7497

7598
if ($this->fiber) {
@@ -90,22 +113,73 @@ public function suspend(): mixed
90113
throw new \Error('Must not call suspend() from another fiber');
91114
}
92115

116+
if (self::$invokingListeners) {
117+
throw new \Error('Cannot call suspend() within a suspension listener');
118+
}
119+
93120
$this->pending = true;
94121

95-
// Awaiting from within a fiber.
96-
if ($this->fiber) {
97-
return \Fiber::suspend();
122+
if (!empty(self::$listeners)) {
123+
$this->invokeListeners('onSuspend');
98124
}
99125

100-
// Awaiting from {main}.
101-
$lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start();
126+
try {
127+
// Awaiting from within a fiber.
128+
if ($this->fiber) {
129+
return \Fiber::suspend();
130+
}
131+
132+
// Awaiting from {main}.
133+
$lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start();
134+
135+
/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
136+
if ($this->pending) {
137+
// Should only be true if the event loop exited without resolving the promise.
138+
throw new \Error('Event loop suspended or exited unexpectedly');
139+
}
140+
141+
return $lambda();
142+
} finally {
143+
if (!empty(self::$listeners)) {
144+
$this->invokeListeners('onResume');
145+
}
146+
}
147+
}
102148

103-
/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
104-
if ($this->pending) {
105-
// Should only be true if the event loop exited without resolving the promise.
106-
throw new \Error('Scheduler suspended or exited unexpectedly');
149+
private function invokeListeners(string $method): void
150+
{
151+
$id = \spl_object_id($this);
152+
self::$invokingListeners = true;
153+
foreach (self::$listeners as $listener) {
154+
try {
155+
$listener->{$method}($id);
156+
} catch (\Throwable $exception) {
157+
$this->driver->queue(static fn () => throw $exception);
158+
}
107159
}
160+
self::$invokingListeners = false;
161+
}
162+
163+
/**
164+
* Add a listener that is invoked when any Suspension is suspended, resumed, or destroyed.
165+
*
166+
* @param Listener $listener
167+
* @return string ID that can be used to remove the listener using {@see unlisten()}.
168+
*/
169+
public static function listen(Listener $listener): string
170+
{
171+
$id = self::$nextId++;
172+
self::$listeners[$id] = $listener;
173+
return $id;
174+
}
108175

109-
return $lambda();
176+
/**
177+
* Remove the suspension listener.
178+
*
179+
* @param string $id
180+
*/
181+
public static function unlisten(string $id): void
182+
{
183+
unset(self::$listeners[$id]);
110184
}
111185
}

test/SuspensionTest.php

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
<?php
2+
3+
namespace Revolt\EventLoop;
4+
5+
use PHPUnit\Framework\TestCase;
6+
use Revolt\EventLoop;
7+
8+
class SuspensionTest extends TestCase
9+
{
10+
public function testListen(): void
11+
{
12+
$listener = new class () implements Listener {
13+
public int $suspended = 0;
14+
public int $resumed = 0;
15+
public int $destroyed = 0;
16+
17+
public function onSuspend(int $id): void
18+
{
19+
++$this->suspended;
20+
}
21+
22+
public function onResume(int $id): void
23+
{
24+
++$this->resumed;
25+
}
26+
27+
public function onDestruct(int $id): void
28+
{
29+
++$this->destroyed;
30+
}
31+
};
32+
33+
$id = Suspension::listen($listener);
34+
35+
$suspension = EventLoop::createSuspension();
36+
EventLoop::delay(0, fn () => $suspension->resume(null));
37+
38+
$suspension->suspend();
39+
40+
self::assertSame(1, $listener->suspended);
41+
self::assertSame(1, $listener->resumed);
42+
43+
Suspension::listen($listener);
44+
45+
$suspension = EventLoop::createSuspension();
46+
EventLoop::delay(0, fn () => $suspension->throw(new \Exception()));
47+
48+
try {
49+
$suspension->suspend();
50+
self::fail('Exception was expected to be thrown from suspend');
51+
} catch (\Exception $e) {
52+
// Expected, ignore.
53+
}
54+
55+
self::assertSame(3, $listener->suspended);
56+
self::assertSame(3, $listener->resumed);
57+
self::assertSame(2, $listener->destroyed);
58+
59+
Suspension::unlisten($id);
60+
61+
$suspension = EventLoop::createSuspension();
62+
EventLoop::delay(0, fn () => $suspension->resume(null));
63+
64+
$suspension->suspend();
65+
66+
self::assertSame(4, $listener->suspended);
67+
self::assertSame(4, $listener->resumed);
68+
}
69+
70+
public function provideListenerMethods(): iterable
71+
{
72+
$reflectionClass = new \ReflectionClass(Listener::class);
73+
$methods = $reflectionClass->getMethods();
74+
return \array_map(static fn (\ReflectionMethod $reflectionMethod) => [$reflectionMethod->getName()], $methods);
75+
}
76+
77+
/**
78+
* @dataProvider provideListenerMethods
79+
*/
80+
public function testSuspendDuringListenerInvocation(string $functionName): void
81+
{
82+
$suspension = EventLoop::createSuspension();
83+
84+
$listener = new class ($functionName, $suspension) implements Listener {
85+
public function __construct(
86+
private string $functionName,
87+
private Suspension $suspension,
88+
) {
89+
}
90+
91+
public function onSuspend(int $id): void
92+
{
93+
if ($this->functionName === __FUNCTION__) {
94+
$this->suspension->suspend();
95+
}
96+
}
97+
98+
public function onResume(int $id): void
99+
{
100+
if ($this->functionName === __FUNCTION__) {
101+
$this->suspension->suspend();
102+
}
103+
}
104+
105+
public function onDestruct(int $id): void
106+
{
107+
if ($this->functionName === __FUNCTION__) {
108+
$this->suspension->suspend();
109+
}
110+
}
111+
};
112+
113+
Suspension::listen($listener);
114+
115+
$suspension = EventLoop::createSuspension();
116+
EventLoop::delay(0, fn () => $suspension->resume(null));
117+
118+
self::expectException(\Error::class);
119+
self::expectExceptionMessage('within a suspension listener');
120+
121+
$suspension->suspend();
122+
}
123+
124+
/**
125+
* @dataProvider provideListenerMethods
126+
*/
127+
public function testResumeDuringListenerInvocation(string $functionName): void
128+
{
129+
$suspension = EventLoop::createSuspension();
130+
131+
$listener = new class ($functionName, $suspension) implements Listener {
132+
public function __construct(
133+
private string $functionName,
134+
private Suspension $suspension,
135+
) {
136+
}
137+
138+
public function onSuspend(int $id): void
139+
{
140+
if ($this->functionName === __FUNCTION__) {
141+
$this->suspension->resume(null);
142+
}
143+
}
144+
145+
public function onResume(int $id): void
146+
{
147+
if ($this->functionName === __FUNCTION__) {
148+
$this->suspension->resume(null);
149+
}
150+
}
151+
152+
public function onDestruct(int $id): void
153+
{
154+
if ($this->functionName === __FUNCTION__) {
155+
$this->suspension->resume(null);
156+
}
157+
}
158+
};
159+
160+
Suspension::listen($listener);
161+
162+
self::expectException(\Error::class);
163+
self::expectExceptionMessage('within a suspension listener');
164+
165+
$suspension->suspend();
166+
}
167+
168+
/**
169+
* @dataProvider provideListenerMethods
170+
*/
171+
public function testThrowDuringListenerInvocation(string $functionName): void
172+
{
173+
$suspension = EventLoop::createSuspension();
174+
175+
$listener = new class ($functionName, $suspension) implements Listener {
176+
public function __construct(
177+
private string $functionName,
178+
private Suspension $suspension,
179+
) {
180+
}
181+
182+
public function onSuspend(int $id): void
183+
{
184+
if ($this->functionName === __FUNCTION__) {
185+
$this->suspension->throw(new \Exception());
186+
}
187+
}
188+
189+
public function onResume(int $id): void
190+
{
191+
if ($this->functionName === __FUNCTION__) {
192+
$this->suspension->throw(new \Exception());
193+
}
194+
}
195+
196+
public function onDestruct(int $id): void
197+
{
198+
if ($this->functionName === __FUNCTION__) {
199+
$this->suspension->throw(new \Exception());
200+
}
201+
}
202+
};
203+
204+
Suspension::listen($listener);
205+
206+
self::expectException(\Error::class);
207+
self::expectExceptionMessage('within a suspension listener');
208+
209+
$suspension->suspend();
210+
}
211+
}

0 commit comments

Comments
 (0)