Skip to content

Commit 3113c2b

Browse files
committed
Forward compatibility with Stream v1.0 and v0.7
1 parent 7b5d8cc commit 3113c2b

File tree

6 files changed

+62
-22
lines changed

6 files changed

+62
-22
lines changed

composer.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88
"evenement/evenement": "~2.0|~1.0",
99
"react/dns": "0.4.*|0.3.*",
1010
"react/event-loop": "0.4.*|0.3.*",
11-
"react/stream": "^0.6 || ^0.5 || ^0.4.5",
11+
"react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.5",
1212
"react/promise": "^2.1 || ^1.2",
1313
"react/promise-timer": "~1.0"
1414
},
1515
"require-dev": {
1616
"clue/block-react": "^1.1",
1717
"phpunit/phpunit": "~4.8",
18-
"react/stream": "^0.6"
18+
"react/stream": "^1.0 || ^0.7 || ^0.6"
1919
},
2020
"autoload": {
2121
"psr-4": {

src/Connection.php

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
namespace React\Socket;
44

5-
use React\Stream\Stream;
6-
use React\EventLoop\LoopInterface;
75
use Evenement\EventEmitter;
6+
use React\EventLoop\LoopInterface;
7+
use React\Stream\DuplexResourceStream;
8+
use React\Stream\Stream;
89
use React\Stream\Util;
910
use React\Stream\WritableStreamInterface;
1011

@@ -35,23 +36,33 @@ class Connection extends EventEmitter implements ConnectionInterface
3536

3637
public function __construct($resource, LoopInterface $loop)
3738
{
38-
$this->input = new Stream($resource, $loop);
39-
$this->stream = $resource;
40-
41-
Util::forwardEvents($this->input, $this, array('data', 'end', 'error', 'close', 'pipe', 'drain'));
42-
43-
$this->input->on('close', array($this, 'close'));
44-
4539
// PHP < 5.6.8 suffers from a buffer indicator bug on secure TLS connections
4640
// as a work-around we always read the complete buffer until its end.
4741
// The buffer size is limited due to TCP/IP buffers anyway, so this
4842
// should not affect usage otherwise.
4943
// See https://bugs.php.net/bug.php?id=65137
5044
// https://bugs.php.net/bug.php?id=41631
5145
// https://github.com/reactphp/socket-client/issues/24
52-
if (version_compare(PHP_VERSION, '5.6.8', '<')) {
53-
$this->input->bufferSize = null;
46+
$clearCompleteBuffer = (version_compare(PHP_VERSION, '5.6.8', '<'));
47+
48+
// @codeCoverageIgnoreStart
49+
if (class_exists('React\Stream\Stream')) {
50+
// legacy react/stream < 0.7 requires additional buffer property
51+
$this->input = new Stream($resource, $loop);
52+
if ($clearCompleteBuffer) {
53+
$this->input->bufferSize = null;
54+
}
55+
} else {
56+
// preferred react/stream >= 0.7 accepts buffer parameter
57+
$this->input = new DuplexResourceStream($resource, $loop, $clearCompleteBuffer ? -1 : null);
5458
}
59+
// @codeCoverageIgnoreEnd
60+
61+
$this->stream = $resource;
62+
63+
Util::forwardEvents($this->input, $this, array('data', 'end', 'error', 'close', 'pipe', 'drain'));
64+
65+
$this->input->on('close', array($this, 'close'));
5566
}
5667

5768
public function isReadable()

tests/IntegrationTest.php

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
use React\Socket\Connector;
88
use React\Socket\SecureConnector;
99
use React\Socket\TcpConnector;
10-
use React\Stream\BufferedSink;
1110
use Clue\React\Block;
1211
use React\Socket\DnsConnector;
1312

@@ -28,7 +27,7 @@ public function gettingStuffFromGoogleShouldWork()
2827

2928
$conn->write("GET / HTTP/1.0\r\n\r\n");
3029

31-
$response = Block\await(BufferedSink::createPromise($conn), $loop, self::TIMEOUT);
30+
$response = $this->buffer($conn, $loop, self::TIMEOUT);
3231

3332
$this->assertRegExp('#^HTTP/1\.0#', $response);
3433
}
@@ -47,7 +46,7 @@ public function gettingEncryptedStuffFromGoogleShouldWork()
4746

4847
$conn->write("GET / HTTP/1.0\r\n\r\n");
4948

50-
$response = Block\await(BufferedSink::createPromise($conn), $loop, self::TIMEOUT);
49+
$response = $this->buffer($conn, $loop, self::TIMEOUT);
5150

5251
$this->assertRegExp('#^HTTP/1\.0#', $response);
5352
}
@@ -76,7 +75,7 @@ public function gettingEncryptedStuffFromGoogleShouldWorkIfHostIsResolvedFirst()
7675

7776
$conn->write("GET / HTTP/1.0\r\n\r\n");
7877

79-
$response = Block\await(BufferedSink::createPromise($conn), $loop, self::TIMEOUT);
78+
$response = $this->buffer($conn, $loop, self::TIMEOUT);
8079

8180
$this->assertRegExp('#^HTTP/1\.0#', $response);
8281
}
@@ -94,7 +93,7 @@ public function gettingPlaintextStuffFromEncryptedGoogleShouldNotWork()
9493

9594
$conn->write("GET / HTTP/1.0\r\n\r\n");
9695

97-
$response = Block\await(BufferedSink::createPromise($conn), $loop, self::TIMEOUT);
96+
$response = $this->buffer($conn, $loop, self::TIMEOUT);
9897

9998
$this->assertNotRegExp('#^HTTP/1\.0#', $response);
10099
}

tests/SecureIntegrationTest.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
use React\Promise\Promise;
1212
use Evenement\EventEmitterInterface;
1313
use React\Promise\Deferred;
14-
use React\Stream\BufferedSink;
1514
use React\Socket\ConnectionInterface;
1615

1716
class SecureIntegrationTest extends TestCase
@@ -164,7 +163,7 @@ public function testConnectToServerWhichSendsDataWithEndReceivesAllData()
164163
/* @var $client ConnectionInterface */
165164

166165
// await data from client until it closes
167-
$received = Block\await(BufferedSink::createPromise($client), $this->loop, self::TIMEOUT);
166+
$received = $this->buffer($client, $this->loop, self::TIMEOUT);
168167

169168
$this->assertEquals($data, $received);
170169
}

tests/TcpServerTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
namespace React\Tests\Socket;
44

55
use React\EventLoop\StreamSelectLoop;
6-
use React\Stream\Stream;
76
use React\Socket\TcpServer;
7+
use React\Stream\DuplexResourceStream;
88

99
class TcpServerTest extends TestCase
1010
{
@@ -139,7 +139,7 @@ public function testLoopWillEndWhenServerIsClosedAfterSingleConnection()
139139
public function testDataWillBeEmittedInMultipleChunksWhenClientSendsExcessiveAmounts()
140140
{
141141
$client = stream_socket_client('tcp://localhost:' . $this->port);
142-
$stream = new Stream($client, $this->loop);
142+
$stream = new DuplexResourceStream($client, $this->loop);
143143

144144
$bytes = 1024 * 1024;
145145
$stream->end(str_repeat('*', $bytes));

tests/TestCase.php

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
namespace React\Tests\Socket;
44

5+
use React\Stream\ReadableStreamInterface;
6+
use React\EventLoop\LoopInterface;
7+
use Clue\React\Block;
8+
use React\Promise\Promise;
9+
510
class TestCase extends \PHPUnit_Framework_TestCase
611
{
712
protected function expectCallableExactly($amount)
@@ -49,4 +54,30 @@ protected function createCallableMock()
4954
{
5055
return $this->getMock('React\Tests\Socket\Stub\CallableStub');
5156
}
57+
58+
protected function buffer(ReadableStreamInterface $stream, LoopInterface $loop, $timeout)
59+
{
60+
if (!$stream->isReadable()) {
61+
return '';
62+
}
63+
64+
return Block\await(new Promise(
65+
function ($resolve, $reject) use ($stream) {
66+
$buffer = '';
67+
$stream->on('data', function ($chunk) use (&$buffer) {
68+
$buffer .= $chunk;
69+
});
70+
71+
$stream->on('error', $reject);
72+
73+
$stream->on('close', function () use (&$buffer, $resolve) {
74+
$resolve($buffer);
75+
});
76+
},
77+
function () use ($stream) {
78+
$stream->close();
79+
throw new \RuntimeException();
80+
}
81+
), $loop, $timeout);
82+
}
5283
}

0 commit comments

Comments
 (0)