Skip to content

Commit 84d4981

Browse files
authored
Merge pull request #84 from clue-labs/pause-resume
Add pause() and resume() methods to limit active connections
2 parents 4d80a98 + 027262a commit 84d4981

File tree

7 files changed

+262
-12
lines changed

7 files changed

+262
-12
lines changed

README.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ and [`Stream`](https://github.com/reactphp/stream) components.
1616
* [connection event](#connection-event)
1717
* [error event](#error-event)
1818
* [getAddress()](#getaddress)
19+
* [pause()](#pause)
20+
* [resume()](#resume)
1921
* [close()](#close)
2022
* [Server](#server)
2123
* [SecureServer](#secureserver)
@@ -134,6 +136,61 @@ $port = parse_url('tcp://' . $address, PHP_URL_PORT);
134136
echo 'Server listening on port ' . $port . PHP_EOL;
135137
```
136138

139+
#### pause()
140+
141+
The `pause(): void` method can be used to
142+
pause accepting new incoming connections.
143+
144+
Removes the socket resource from the EventLoop and thus stop accepting
145+
new connections. Note that the listening socket stays active and is not
146+
closed.
147+
148+
This means that new incoming connections will stay pending in the
149+
operating system backlog until its configurable backlog is filled.
150+
Once the backlog is filled, the operating system may reject further
151+
incoming connections until the backlog is drained again by resuming
152+
to accept new connections.
153+
154+
Once the server is paused, no futher `connection` events SHOULD
155+
be emitted.
156+
157+
```php
158+
$server->pause();
159+
160+
$server->on('connection', assertShouldNeverCalled());
161+
```
162+
163+
This method is advisory-only, though generally not recommended, the
164+
server MAY continue emitting `connection` events.
165+
166+
Unless otherwise noted, a successfully opened server SHOULD NOT start
167+
in paused state.
168+
169+
You can continue processing events by calling `resume()` again.
170+
171+
Note that both methods can be called any number of times, in particular
172+
calling `pause()` more than once SHOULD NOT have any effect.
173+
Similarly, calling this after `close()` is a NO-OP.
174+
175+
#### resume()
176+
177+
The `resume(): void` method can be used to
178+
resume accepting new incoming connections.
179+
180+
Re-attach the socket resource to the EventLoop after a previous `pause()`.
181+
182+
```php
183+
$server->pause();
184+
185+
$loop->addTimer(1.0, function () use ($server) {
186+
$server->resume();
187+
});
188+
```
189+
190+
Note that both methods can be called any number of times, in particular
191+
calling `resume()` without a prior `pause()` SHOULD NOT have any effect.
192+
Similarly, calling this after `close()` is a NO-OP.
193+
137194
#### close()
138195

139196
The `close(): void` method can be used to

src/SecureServer.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,16 @@ public function getAddress()
140140
return $this->tcp->getAddress();
141141
}
142142

143+
public function pause()
144+
{
145+
$this->tcp->pause();
146+
}
147+
148+
public function resume()
149+
{
150+
$this->tcp->resume();
151+
}
152+
143153
public function close()
144154
{
145155
return $this->tcp->close();

src/Server.php

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ final class Server extends EventEmitter implements ServerInterface
3939
{
4040
private $master;
4141
private $loop;
42+
private $listening = false;
4243

4344
/**
4445
* Creates a plaintext TCP/IP socket server and starts listening on the given address
@@ -168,17 +169,7 @@ public function __construct($uri, LoopInterface $loop, array $context = array())
168169
}
169170
stream_set_blocking($this->master, 0);
170171

171-
$that = $this;
172-
173-
$this->loop->addReadStream($this->master, function ($master) use ($that) {
174-
$newSocket = @stream_socket_accept($master);
175-
if (false === $newSocket) {
176-
$that->emit('error', array(new \RuntimeException('Error accepting new connection')));
177-
178-
return;
179-
}
180-
$that->handleConnection($newSocket);
181-
});
172+
$this->resume();
182173
}
183174

184175
public function getAddress()
@@ -199,13 +190,42 @@ public function getAddress()
199190
return $address;
200191
}
201192

193+
public function pause()
194+
{
195+
if (!$this->listening) {
196+
return;
197+
}
198+
199+
$this->loop->removeReadStream($this->master);
200+
$this->listening = false;
201+
}
202+
203+
public function resume()
204+
{
205+
if ($this->listening || !is_resource($this->master)) {
206+
return;
207+
}
208+
209+
$that = $this;
210+
$this->loop->addReadStream($this->master, function ($master) use ($that) {
211+
$newSocket = @stream_socket_accept($master);
212+
if (false === $newSocket) {
213+
$that->emit('error', array(new \RuntimeException('Error accepting new connection')));
214+
215+
return;
216+
}
217+
$that->handleConnection($newSocket);
218+
});
219+
$this->listening = true;
220+
}
221+
202222
public function close()
203223
{
204224
if (!is_resource($this->master)) {
205225
return;
206226
}
207227

208-
$this->loop->removeStream($this->master);
228+
$this->pause();
209229
fclose($this->master);
210230
$this->removeAllListeners();
211231
}

src/ServerInterface.php

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,67 @@ interface ServerInterface extends EventEmitterInterface
7272
*/
7373
public function getAddress();
7474

75+
/**
76+
* Pauses accepting new incoming connections.
77+
*
78+
* Removes the socket resource from the EventLoop and thus stop accepting
79+
* new connections. Note that the listening socket stays active and is not
80+
* closed.
81+
*
82+
* This means that new incoming connections will stay pending in the
83+
* operating system backlog until its configurable backlog is filled.
84+
* Once the backlog is filled, the operating system may reject further
85+
* incoming connections until the backlog is drained again by resuming
86+
* to accept new connections.
87+
*
88+
* Once the server is paused, no futher `connection` events SHOULD
89+
* be emitted.
90+
*
91+
* ```php
92+
* $server->pause();
93+
*
94+
* $server->on('connection', assertShouldNeverCalled());
95+
* ```
96+
*
97+
* This method is advisory-only, though generally not recommended, the
98+
* server MAY continue emitting `connection` events.
99+
*
100+
* Unless otherwise noted, a successfully opened server SHOULD NOT start
101+
* in paused state.
102+
*
103+
* You can continue processing events by calling `resume()` again.
104+
*
105+
* Note that both methods can be called any number of times, in particular
106+
* calling `pause()` more than once SHOULD NOT have any effect.
107+
* Similarly, calling this after `close()` is a NO-OP.
108+
*
109+
* @see self::resume()
110+
* @return void
111+
*/
112+
public function pause();
113+
114+
/**
115+
* Resumes accepting new incoming connections.
116+
*
117+
* Re-attach the socket resource to the EventLoop after a previous `pause()`.
118+
*
119+
* ```php
120+
* $server->pause();
121+
*
122+
* $loop->addTimer(1.0, function () use ($server) {
123+
* $server->resume();
124+
* });
125+
* ```
126+
*
127+
* Note that both methods can be called any number of times, in particular
128+
* calling `resume()` without a prior `pause()` SHOULD NOT have any effect.
129+
* Similarly, calling this after `close()` is a NO-OP.
130+
*
131+
* @see self::pause()
132+
* @return void
133+
*/
134+
public function resume();
135+
75136
/**
76137
* Shuts down this listening socket
77138
*

tests/FunctionalServerTest.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,39 @@ public function testEmitsConnectionForNewConnection()
2525
Block\sleep(0.1, $loop);
2626
}
2727

28+
public function testEmitsNoConnectionForNewConnectionWhenPaused()
29+
{
30+
$loop = Factory::create();
31+
32+
$server = new Server(0, $loop);
33+
$server->on('connection', $this->expectCallableNever());
34+
$server->pause();
35+
36+
$connector = new TcpConnector($loop);
37+
$promise = $connector->connect($server->getAddress());
38+
39+
$promise->then($this->expectCallableOnce());
40+
41+
Block\sleep(0.1, $loop);
42+
}
43+
44+
public function testEmitsConnectionForNewConnectionWhenResumedAfterPause()
45+
{
46+
$loop = Factory::create();
47+
48+
$server = new Server(0, $loop);
49+
$server->on('connection', $this->expectCallableOnce());
50+
$server->pause();
51+
$server->resume();
52+
53+
$connector = new TcpConnector($loop);
54+
$promise = $connector->connect($server->getAddress());
55+
56+
$promise->then($this->expectCallableOnce());
57+
58+
Block\sleep(0.1, $loop);
59+
}
60+
2861
public function testEmitsConnectionWithRemoteIp()
2962
{
3063
$loop = Factory::create();

tests/SecureServerTest.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,30 @@ public function testGetAddressWillBePassedThroughToTcpServer()
2626
$this->assertEquals('127.0.0.1:1234', $server->getAddress());
2727
}
2828

29+
public function testPauseWillBePassedThroughToTcpServer()
30+
{
31+
$tcp = $this->getMockBuilder('React\Socket\ServerInterface')->getMock();
32+
$tcp->expects($this->once())->method('pause');
33+
34+
$loop = $this->getMock('React\EventLoop\LoopInterface');
35+
36+
$server = new SecureServer($tcp, $loop, array());
37+
38+
$server->pause();
39+
}
40+
41+
public function testResumeWillBePassedThroughToTcpServer()
42+
{
43+
$tcp = $this->getMockBuilder('React\Socket\ServerInterface')->getMock();
44+
$tcp->expects($this->once())->method('resume');
45+
46+
$loop = $this->getMock('React\EventLoop\LoopInterface');
47+
48+
$server = new SecureServer($tcp, $loop, array());
49+
50+
$server->resume();
51+
}
52+
2953
public function testCloseWillBePassedThroughToTcpServer()
3054
{
3155
$tcp = $this->getMockBuilder('React\Socket\ServerInterface')->getMock();

tests/ServerTest.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,51 @@ public function testConnectionDoesEndWhenClientCloses()
232232
$this->loop->tick();
233233
}
234234

235+
public function testCtorAddsResourceToLoop()
236+
{
237+
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
238+
$loop->expects($this->once())->method('addReadStream');
239+
240+
$server = new Server(0, $loop);
241+
}
242+
243+
public function testResumeWithoutPauseIsNoOp()
244+
{
245+
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
246+
$loop->expects($this->once())->method('addReadStream');
247+
248+
$server = new Server(0, $loop);
249+
$server->resume();
250+
}
251+
252+
public function testPauseRemovesResourceFromLoop()
253+
{
254+
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
255+
$loop->expects($this->once())->method('removeReadStream');
256+
257+
$server = new Server(0, $loop);
258+
$server->pause();
259+
}
260+
261+
public function testPauseAfterPauseIsNoOp()
262+
{
263+
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
264+
$loop->expects($this->once())->method('removeReadStream');
265+
266+
$server = new Server(0, $loop);
267+
$server->pause();
268+
$server->pause();
269+
}
270+
271+
public function testCloseRemovesResourceFromLoop()
272+
{
273+
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
274+
$loop->expects($this->once())->method('removeReadStream');
275+
276+
$server = new Server(0, $loop);
277+
$server->close();
278+
}
279+
235280
/**
236281
* @expectedException RuntimeException
237282
*/

0 commit comments

Comments
 (0)