Skip to content

Commit e4636e0

Browse files
committed
Add AccountingServer to limit number of open connections
1 parent 027262a commit e4636e0

File tree

4 files changed

+301
-11
lines changed

4 files changed

+301
-11
lines changed

README.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ and [`Stream`](https://github.com/reactphp/stream) components.
2121
* [close()](#close)
2222
* [Server](#server)
2323
* [SecureServer](#secureserver)
24+
* [AccountingServer](#accountingserver)
25+
* [getConnections()](#getconnections)
2426
* [ConnectionInterface](#connectioninterface)
2527
* [getRemoteAddress()](#getremoteaddress)
2628
* [getLocalAddress()](#getlocaladdress)
@@ -378,6 +380,52 @@ If you use a custom `ServerInterface` and its `connection` event does not
378380
meet this requirement, the `SecureServer` will emit an `error` event and
379381
then close the underlying connection.
380382

383+
### AccountingServer
384+
385+
The `AccountingServer` decorators wraps a given `ServerInterface` and is responsible
386+
for keeping track of open connections to this server instance.
387+
388+
Whenever the underlying server emits a `connection` event, it will keep track
389+
of this connection by adding it to the list of open connections and then
390+
forward the `connection` event (unless its limits are exceeded).
391+
392+
Whenever a connection closes, it will remove this connection from the list of
393+
open connections.
394+
395+
```php
396+
$server = new AccountingServer($server);
397+
$server->on('connection', function (ConnectionInterface $connection) {
398+
$connection->write('hello there!' . PHP_EOL);
399+
400+
});
401+
```
402+
403+
See also the [second example](examples) for more details.
404+
405+
You can optionally pass a maximum number of open connections to ensure
406+
the server will automatically reject (close) connections once this limit
407+
is exceeded. In this case, it will emit an `error` event to inform about
408+
this and no `connection` event will be emitted.
409+
410+
```php
411+
$server = new AccountingServer($server, 50);
412+
$server->on('connection', function (ConnectionInterface $connection) {
413+
$connection->write('hello there!' . PHP_EOL);
414+
415+
});
416+
```
417+
418+
#### getConnections()
419+
420+
The `getConnections(): ConnectionInterface[]` method can be used to
421+
return an array with all currently active connections.
422+
423+
```php
424+
foreach ($server->getConnection() as $connection) {
425+
$connection->write('Hi!');
426+
}
427+
```
428+
381429
### ConnectionInterface
382430

383431
The `ConnectionInterface` is used to represent any incoming connection.

examples/02-chat-server.php

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use React\Socket\Server;
1616
use React\Socket\ConnectionInterface;
1717
use React\Socket\SecureServer;
18+
use React\Socket\AccountingServer;
1819

1920
require __DIR__ . '/../vendor/autoload.php';
2021

@@ -29,17 +30,11 @@
2930
));
3031
}
3132

32-
$clients = array();
33-
34-
$server->on('connection', function (ConnectionInterface $client) use (&$clients) {
35-
// keep a list of all connected clients
36-
$clients []= $client;
37-
$client->on('close', function() use ($client, &$clients) {
38-
unset($clients[array_search($client, $clients)]);
39-
});
33+
$server = new AccountingServer($server);
4034

35+
$server->on('connection', function (ConnectionInterface $client) use ($server) {
4136
// whenever a new message comes in
42-
$client->on('data', function ($data) use ($client, &$clients) {
37+
$client->on('data', function ($data) use ($client, $server) {
4338
// remove any non-word characters (just for the demo)
4439
$data = trim(preg_replace('/[^\w\d \.\,\-\!\?]/u', '', $data));
4540

@@ -50,8 +45,8 @@
5045

5146
// prefix with client IP and broadcast to all connected clients
5247
$data = $client->getRemoteAddress() . ': ' . $data . PHP_EOL;
53-
foreach ($clients as $client) {
54-
$client->write($data);
48+
foreach ($server->getConnections() as $connection) {
49+
$connection->write($data);
5550
}
5651
});
5752
});

src/AccountingServer.php

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
<?php
2+
3+
namespace React\Socket;
4+
5+
use Evenement\EventEmitter;
6+
7+
/**
8+
* The `AccountingServer` decorators wraps a given `ServerInterface` and is responsible
9+
* for keeping track of open connections to this server instance.
10+
*
11+
* Whenever the underlying server emits a `connection` event, it will keep track
12+
* of this connection by adding it to the list of open connections and then
13+
* forward the `connection` event (unless its limits are exceeded).
14+
*
15+
* Whenever a connection closes, it will remove this connection from the list of
16+
* open connections.
17+
*
18+
* ```php
19+
* $server = new AccountingServer($server);
20+
* $server->on('connection', function (ConnectionInterface $connection) {
21+
* $connection->write('hello there!' . PHP_EOL);
22+
* …
23+
* });
24+
* ```
25+
*
26+
* See also the `ServerInterface` for more details.
27+
*
28+
* @see ServerInterface
29+
* @see ConnectionInterface
30+
*/
31+
class AccountingServer extends EventEmitter implements ServerInterface
32+
{
33+
private $connections = array();
34+
private $server;
35+
private $limit;
36+
37+
/**
38+
* Instantiates a new AccountingServer.
39+
*
40+
* You can optionally pass a maximum number of open connections to ensure
41+
* the server will automatically reject (close) connections once this limit
42+
* is exceeded. In this case, it will emit an `error` event to inform about
43+
* this and no `connection` event will be emitted.
44+
*
45+
* @param ServerInterface $server
46+
* @param null|int $connectionLimit
47+
*/
48+
public function __construct(ServerInterface $server, $connectionLimit = null)
49+
{
50+
$this->server = $server;
51+
$this->limit = $connectionLimit;
52+
53+
$this->server->on('connection', array($this, 'handleConnection'));
54+
$this->server->on('error', array($this, 'handleError'));
55+
}
56+
57+
/**
58+
* Returns an array with all currently active connections
59+
*
60+
* ```php
61+
* foreach ($server->getConnection() as $connection) {
62+
* $connection->write('Hi!');
63+
* }
64+
* ```
65+
*
66+
* @return ConnectionInterface[]
67+
*/
68+
public function getConnections()
69+
{
70+
return $this->connections;
71+
}
72+
73+
public function getAddress()
74+
{
75+
return $this->server->getAddress();
76+
}
77+
78+
public function pause()
79+
{
80+
$this->server->pause();
81+
}
82+
83+
public function resume()
84+
{
85+
$this->server->resume();
86+
}
87+
88+
public function close()
89+
{
90+
$this->server->close();
91+
}
92+
93+
/** @internal */
94+
public function handleConnection(ConnectionInterface $connection)
95+
{
96+
// close connection is limit exceeded
97+
if ($this->limit !== null && count($this->connections) >= $this->limit) {
98+
$this->handleError(new \OverflowException('Connection closed because server reached connection limit'));
99+
$connection->close();
100+
return;
101+
}
102+
103+
$this->connections[] = $connection;
104+
$that = $this;
105+
$connection->on('close', function () use ($that, $connection) {
106+
$that->handleDisconnection($connection);
107+
});
108+
109+
$this->emit('connection', array($connection));
110+
}
111+
112+
/** @internal */
113+
public function handleDisconnection(ConnectionInterface $connection)
114+
{
115+
unset($this->connections[array_search($connection, $this->connections)]);
116+
}
117+
118+
/** @internal */
119+
public function handleError(\Exception $error)
120+
{
121+
$this->emit('error', array($error));
122+
}
123+
}

tests/AccountingServerTest.php

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
<?php
2+
3+
namespace React\Tests\Socket;
4+
5+
use React\Socket\AccountingServer;
6+
use React\Socket\Server;
7+
use React\EventLoop\Factory;
8+
use Clue\React\Block;
9+
10+
class AccountingServerTest extends TestCase
11+
{
12+
public function testA()
13+
{
14+
$server = $this->getMockBuilder('React\Socket\ServerInterface')->getMock();
15+
$server = new AccountingServer($server);
16+
}
17+
18+
public function testGetAddressWillBePassedThroughToTcpServer()
19+
{
20+
$tcp = $this->getMockBuilder('React\Socket\ServerInterface')->getMock();
21+
$tcp->expects($this->once())->method('getAddress')->willReturn('127.0.0.1:1234');
22+
23+
$server = new AccountingServer($tcp);
24+
25+
$this->assertEquals('127.0.0.1:1234', $server->getAddress());
26+
}
27+
28+
public function testPauseWillBePassedThroughToTcpServer()
29+
{
30+
$tcp = $this->getMockBuilder('React\Socket\ServerInterface')->getMock();
31+
$tcp->expects($this->once())->method('pause');
32+
33+
$server = new AccountingServer($tcp);
34+
35+
$server->pause();
36+
}
37+
38+
public function testResumeWillBePassedThroughToTcpServer()
39+
{
40+
$tcp = $this->getMockBuilder('React\Socket\ServerInterface')->getMock();
41+
$tcp->expects($this->once())->method('resume');
42+
43+
$server = new AccountingServer($tcp);
44+
45+
$server->resume();
46+
}
47+
48+
public function testCloseWillBePassedThroughToTcpServer()
49+
{
50+
$tcp = $this->getMockBuilder('React\Socket\ServerInterface')->getMock();
51+
$tcp->expects($this->once())->method('close');
52+
53+
$server = new AccountingServer($tcp);
54+
55+
$server->close();
56+
}
57+
58+
public function testSocketErrorWillBeForwarded()
59+
{
60+
$loop = $this->getMock('React\EventLoop\LoopInterface');
61+
62+
$tcp = new Server(0, $loop);
63+
64+
$server = new AccountingServer($tcp);
65+
66+
$server->on('error', $this->expectCallableOnce());
67+
68+
$tcp->emit('error', array(new \RuntimeException('test')));
69+
}
70+
71+
public function testSocketConnectionWillBeForwarded()
72+
{
73+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
74+
75+
$loop = $this->getMock('React\EventLoop\LoopInterface');
76+
77+
$tcp = new Server(0, $loop);
78+
79+
$server = new AccountingServer($tcp);
80+
$server->on('connection', $this->expectCallableOnceWith($connection));
81+
$server->on('error', $this->expectCallableNever());
82+
83+
$tcp->emit('connection', array($connection));
84+
85+
$this->assertEquals(array($connection), $server->getConnections());
86+
}
87+
88+
public function testSocketConnectionWillBeClosedOnceLimitIsReached()
89+
{
90+
$first = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
91+
$first->expects($this->never())->method('close');
92+
$second = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
93+
$second->expects($this->once())->method('close');
94+
95+
$loop = $this->getMock('React\EventLoop\LoopInterface');
96+
97+
$tcp = new Server(0, $loop);
98+
99+
$server = new AccountingServer($tcp, 1);
100+
$server->on('connection', $this->expectCallableOnceWith($first));
101+
$server->on('error', $this->expectCallableOnce());
102+
103+
$tcp->emit('connection', array($first));
104+
$tcp->emit('connection', array($second));
105+
}
106+
107+
public function testSocketDisconnectionWillRemoveFromList()
108+
{
109+
$loop = Factory::create();
110+
111+
$tcp = new Server(0, $loop);
112+
113+
$socket = stream_socket_client('tcp://' . $tcp->getAddress());
114+
fclose($socket);
115+
116+
$server = new AccountingServer($tcp);
117+
$server->on('connection', $this->expectCallableOnce());
118+
$server->on('error', $this->expectCallableNever());
119+
120+
Block\sleep(0.1, $loop);
121+
122+
$this->assertEquals(array(), $server->getConnections());
123+
}
124+
}

0 commit comments

Comments
 (0)