Skip to content

Commit 390c71b

Browse files
authored
Merge pull request #86 from clue-labs/limit
Add LimitingServer to limit and keep track of open connections
2 parents 84d4981 + fee5f36 commit 390c71b

File tree

4 files changed

+482
-11
lines changed

4 files changed

+482
-11
lines changed

README.md

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ and [`Stream`](https://github.com/reactphp/stream) components.
2121
* [close()](#close)
2222
* [Server](#server)
2323
* [SecureServer](#secureserver)
24+
* [LimitingServer](#limitingserver)
25+
* [getConnections()](#getconnections)
2426
* [ConnectionInterface](#connectioninterface)
2527
* [getRemoteAddress()](#getremoteaddress)
2628
* [getLocalAddress()](#getlocaladdress)
@@ -378,6 +380,84 @@ If you use a custom `ServerInterface` and its `connection` event does not
378380
meet this requirement, the `SecureServer` will emit an `error` event and
379381
then close the underlying connection.
380382

383+
### LimitingServer
384+
385+
The `LimitingServer` decorator wraps a given `ServerInterface` and is responsible
386+
for limiting and keeping track of open connections to this server instance.
387+
388+
Whenever the underlying server emits a `connection` event, it will check its
389+
limits and then either
390+
- keep track of this connection by adding it to the list of
391+
open connections and then forward the `connection` event
392+
- or reject (close) the connection when its limits are exceeded and will
393+
forward an `error` event instead.
394+
395+
Whenever a connection closes, it will remove this connection from the list of
396+
open connections.
397+
398+
```php
399+
$server = new LimitingServer($server, 100);
400+
$server->on('connection', function (ConnectionInterface $connection) {
401+
$connection->write('hello there!' . PHP_EOL);
402+
403+
});
404+
```
405+
406+
See also the [second example](examples) for more details.
407+
408+
You have to pass a maximum number of open connections to ensure
409+
the server will automatically reject (close) connections once this limit
410+
is exceeded. In this case, it will emit an `error` event to inform about
411+
this and no `connection` event will be emitted.
412+
413+
```php
414+
$server = new LimitingServer($server, 100);
415+
$server->on('connection', function (ConnectionInterface $connection) {
416+
$connection->write('hello there!' . PHP_EOL);
417+
418+
});
419+
```
420+
421+
You MAY pass a `null` limit in order to put no limit on the number of
422+
open connections and keep accepting new connection until you run out of
423+
operating system resources (such as open file handles). This may be
424+
useful it you do not want to take care of applying a limit but still want
425+
to use the `getConnections()` method.
426+
427+
You can optionally configure the server to pause accepting new
428+
connections once the connection limit is reached. In this case, it will
429+
pause the underlying server and no longer process any new connections at
430+
all, thus also no longer closing any excessive connections.
431+
The underlying operating system is responsible for keeping a backlog of
432+
pending connections until its limit is reached, at which point it will
433+
start rejecting further connections.
434+
Once the server is below the connection limit, it will continue consuming
435+
connections from the backlog and will process any outstanding data on
436+
each connection.
437+
This mode may be useful for some protocols that are designed to wait for
438+
a response message (such as HTTP), but may be less useful for other
439+
protocols that demand immediate responses (such as a "welcome" message in
440+
an interactive chat).
441+
442+
```php
443+
$server = new LimitingServer($server, 100, true);
444+
$server->on('connection', function (ConnectionInterface $connection) {
445+
$connection->write('hello there!' . PHP_EOL);
446+
447+
});
448+
```
449+
450+
#### getConnections()
451+
452+
The `getConnections(): ConnectionInterface[]` method can be used to
453+
return an array with all currently active connections.
454+
455+
```php
456+
foreach ($server->getConnection() as $connection) {
457+
$connection->write('Hi!');
458+
}
459+
```
460+
381461
### ConnectionInterface
382462

383463
The `ConnectionInterface` is used to represent any incoming connection.

examples/02-chat-server.php

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use React\Socket\Server;
1616
use React\Socket\ConnectionInterface;
1717
use React\Socket\SecureServer;
18+
use React\Socket\LimitingServer;
1819

1920
require __DIR__ . '/../vendor/autoload.php';
2021

@@ -29,17 +30,11 @@
2930
));
3031
}
3132

32-
$clients = array();
33-
34-
$server->on('connection', function (ConnectionInterface $client) use (&$clients) {
35-
// keep a list of all connected clients
36-
$clients []= $client;
37-
$client->on('close', function() use ($client, &$clients) {
38-
unset($clients[array_search($client, $clients)]);
39-
});
33+
$server = new LimitingServer($server, null);
4034

35+
$server->on('connection', function (ConnectionInterface $client) use ($server) {
4136
// whenever a new message comes in
42-
$client->on('data', function ($data) use ($client, &$clients) {
37+
$client->on('data', function ($data) use ($client, $server) {
4338
// remove any non-word characters (just for the demo)
4439
$data = trim(preg_replace('/[^\w\d \.\,\-\!\?]/u', '', $data));
4540

@@ -50,8 +45,8 @@
5045

5146
// prefix with client IP and broadcast to all connected clients
5247
$data = $client->getRemoteAddress() . ': ' . $data . PHP_EOL;
53-
foreach ($clients as $client) {
54-
$client->write($data);
48+
foreach ($server->getConnections() as $connection) {
49+
$connection->write($data);
5550
}
5651
});
5752
});

src/LimitingServer.php

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
<?php
2+
3+
namespace React\Socket;
4+
5+
use Evenement\EventEmitter;
6+
7+
/**
8+
* The `LimitingServer` decorator wraps a given `ServerInterface` and is responsible
9+
* for limiting and keeping track of open connections to this server instance.
10+
*
11+
* Whenever the underlying server emits a `connection` event, it will check its
12+
* limits and then either
13+
* - keep track of this connection by adding it to the list of
14+
* open connections and then forward the `connection` event
15+
* - or reject (close) the connection when its limits are exceeded and will
16+
* forward an `error` event instead.
17+
*
18+
* Whenever a connection closes, it will remove this connection from the list of
19+
* open connections.
20+
*
21+
* ```php
22+
* $server = new LimitingServer($server, 100);
23+
* $server->on('connection', function (ConnectionInterface $connection) {
24+
* $connection->write('hello there!' . PHP_EOL);
25+
* …
26+
* });
27+
* ```
28+
*
29+
* See also the `ServerInterface` for more details.
30+
*
31+
* @see ServerInterface
32+
* @see ConnectionInterface
33+
*/
34+
class LimitingServer extends EventEmitter implements ServerInterface
35+
{
36+
private $connections = array();
37+
private $server;
38+
private $limit;
39+
40+
private $pauseOnLimit = false;
41+
private $autoPaused = false;
42+
private $manuPaused = false;
43+
44+
/**
45+
* Instantiates a new LimitingServer.
46+
*
47+
* You have to pass a maximum number of open connections to ensure
48+
* the server will automatically reject (close) connections once this limit
49+
* is exceeded. In this case, it will emit an `error` event to inform about
50+
* this and no `connection` event will be emitted.
51+
*
52+
* ```php
53+
* $server = new LimitingServer($server, 100);
54+
* $server->on('connection', function (ConnectionInterface $connection) {
55+
* $connection->write('hello there!' . PHP_EOL);
56+
* …
57+
* });
58+
* ```
59+
*
60+
* You MAY pass a `null` limit in order to put no limit on the number of
61+
* open connections and keep accepting new connection until you run out of
62+
* operating system resources (such as open file handles). This may be
63+
* useful it you do not want to take care of applying a limit but still want
64+
* to use the `getConnections()` method.
65+
*
66+
* You can optionally configure the server to pause accepting new
67+
* connections once the connection limit is reached. In this case, it will
68+
* pause the underlying server and no longer process any new connections at
69+
* all, thus also no longer closing any excessive connections.
70+
* The underlying operating system is responsible for keeping a backlog of
71+
* pending connections until its limit is reached, at which point it will
72+
* start rejecting further connections.
73+
* Once the server is below the connection limit, it will continue consuming
74+
* connections from the backlog and will process any outstanding data on
75+
* each connection.
76+
* This mode may be useful for some protocols that are designed to wait for
77+
* a response message (such as HTTP), but may be less useful for other
78+
* protocols that demand immediate responses (such as a "welcome" message in
79+
* an interactive chat).
80+
*
81+
* ```php
82+
* $server = new LimitingServer($server, 100, true);
83+
* $server->on('connection', function (ConnectionInterface $connection) {
84+
* $connection->write('hello there!' . PHP_EOL);
85+
* …
86+
* });
87+
* ```
88+
*
89+
* @param ServerInterface $server
90+
* @param int|null $connectionLimit
91+
* @param bool $pauseOnLimit
92+
*/
93+
public function __construct(ServerInterface $server, $connectionLimit, $pauseOnLimit = false)
94+
{
95+
$this->server = $server;
96+
$this->limit = $connectionLimit;
97+
if ($connectionLimit !== null) {
98+
$this->pauseOnLimit = $pauseOnLimit;
99+
}
100+
101+
$this->server->on('connection', array($this, 'handleConnection'));
102+
$this->server->on('error', array($this, 'handleError'));
103+
}
104+
105+
/**
106+
* Returns an array with all currently active connections
107+
*
108+
* ```php
109+
* foreach ($server->getConnection() as $connection) {
110+
* $connection->write('Hi!');
111+
* }
112+
* ```
113+
*
114+
* @return ConnectionInterface[]
115+
*/
116+
public function getConnections()
117+
{
118+
return $this->connections;
119+
}
120+
121+
public function getAddress()
122+
{
123+
return $this->server->getAddress();
124+
}
125+
126+
public function pause()
127+
{
128+
if (!$this->manuPaused) {
129+
$this->manuPaused = true;
130+
131+
if (!$this->autoPaused) {
132+
$this->server->pause();
133+
}
134+
}
135+
}
136+
137+
public function resume()
138+
{
139+
if ($this->manuPaused) {
140+
$this->manuPaused = false;
141+
142+
if (!$this->autoPaused) {
143+
$this->server->resume();
144+
}
145+
}
146+
}
147+
148+
public function close()
149+
{
150+
$this->server->close();
151+
}
152+
153+
/** @internal */
154+
public function handleConnection(ConnectionInterface $connection)
155+
{
156+
// close connection if limit exceeded
157+
if ($this->limit !== null && count($this->connections) >= $this->limit) {
158+
$this->handleError(new \OverflowException('Connection closed because server reached connection limit'));
159+
$connection->close();
160+
return;
161+
}
162+
163+
$this->connections[] = $connection;
164+
$that = $this;
165+
$connection->on('close', function () use ($that, $connection) {
166+
$that->handleDisconnection($connection);
167+
});
168+
169+
// pause accepting new connections if limit exceeded
170+
if ($this->pauseOnLimit && !$this->autoPaused && count($this->connections) >= $this->limit) {
171+
$this->autoPaused = true;
172+
173+
if (!$this->manuPaused) {
174+
$this->server->pause();
175+
}
176+
}
177+
178+
$this->emit('connection', array($connection));
179+
}
180+
181+
/** @internal */
182+
public function handleDisconnection(ConnectionInterface $connection)
183+
{
184+
unset($this->connections[array_search($connection, $this->connections)]);
185+
186+
// continue accepting new connection if below limit
187+
if ($this->autoPaused && count($this->connections) < $this->limit) {
188+
$this->autoPaused = false;
189+
190+
if (!$this->manuPaused) {
191+
$this->server->resume();
192+
}
193+
}
194+
}
195+
196+
/** @internal */
197+
public function handleError(\Exception $error)
198+
{
199+
$this->emit('error', array($error));
200+
}
201+
}

0 commit comments

Comments
 (0)