Skip to content

Commit 7b5d8cc

Browse files
committed
Use stream composition instead of inheritance for internal Connection
1 parent 4102012 commit 7b5d8cc

File tree

7 files changed

+84
-60
lines changed

7 files changed

+84
-60
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -637,9 +637,9 @@ know what you're doing.
637637
Internally, the `SecureServer` has to set the required TLS context options on
638638
the underlying stream resources.
639639
These resources are not exposed through any of the interfaces defined in this
640-
package, but only through the `React\Stream\Stream` class.
640+
package, but only through the internal `Connection` class.
641641
The `TcpServer` class is guaranteed to emit connections that implement
642-
the `ConnectionInterface` and also extend the `Stream` class in order to
642+
the `ConnectionInterface` and uses the internal `Connection` class in order to
643643
expose these underlying resources.
644644
If you use a custom `ServerInterface` and its `connection` event does not
645645
meet this requirement, the `SecureServer` will emit an `error` event and

src/Connection.php

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
use React\Stream\Stream;
66
use React\EventLoop\LoopInterface;
7+
use Evenement\EventEmitter;
8+
use React\Stream\Util;
9+
use React\Stream\WritableStreamInterface;
710

811
/**
912
* The actual connection implementation for ConnectionInterface
@@ -13,7 +16,7 @@
1316
* @see ConnectionInterface
1417
* @internal
1518
*/
16-
class Connection extends Stream implements ConnectionInterface
19+
class Connection extends EventEmitter implements ConnectionInterface
1720
{
1821
/**
1922
* Internal flag whether encryption has been enabled on this connection
@@ -25,9 +28,19 @@ class Connection extends Stream implements ConnectionInterface
2528
*/
2629
public $encryptionEnabled = false;
2730

28-
public function __construct($stream, LoopInterface $loop)
31+
/** @internal */
32+
public $stream;
33+
34+
private $input;
35+
36+
public function __construct($resource, LoopInterface $loop)
2937
{
30-
parent::__construct($stream, $loop);
38+
$this->input = new Stream($resource, $loop);
39+
$this->stream = $resource;
40+
41+
Util::forwardEvents($this->input, $this, array('data', 'end', 'error', 'close', 'pipe', 'drain'));
42+
43+
$this->input->on('close', array($this, 'close'));
3144

3245
// PHP < 5.6.8 suffers from a buffer indicator bug on secure TLS connections
3346
// as a work-around we always read the complete buffer until its end.
@@ -37,10 +50,52 @@ public function __construct($stream, LoopInterface $loop)
3750
// https://bugs.php.net/bug.php?id=41631
3851
// https://github.com/reactphp/socket-client/issues/24
3952
if (version_compare(PHP_VERSION, '5.6.8', '<')) {
40-
$this->bufferSize = null;
53+
$this->input->bufferSize = null;
4154
}
4255
}
4356

57+
public function isReadable()
58+
{
59+
return $this->input->isReadable();
60+
}
61+
62+
public function isWritable()
63+
{
64+
return $this->input->isWritable();
65+
}
66+
67+
public function pause()
68+
{
69+
$this->input->pause();
70+
}
71+
72+
public function resume()
73+
{
74+
$this->input->resume();
75+
}
76+
77+
public function pipe(WritableStreamInterface $dest, array $options = array())
78+
{
79+
return $this->input->pipe($dest, $options);
80+
}
81+
82+
public function write($data)
83+
{
84+
return $this->input->write($data);
85+
}
86+
87+
public function end($data = null)
88+
{
89+
$this->input->end($data);
90+
}
91+
92+
public function close()
93+
{
94+
$this->input->close();
95+
$this->handleClose();
96+
$this->removeAllListeners();
97+
}
98+
4499
public function handleClose()
45100
{
46101
if (!is_resource($this->stream)) {

src/SecureConnector.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
namespace React\Socket;
44

55
use React\EventLoop\LoopInterface;
6-
use React\Stream\Stream;
76
use React\Promise;
87

98
final class SecureConnector implements ConnectorInterface
@@ -41,9 +40,9 @@ public function connect($uri)
4140
return $this->connector->connect($uri)->then(function (ConnectionInterface $connection) use ($context, $encryption) {
4241
// (unencrypted) TCP/IP connection succeeded
4342

44-
if (!$connection instanceof Stream) {
43+
if (!$connection instanceof Connection) {
4544
$connection->close();
46-
throw new \UnexpectedValueException('Connection MUST extend Stream in order to access underlying stream resource');
45+
throw new \UnexpectedValueException('Base connector does not use internal Connection class exposing stream resource');
4746
}
4847

4948
// set required SSL/TLS context options

src/SecureServer.php

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
use React\EventLoop\LoopInterface;
77
use React\Socket\TcpServer;
88
use React\Socket\ConnectionInterface;
9-
use React\Stream\Stream;
109

1110
/**
1211
* The `SecureServer` class implements the `ServerInterface` and is responsible
@@ -101,9 +100,9 @@ final class SecureServer extends EventEmitter implements ServerInterface
101100
* Internally, the `SecureServer` has to set the required TLS context options on
102101
* the underlying stream resources.
103102
* These resources are not exposed through any of the interfaces defined in this
104-
* package, but only through the `React\Stream\Stream` class.
103+
* package, but only through the internal `Connection` class.
105104
* The `TcpServer` class is guaranteed to emit connections that implement
106-
* the `ConnectionInterface` and also extend the `Stream` class in order to
105+
* the `ConnectionInterface` and uses the internal `Connection` class in order to
107106
* expose these underlying resources.
108107
* If you use a custom `ServerInterface` and its `connection` event does not
109108
* meet this requirement, the `SecureServer` will emit an `error` event and
@@ -163,8 +162,8 @@ public function close()
163162
/** @internal */
164163
public function handleConnection(ConnectionInterface $connection)
165164
{
166-
if (!$connection instanceof Stream) {
167-
$this->emit('error', array(new \UnexpectedValueException('Connection event MUST emit an instance extending Stream in order to access underlying stream resource')));
165+
if (!$connection instanceof Connection) {
166+
$this->emit('error', array(new \UnexpectedValueException('Base server does not use internal Connection class exposing stream resource')));
168167
$connection->end();
169168
return;
170169
}

tests/FunctionalSecureServerTest.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
namespace React\Tests\Socket;
44

55
use React\EventLoop\Factory;
6-
use React\Stream\Stream;
76
use React\Socket\SecureServer;
87
use React\Socket\ConnectionInterface;
98
use React\Socket\TcpServer;
@@ -60,7 +59,7 @@ public function testWritesDataToConnection()
6059
$promise = $connector->connect($server->getAddress());
6160

6261
$local = Block\await($promise, $loop, self::TIMEOUT);
63-
/* @var $local React\Stream\Stream */
62+
/* @var $local ConnectionInterface */
6463

6564
$local->on('data', $this->expectCallableOnceWith('foo'));
6665

@@ -346,7 +345,7 @@ public function testEmitsErrorIfConnectionIsNotSecureHandshake()
346345
$connector = new TcpConnector($loop);
347346
$promise = $connector->connect(str_replace('tls://', '', $server->getAddress()));
348347

349-
$promise->then(function (Stream $stream) {
348+
$promise->then(function (ConnectionInterface $stream) {
350349
$stream->write("GET / HTTP/1.0\r\n\r\n");
351350
});
352351

tests/SecureIntegrationTest.php

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
use React\Socket\SecureServer;
88
use React\Socket\TcpConnector;
99
use React\Socket\SecureConnector;
10-
use React\Stream\Stream;
1110
use Clue\React\Block;
1211
use React\Promise\Promise;
1312
use Evenement\EventEmitterInterface;
1413
use React\Promise\Deferred;
1514
use React\Stream\BufferedSink;
15+
use React\Socket\ConnectionInterface;
1616

1717
class SecureIntegrationTest extends TestCase
1818
{
@@ -49,7 +49,7 @@ public function tearDown()
4949
public function testConnectToServer()
5050
{
5151
$client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT);
52-
/* @var $client Stream */
52+
/* @var $client ConnectionInterface */
5353

5454
$client->close();
5555
}
@@ -61,7 +61,7 @@ public function testConnectToServerEmitsConnection()
6161
$promiseClient = $this->connector->connect($this->address);
6262

6363
list($_, $client) = Block\awaitAll(array($promiseServer, $promiseClient), $this->loop, self::TIMEOUT);
64-
/* @var $client Stream */
64+
/* @var $client ConnectionInterface */
6565

6666
$client->close();
6767
}
@@ -70,14 +70,14 @@ public function testSendSmallDataToServerReceivesOneChunk()
7070
{
7171
// server expects one connection which emits one data event
7272
$received = new Deferred();
73-
$this->server->on('connection', function (Stream $peer) use ($received) {
73+
$this->server->on('connection', function (ConnectionInterface $peer) use ($received) {
7474
$peer->on('data', function ($chunk) use ($received) {
7575
$received->resolve($chunk);
7676
});
7777
});
7878

7979
$client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT);
80-
/* @var $client Stream */
80+
/* @var $client ConnectionInterface */
8181

8282
$client->write('hello');
8383

@@ -92,7 +92,7 @@ public function testSendSmallDataToServerReceivesOneChunk()
9292
public function testSendDataWithEndToServerReceivesAllData()
9393
{
9494
$disconnected = new Deferred();
95-
$this->server->on('connection', function (Stream $peer) use ($disconnected) {
95+
$this->server->on('connection', function (ConnectionInterface $peer) use ($disconnected) {
9696
$received = '';
9797
$peer->on('data', function ($chunk) use (&$received) {
9898
$received .= $chunk;
@@ -103,7 +103,7 @@ public function testSendDataWithEndToServerReceivesAllData()
103103
});
104104

105105
$client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT);
106-
/* @var $client Stream */
106+
/* @var $client ConnectionInterface */
107107

108108
$data = str_repeat('a', 200000);
109109
$client->end($data);
@@ -117,14 +117,14 @@ public function testSendDataWithEndToServerReceivesAllData()
117117
public function testSendDataWithoutEndingToServerReceivesAllData()
118118
{
119119
$received = '';
120-
$this->server->on('connection', function (Stream $peer) use (&$received) {
120+
$this->server->on('connection', function (ConnectionInterface $peer) use (&$received) {
121121
$peer->on('data', function ($chunk) use (&$received) {
122122
$received .= $chunk;
123123
});
124124
});
125125

126126
$client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT);
127-
/* @var $client Stream */
127+
/* @var $client ConnectionInterface */
128128

129129
$data = str_repeat('d', 200000);
130130
$client->write($data);
@@ -139,12 +139,12 @@ public function testSendDataWithoutEndingToServerReceivesAllData()
139139

140140
public function testConnectToServerWhichSendsSmallDataReceivesOneChunk()
141141
{
142-
$this->server->on('connection', function (Stream $peer) {
142+
$this->server->on('connection', function (ConnectionInterface $peer) {
143143
$peer->write('hello');
144144
});
145145

146146
$client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT);
147-
/* @var $client Stream */
147+
/* @var $client ConnectionInterface */
148148

149149
// await client to report one "data" event
150150
$receive = $this->createPromiseForEvent($client, 'data', $this->expectCallableOnceWith('hello'));
@@ -156,12 +156,12 @@ public function testConnectToServerWhichSendsSmallDataReceivesOneChunk()
156156
public function testConnectToServerWhichSendsDataWithEndReceivesAllData()
157157
{
158158
$data = str_repeat('b', 100000);
159-
$this->server->on('connection', function (Stream $peer) use ($data) {
159+
$this->server->on('connection', function (ConnectionInterface $peer) use ($data) {
160160
$peer->end($data);
161161
});
162162

163163
$client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT);
164-
/* @var $client Stream */
164+
/* @var $client ConnectionInterface */
165165

166166
// await data from client until it closes
167167
$received = Block\await(BufferedSink::createPromise($client), $this->loop, self::TIMEOUT);
@@ -172,12 +172,12 @@ public function testConnectToServerWhichSendsDataWithEndReceivesAllData()
172172
public function testConnectToServerWhichSendsDataWithoutEndingReceivesAllData()
173173
{
174174
$data = str_repeat('c', 100000);
175-
$this->server->on('connection', function (Stream $peer) use ($data) {
175+
$this->server->on('connection', function (ConnectionInterface $peer) use ($data) {
176176
$peer->write($data);
177177
});
178178

179179
$client = Block\await($this->connector->connect($this->address), $this->loop, self::TIMEOUT);
180-
/* @var $client Stream */
180+
/* @var $client ConnectionInterface */
181181

182182
// buffer incoming data for 0.1s (should be plenty of time)
183183
$received = '';

tests/TcpServerTest.php

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ public function testConnectionWithManyClients()
5555
$this->loop->tick();
5656
}
5757

58-
/**
59-
* @covers React\EventLoop\StreamSelectLoop::tick
60-
* @covers React\Socket\Connection::handleData
61-
*/
6258
public function testDataEventWillNotBeEmittedWhenClientSendsNoData()
6359
{
6460
$client = stream_socket_client('tcp://localhost:'.$this->port);
@@ -72,10 +68,6 @@ public function testDataEventWillNotBeEmittedWhenClientSendsNoData()
7268
$this->loop->tick();
7369
}
7470

75-
/**
76-
* @covers React\EventLoop\StreamSelectLoop::tick
77-
* @covers React\Socket\Connection::handleData
78-
*/
7971
public function testDataWillBeEmittedWithDataClientSends()
8072
{
8173
$client = stream_socket_client('tcp://localhost:'.$this->port);
@@ -91,10 +83,6 @@ public function testDataWillBeEmittedWithDataClientSends()
9183
$this->loop->tick();
9284
}
9385

94-
/**
95-
* @covers React\EventLoop\StreamSelectLoop::tick
96-
* @covers React\Socket\Connection::handleData
97-
*/
9886
public function testDataWillBeEmittedEvenWhenClientShutsDownAfterSending()
9987
{
10088
$client = stream_socket_client('tcp://localhost:' . $this->port);
@@ -110,22 +98,6 @@ public function testDataWillBeEmittedEvenWhenClientShutsDownAfterSending()
11098
$this->loop->tick();
11199
}
112100

113-
public function testDataWillBeFragmentedToBufferSize()
114-
{
115-
$client = stream_socket_client('tcp://localhost:' . $this->port);
116-
117-
fwrite($client, "Hello World!\n");
118-
119-
$mock = $this->expectCallableOnceWith("He");
120-
121-
$this->server->on('connection', function ($conn) use ($mock) {
122-
$conn->bufferSize = 2;
123-
$conn->on('data', $mock);
124-
});
125-
$this->loop->tick();
126-
$this->loop->tick();
127-
}
128-
129101
public function testLoopWillEndWhenServerIsClosed()
130102
{
131103
// explicitly unset server because we already call close()

0 commit comments

Comments
 (0)