Skip to content

Commit 00e0dc8

Browse files
committed
Add option to pause accepting new connections
1 parent e4636e0 commit 00e0dc8

File tree

3 files changed

+173
-4
lines changed

3 files changed

+173
-4
lines changed

README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,29 @@ $server->on('connection', function (ConnectionInterface $connection) {
415415
});
416416
```
417417

418+
You can optionally configure the server to pause accepting new
419+
connections once the connection limit is reached. In this case, it will
420+
pause the underlying server and no longer process any new connections at
421+
all, thus also no longer closing any excessive connections.
422+
The underlying operating system is responsible for keeping a backlog of
423+
pending connections until its limit is reached, at which point it will
424+
start rejecting further connections.
425+
Once the server is below the connection limit, it will continue consuming
426+
connections from the backlog and will process any outstanding data on
427+
each connection.
428+
This mode may be useful for some protocols that are designed to wait for
429+
a response message (such as HTTP), but may be less useful for other
430+
protocols that demand immediate responses (such as a "welcome" message in
431+
an interactive chat).
432+
433+
```php
434+
$server = new AccountingServer($server, 50, true);
435+
$server->on('connection', function (ConnectionInterface $connection) {
436+
$connection->write('hello there!' . PHP_EOL);
437+
438+
});
439+
```
440+
418441
#### getConnections()
419442

420443
The `getConnections(): ConnectionInterface[]` method can be used to

src/AccountingServer.php

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ class AccountingServer extends EventEmitter implements ServerInterface
3434
private $server;
3535
private $limit;
3636

37+
private $pauseOnLimit = false;
38+
private $autoPaused = false;
39+
private $manuPaused = false;
40+
3741
/**
3842
* Instantiates a new AccountingServer.
3943
*
@@ -42,13 +46,48 @@ class AccountingServer extends EventEmitter implements ServerInterface
4246
* is exceeded. In this case, it will emit an `error` event to inform about
4347
* this and no `connection` event will be emitted.
4448
*
49+
* ```php
50+
* $server = new AccountingServer($server, 50);
51+
* $server->on('connection', function (ConnectionInterface $connection) {
52+
* $connection->write('hello there!' . PHP_EOL);
53+
* …
54+
* });
55+
* ```
56+
*
57+
* You can optionally configure the server to pause accepting new
58+
* connections once the connection limit is reached. In this case, it will
59+
* pause the underlying server and no longer process any new connections at
60+
* all, thus also no longer closing any excessive connections.
61+
* The underlying operating system is responsible for keeping a backlog of
62+
* pending connections until its limit is reached, at which point it will
63+
* start rejecting further connections.
64+
* Once the server is below the connection limit, it will continue consuming
65+
* connections from the backlog and will process any outstanding data on
66+
* each connection.
67+
* This mode may be useful for some protocols that are designed to wait for
68+
* a response message (such as HTTP), but may be less useful for other
69+
* protocols that demand immediate responses (such as a "welcome" message in
70+
* an interactive chat).
71+
*
72+
* ```php
73+
* $server = new AccountingServer($server, 50, true);
74+
* $server->on('connection', function (ConnectionInterface $connection) {
75+
* $connection->write('hello there!' . PHP_EOL);
76+
* …
77+
* });
78+
* ```
79+
*
4580
* @param ServerInterface $server
4681
* @param null|int $connectionLimit
82+
* @param bool $pauseOnLimit
4783
*/
48-
public function __construct(ServerInterface $server, $connectionLimit = null)
84+
public function __construct(ServerInterface $server, $connectionLimit = null, $pauseOnLimit = false)
4985
{
5086
$this->server = $server;
5187
$this->limit = $connectionLimit;
88+
if ($connectionLimit !== null) {
89+
$this->pauseOnLimit = $pauseOnLimit;
90+
}
5291

5392
$this->server->on('connection', array($this, 'handleConnection'));
5493
$this->server->on('error', array($this, 'handleError'));
@@ -77,12 +116,24 @@ public function getAddress()
77116

78117
public function pause()
79118
{
80-
$this->server->pause();
119+
if (!$this->manuPaused) {
120+
$this->manuPaused = true;
121+
122+
if (!$this->autoPaused) {
123+
$this->server->pause();
124+
}
125+
}
81126
}
82127

83128
public function resume()
84129
{
85-
$this->server->resume();
130+
if ($this->manuPaused) {
131+
$this->manuPaused = false;
132+
133+
if (!$this->autoPaused) {
134+
$this->server->resume();
135+
}
136+
}
86137
}
87138

88139
public function close()
@@ -93,7 +144,7 @@ public function close()
93144
/** @internal */
94145
public function handleConnection(ConnectionInterface $connection)
95146
{
96-
// close connection is limit exceeded
147+
// close connection if limit exceeded
97148
if ($this->limit !== null && count($this->connections) >= $this->limit) {
98149
$this->handleError(new \OverflowException('Connection closed because server reached connection limit'));
99150
$connection->close();
@@ -106,13 +157,31 @@ public function handleConnection(ConnectionInterface $connection)
106157
$that->handleDisconnection($connection);
107158
});
108159

160+
// pause accepting new connections if limit exceeded
161+
if ($this->pauseOnLimit && !$this->autoPaused && count($this->connections) >= $this->limit) {
162+
$this->autoPaused = true;
163+
164+
if (!$this->manuPaused) {
165+
$this->server->pause();
166+
}
167+
}
168+
109169
$this->emit('connection', array($connection));
110170
}
111171

112172
/** @internal */
113173
public function handleDisconnection(ConnectionInterface $connection)
114174
{
115175
unset($this->connections[array_search($connection, $this->connections)]);
176+
177+
// continue accepting new connection if below limit
178+
if ($this->autoPaused && count($this->connections) < $this->limit) {
179+
$this->autoPaused = false;
180+
181+
if (!$this->manuPaused) {
182+
$this->server->resume();
183+
}
184+
}
116185
}
117186

118187
/** @internal */

tests/AccountingServerTest.php

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,37 @@ public function testPauseWillBePassedThroughToTcpServer()
3535
$server->pause();
3636
}
3737

38+
public function testPauseTwiceWillBePassedThroughToTcpServerOnce()
39+
{
40+
$tcp = $this->getMockBuilder('React\Socket\ServerInterface')->getMock();
41+
$tcp->expects($this->once())->method('pause');
42+
43+
$server = new AccountingServer($tcp);
44+
45+
$server->pause();
46+
$server->pause();
47+
}
48+
3849
public function testResumeWillBePassedThroughToTcpServer()
3950
{
4051
$tcp = $this->getMockBuilder('React\Socket\ServerInterface')->getMock();
4152
$tcp->expects($this->once())->method('resume');
4253

4354
$server = new AccountingServer($tcp);
4455

56+
$server->pause();
57+
$server->resume();
58+
}
59+
60+
public function testResumeTwiceWillBePassedThroughToTcpServerOnce()
61+
{
62+
$tcp = $this->getMockBuilder('React\Socket\ServerInterface')->getMock();
63+
$tcp->expects($this->once())->method('resume');
64+
65+
$server = new AccountingServer($tcp);
66+
67+
$server->pause();
68+
$server->resume();
4569
$server->resume();
4670
}
4771

@@ -104,6 +128,21 @@ public function testSocketConnectionWillBeClosedOnceLimitIsReached()
104128
$tcp->emit('connection', array($second));
105129
}
106130

131+
public function testPausingServerWillBePausedOnceLimitIsReached()
132+
{
133+
$loop = $this->getMock('React\EventLoop\LoopInterface');
134+
$loop->expects($this->once())->method('addReadStream');
135+
$loop->expects($this->once())->method('removeReadStream');
136+
137+
$tcp = new Server(0, $loop);
138+
139+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
140+
141+
$server = new AccountingServer($tcp, 1, true);
142+
143+
$tcp->emit('connection', array($connection));
144+
}
145+
107146
public function testSocketDisconnectionWillRemoveFromList()
108147
{
109148
$loop = Factory::create();
@@ -121,4 +160,42 @@ public function testSocketDisconnectionWillRemoveFromList()
121160

122161
$this->assertEquals(array(), $server->getConnections());
123162
}
163+
164+
public function testPausingServerWillEmitOnlyOneButAcceptTwoConnectionsDueToOperatingSystem()
165+
{
166+
$loop = Factory::create();
167+
168+
$server = new Server(0, $loop);
169+
$server = new AccountingServer($server, 1, true);
170+
$server->on('connection', $this->expectCallableOnce());
171+
$server->on('error', $this->expectCallableNever());
172+
173+
$first = stream_socket_client('tcp://' . $server->getAddress());
174+
$second = stream_socket_client('tcp://' . $server->getAddress());
175+
176+
Block\sleep(0.1, $loop);
177+
178+
fclose($first);
179+
fclose($second);
180+
}
181+
182+
public function testPausingServerWillEmitTwoConnectionsFromBacklog()
183+
{
184+
$loop = Factory::create();
185+
186+
$twice = $this->createCallableMock();
187+
$twice->expects($this->exactly(2))->method('__invoke');
188+
189+
$server = new Server(0, $loop);
190+
$server = new AccountingServer($server, 1, true);
191+
$server->on('connection', $twice);
192+
$server->on('error', $this->expectCallableNever());
193+
194+
$first = stream_socket_client('tcp://' . $server->getAddress());
195+
fclose($first);
196+
$second = stream_socket_client('tcp://' . $server->getAddress());
197+
fclose($second);
198+
199+
Block\sleep(0.1, $loop);
200+
}
124201
}

0 commit comments

Comments
 (0)