Skip to content

Commit 7662852

Browse files
authored
Feature: chunked encoding (#58)
1 parent 19dd5ac commit 7662852

File tree

5 files changed

+412
-5
lines changed

5 files changed

+412
-5
lines changed

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Interesting events emitted by Request:
2424
Interesting events emitted by Response:
2525

2626
* `data`: Passes a chunk of the response body as first argument and a Response
27-
object itself as second argument.
27+
object itself as second argument. When a response encounters a chunked encoded response it will parse it transparently for the user of `Response` and removing the `Transfer-Encoding` header.
2828
* `error`: An error occurred.
2929
* `end`: The response has been fully received. If an error
3030
occurred, it is passed as first argument.
@@ -55,6 +55,5 @@ $loop->run();
5555
## TODO
5656

5757
* gzip content encoding
58-
* chunked transfer encoding
5958
* keep-alive connections
6059
* following redirections

src/ChunkedStreamDecoder.php

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
<?php
2+
3+
namespace React\HttpClient;
4+
5+
use Evenement\EventEmitterTrait;
6+
use Exception;
7+
use React\Stream\ReadableStreamInterface;
8+
use React\Stream\Util;
9+
use React\Stream\WritableStreamInterface;
10+
11+
/**
12+
* @internal
13+
*/
14+
class ChunkedStreamDecoder implements ReadableStreamInterface
15+
{
16+
const CRLF = "\r\n";
17+
18+
use EventEmitterTrait;
19+
20+
/**
21+
* @var string
22+
*/
23+
protected $buffer = '';
24+
25+
/**
26+
* @var int
27+
*/
28+
protected $remainingLength = 0;
29+
30+
/**
31+
* @var bool
32+
*/
33+
protected $nextChunkIsLength = true;
34+
35+
/**
36+
* @var ReadableStreamInterface
37+
*/
38+
protected $stream;
39+
40+
/**
41+
* @var bool
42+
*/
43+
protected $closed = false;
44+
45+
/**
46+
* @var bool
47+
*/
48+
protected $reachedEnd = false;
49+
50+
/**
51+
* @param ReadableStreamInterface $stream
52+
*/
53+
public function __construct(ReadableStreamInterface $stream)
54+
{
55+
$this->stream = $stream;
56+
$this->stream->on('data', array($this, 'handleData'));
57+
$this->stream->on('end', array($this, 'handleEnd'));
58+
Util::forwardEvents($this->stream, $this, [
59+
'error',
60+
]);
61+
}
62+
63+
/** @internal */
64+
public function handleData($data)
65+
{
66+
$this->buffer .= $data;
67+
68+
do {
69+
$bufferLength = strlen($this->buffer);
70+
$continue = $this->iterateBuffer();
71+
$iteratedBufferLength = strlen($this->buffer);
72+
} while (
73+
$continue &&
74+
$bufferLength !== $iteratedBufferLength &&
75+
$iteratedBufferLength > 0
76+
);
77+
78+
if ($this->buffer === false) {
79+
$this->buffer = '';
80+
}
81+
}
82+
83+
protected function iterateBuffer()
84+
{
85+
if (strlen($this->buffer) <= 1) {
86+
return false;
87+
}
88+
89+
if ($this->nextChunkIsLength) {
90+
$crlfPosition = strpos($this->buffer, static::CRLF);
91+
if ($crlfPosition === false && strlen($this->buffer) > 1024) {
92+
$this->emit('error', [
93+
new Exception('Chunk length header longer then 1024 bytes'),
94+
]);
95+
$this->close();
96+
return false;
97+
}
98+
if ($crlfPosition === false) {
99+
return false; // Chunk header hasn't completely come in yet
100+
}
101+
$this->nextChunkIsLength = false;
102+
$lengthChunk = substr($this->buffer, 0, $crlfPosition);
103+
if (strpos($lengthChunk, ';') !== false) {
104+
list($lengthChunk) = explode(';', $lengthChunk, 2);
105+
}
106+
if (dechex(hexdec($lengthChunk)) !== $lengthChunk) {
107+
$this->emit('error', [
108+
new Exception('Unable to validate "' . $lengthChunk . '" as chunk length header'),
109+
]);
110+
$this->close();
111+
return false;
112+
}
113+
$this->remainingLength = hexdec($lengthChunk);
114+
$this->buffer = substr($this->buffer, $crlfPosition + 2);
115+
return true;
116+
}
117+
118+
if ($this->remainingLength > 0) {
119+
$chunkLength = $this->getChunkLength();
120+
if ($chunkLength === 0) {
121+
return true;
122+
}
123+
$this->emit('data', array(
124+
substr($this->buffer, 0, $chunkLength),
125+
$this
126+
));
127+
$this->remainingLength -= $chunkLength;
128+
$this->buffer = substr($this->buffer, $chunkLength);
129+
return true;
130+
}
131+
132+
$this->nextChunkIsLength = true;
133+
$this->buffer = substr($this->buffer, 2);
134+
135+
if (substr($this->buffer, 0, 3) === "0\r\n") {
136+
$this->reachedEnd = true;
137+
$this->emit('end');
138+
$this->close();
139+
return false;
140+
}
141+
return true;
142+
}
143+
144+
protected function getChunkLength()
145+
{
146+
$bufferLength = strlen($this->buffer);
147+
148+
if ($bufferLength >= $this->remainingLength) {
149+
return $this->remainingLength;
150+
}
151+
152+
return $bufferLength;
153+
}
154+
155+
public function pause()
156+
{
157+
$this->stream->pause();
158+
}
159+
160+
public function resume()
161+
{
162+
$this->stream->resume();
163+
}
164+
165+
public function isReadable()
166+
{
167+
return $this->stream->isReadable();
168+
}
169+
170+
public function pipe(WritableStreamInterface $dest, array $options = array())
171+
{
172+
Util::pipe($this, $dest, $options);
173+
174+
return $dest;
175+
}
176+
177+
public function close()
178+
{
179+
$this->closed = true;
180+
return $this->stream->close();
181+
}
182+
183+
/** @internal */
184+
public function handleEnd()
185+
{
186+
if ($this->closed) {
187+
return;
188+
}
189+
190+
if ($this->buffer === '' && $this->reachedEnd) {
191+
$this->emit('end');
192+
$this->close();
193+
return;
194+
}
195+
196+
$this->emit(
197+
'error',
198+
[
199+
new Exception('Stream ended with incomplete control code')
200+
]
201+
);
202+
$this->close();
203+
}
204+
}

src/Response.php

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,22 @@ public function __construct(DuplexStreamInterface $stream, $protocol, $version,
3333
$this->code = $code;
3434
$this->reasonPhrase = $reasonPhrase;
3535
$this->headers = $headers;
36+
$normalizedHeaders = array_change_key_case($headers, CASE_LOWER);
3637

37-
$stream->on('data', array($this, 'handleData'));
38-
$stream->on('error', array($this, 'handleError'));
39-
$stream->on('end', array($this, 'handleEnd'));
38+
if (isset($normalizedHeaders['transfer-encoding']) && strtolower($normalizedHeaders['transfer-encoding']) === 'chunked') {
39+
$this->stream = new ChunkedStreamDecoder($stream);
40+
41+
foreach ($this->headers as $key => $value) {
42+
if (strcasecmp('transfer-encoding', $key) === 0) {
43+
unset($this->headers[$key]);
44+
break;
45+
}
46+
}
47+
}
48+
49+
$this->stream->on('data', array($this, 'handleData'));
50+
$this->stream->on('error', array($this, 'handleError'));
51+
$this->stream->on('end', array($this, 'handleEnd'));
4052
}
4153

4254
public function getProtocol()

tests/DecodeChunkedStreamTest.php

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
<?php
2+
3+
namespace React\Tests\HttpClient;
4+
5+
use Exception;
6+
use React\HttpClient\ChunkedStreamDecoder;
7+
use React\Stream\ThroughStream;
8+
9+
class DecodeChunkedStreamTest extends TestCase
10+
{
11+
public function provideChunkedEncoding()
12+
{
13+
return [
14+
'data-set-1' => [
15+
["4\r\nWiki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"],
16+
],
17+
'data-set-2' => [
18+
["4\r\nWiki\r\n", "5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"],
19+
],
20+
'data-set-3' => [
21+
["4\r\nWiki\r\n", "5\r\n", "pedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"],
22+
],
23+
'data-set-4' => [
24+
["4\r\nWiki\r\n", "5\r\n", "pedia\r\ne\r\n in\r\n", "\r\nchunks.\r\n0\r\n\r\n"],
25+
],
26+
'data-set-5' => [
27+
["4\r\n", "Wiki\r\n", "5\r\n", "pedia\r\ne\r\n in\r\n", "\r\nchunks.\r\n0\r\n\r\n"],
28+
],
29+
'data-set-6' => [
30+
["4\r\n", "Wiki\r\n", "5\r\n", "pedia\r\ne; foo=[bar,beer,pool,cue,win,won]\r\n", " in\r\n", "\r\nchunks.\r\n0\r\n\r\n"],
31+
],
32+
'header-fields' => [
33+
["4; foo=bar\r\n", "Wiki\r\n", "5\r\n", "pedia\r\ne\r\n", " in\r\n", "\r\nchunks.\r\n", "0\r\n\r\n"],
34+
],
35+
'character-for-charactrr' => [
36+
str_split("4\r\nWiki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"),
37+
],
38+
'extra-newline-in-wiki-character-for-chatacter' => [
39+
str_split("6\r\nWi\r\nki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"),
40+
"Wi\r\nkipedia in\r\n\r\nchunks."
41+
],
42+
'extra-newline-in-wiki' => [
43+
["6\r\nWi\r\n", "ki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"],
44+
"Wi\r\nkipedia in\r\n\r\nchunks."
45+
],
46+
];
47+
}
48+
49+
/**
50+
* @test
51+
* @dataProvider provideChunkedEncoding
52+
*/
53+
public function testChunkedEncoding(array $strings, $expected = "Wikipedia in\r\n\r\nchunks.")
54+
{
55+
$stream = new ThroughStream();
56+
$response = new ChunkedStreamDecoder($stream);
57+
$buffer = '';
58+
$response->on('data', function ($data) use (&$buffer) {
59+
$buffer .= $data;
60+
});
61+
$response->on('error', function (Exception $exception) {
62+
throw $exception;
63+
});
64+
foreach ($strings as $string) {
65+
$stream->write($string);
66+
}
67+
$this->assertSame($expected, $buffer);
68+
}
69+
70+
public function provideInvalidChunkedEncoding()
71+
{
72+
return [
73+
'chunk-body-longer-than-header-suggests' => [
74+
["4\r\nWiwot40n98w3498tw3049nyn039409t34\r\n", "ki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n"],
75+
],
76+
'invalid-header-charactrrs' => [
77+
str_split("xyz\r\nWi\r\nki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n")
78+
],
79+
'header-chunk-to-long' => [
80+
str_split(str_repeat('a', 2015) . "\r\nWi\r\nki\r\n5\r\npedia\r\ne\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n")
81+
],
82+
];
83+
}
84+
85+
/**
86+
* @test
87+
* @dataProvider provideInvalidChunkedEncoding
88+
* @expectedException Exception
89+
*/
90+
public function testInvalidChunkedEncoding(array $strings)
91+
{
92+
$stream = new ThroughStream();
93+
$response = new ChunkedStreamDecoder($stream);
94+
$response->on('error', function (Exception $exception) {
95+
throw $exception;
96+
});
97+
foreach ($strings as $string) {
98+
$stream->write($string);
99+
}
100+
}
101+
102+
public function testHandleEnd()
103+
{
104+
$ended = false;
105+
$stream = new ThroughStream();
106+
$response = new ChunkedStreamDecoder($stream);
107+
$response->on('end', function () use (&$ended) {
108+
$ended = true;
109+
});
110+
111+
$stream->write("4\r\nWiki\r\n0\r\n\r\n");
112+
113+
$this->assertTrue($ended);
114+
}
115+
116+
public function testHandleEndIncomplete()
117+
{
118+
$exception = null;
119+
$stream = new ThroughStream();
120+
$response = new ChunkedStreamDecoder($stream);
121+
$response->on('error', function ($e) use (&$exception) {
122+
$exception = $e;
123+
});
124+
125+
$stream->end("4\r\nWiki");
126+
127+
$this->assertInstanceOf('Exception', $exception);
128+
}
129+
130+
public function testHandleEndTrailers()
131+
{
132+
$ended = false;
133+
$stream = new ThroughStream();
134+
$response = new ChunkedStreamDecoder($stream);
135+
$response->on('end', function () use (&$ended) {
136+
$ended = true;
137+
});
138+
139+
$stream->write("4\r\nWiki\r\n0\r\nabc: def\r\nghi: klm\r\n\r\n");
140+
141+
$this->assertTrue($ended);
142+
}
143+
}

0 commit comments

Comments
 (0)