Skip to content

Commit a540e15

Browse files
committed
Add new BufferStream and PumpStream, update code.
1 parent d0c936a commit a540e15

6 files changed

Lines changed: 366 additions & 100 deletions

File tree

src/BufferStream.php

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
<?php
2+
3+
namespace Swlib\Http;
4+
5+
use Psr\Http\Message\StreamInterface;
6+
use RuntimeException;
7+
8+
/**
9+
* Provides a buffer stream that can be written to to fill a buffer, and read
10+
* from to remove bytes from the buffer.
11+
*
12+
* This stream returns a "hwm" metadata value that tells upstream consumers
13+
* what the configured high water mark of the stream is, or the maximum
14+
* preferred size of the buffer.
15+
*/
16+
class BufferStream implements StreamInterface
17+
{
18+
private $hwm;
19+
private $buffer = '';
20+
21+
/**
22+
* @param int $hwm High water mark, representing the preferred maximum
23+
* buffer size. If the size of the buffer exceeds the high
24+
* water mark, then calls to write will continue to succeed
25+
* but will return false to inform writers to slow down
26+
* until the buffer has been drained by reading from it.
27+
*/
28+
public function __construct($hwm = 16384)
29+
{
30+
$this->hwm = $hwm;
31+
}
32+
33+
public function __toString()
34+
{
35+
return $this->getContents();
36+
}
37+
38+
public function getContents()
39+
{
40+
$buffer = $this->buffer;
41+
$this->buffer = '';
42+
43+
return $buffer;
44+
}
45+
46+
public function close()
47+
{
48+
$this->buffer = '';
49+
}
50+
51+
public function detach()
52+
{
53+
$this->close();
54+
}
55+
56+
public function getSize()
57+
{
58+
return strlen($this->buffer);
59+
}
60+
61+
public function isReadable()
62+
{
63+
return true;
64+
}
65+
66+
public function isWritable()
67+
{
68+
return true;
69+
}
70+
71+
public function isSeekable()
72+
{
73+
return false;
74+
}
75+
76+
public function rewind()
77+
{
78+
$this->seek(0);
79+
}
80+
81+
public function seek($offset, $whence = SEEK_SET)
82+
{
83+
throw new RuntimeException('Cannot seek a BufferStream');
84+
}
85+
86+
public function eof()
87+
{
88+
return strlen($this->buffer) === 0;
89+
}
90+
91+
public function tell()
92+
{
93+
throw new RuntimeException('Cannot determine the position of a BufferStream');
94+
}
95+
96+
/**
97+
* Reads data from the buffer.
98+
*/
99+
public function read($length)
100+
{
101+
$currentLength = strlen($this->buffer);
102+
103+
if ($length >= $currentLength) {
104+
// No need to slice the buffer because we don't have enough data.
105+
$result = $this->buffer;
106+
$this->buffer = '';
107+
} else {
108+
// Slice up the result to provide a subset of the buffer.
109+
$result = substr($this->buffer, 0, $length);
110+
$this->buffer = substr($this->buffer, $length);
111+
}
112+
113+
return $result;
114+
}
115+
116+
/**
117+
* Writes data to the buffer.
118+
*/
119+
public function write($string)
120+
{
121+
$this->buffer .= $string;
122+
123+
// TODO: What should happen here?
124+
if (strlen($this->buffer) >= $this->hwm) {
125+
return false;
126+
}
127+
128+
return strlen($string);
129+
}
130+
131+
public function getMetadata($key = null)
132+
{
133+
if ($key == 'hwm') {
134+
return $this->hwm;
135+
}
136+
137+
return $key ? null : [];
138+
}
139+
}

src/Message.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class Message implements MessageInterface
2121
{
2222

2323
/**@var string */
24-
protected $protocolVersion = '1.1';
24+
protected $protocolVersion;
2525
/**@var Uri */
2626
protected $uri;
2727
/**@var [][]string */
@@ -30,8 +30,9 @@ class Message implements MessageInterface
3030
/**@var StreamInterface */
3131
protected $body;
3232

33-
function __construct(array $headers = [], $body = null)
33+
function __construct(array $headers = [], $body = null, string $protocolVersion = '1.1')
3434
{
35+
$this->withProtocolVersion($protocolVersion);
3536
$this->withAddedHeaders($headers);
3637
if ($body !== '' && $body !== null) {
3738
$this->body = stream_for($body);

src/PumpStream.php

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
<?php
2+
3+
namespace Swlib\Http;
4+
5+
use Exception;
6+
use Psr\Http\Message\StreamInterface;
7+
use RuntimeException;
8+
9+
/**
10+
* Provides a read only stream that pumps data from a PHP callable.
11+
*
12+
* When invoking the provided callable, the PumpStream will pass the amount of
13+
* data requested to read to the callable. The callable can choose to ignore
14+
* this value and return fewer or more bytes than requested. Any extra data
15+
* returned by the provided callable is buffered internally until drained using
16+
* the read() function of the PumpStream. The provided callable MUST return
17+
* false when there is no more data to read.
18+
*/
19+
class PumpStream implements StreamInterface
20+
{
21+
/** @var callable */
22+
private $source;
23+
24+
/** @var int */
25+
private $size;
26+
27+
/** @var int */
28+
private $tellPos = 0;
29+
30+
/** @var array */
31+
private $metadata;
32+
33+
/** @var BufferStream */
34+
private $buffer;
35+
36+
/**
37+
* @param callable $source Source of the stream data. The callable MAY
38+
* accept an integer argument used to control the
39+
* amount of data to return. The callable MUST
40+
* return a string when called, or false on error
41+
* or EOF.
42+
* @param array $options Stream options:
43+
* - metadata: Hash of metadata to use with stream.
44+
* - size: Size of the stream, if known.
45+
*/
46+
public function __construct(callable $source, array $options = [])
47+
{
48+
$this->source = $source;
49+
$this->size = isset($options['size']) ? $options['size'] : null;
50+
$this->metadata = isset($options['metadata']) ? $options['metadata'] : [];
51+
$this->buffer = new BufferStream();
52+
}
53+
54+
public function __toString()
55+
{
56+
try {
57+
return copy_to_string($this);
58+
} catch (Exception $e) {
59+
return '';
60+
}
61+
}
62+
63+
public function close()
64+
{
65+
$this->detach();
66+
}
67+
68+
public function detach()
69+
{
70+
$this->tellPos = false;
71+
$this->source = null;
72+
}
73+
74+
public function getSize()
75+
{
76+
return $this->size;
77+
}
78+
79+
public function tell()
80+
{
81+
return $this->tellPos;
82+
}
83+
84+
public function eof()
85+
{
86+
return !$this->source;
87+
}
88+
89+
public function isSeekable()
90+
{
91+
return false;
92+
}
93+
94+
public function rewind()
95+
{
96+
$this->seek(0);
97+
}
98+
99+
public function seek($offset, $whence = SEEK_SET)
100+
{
101+
throw new RuntimeException('Cannot seek a PumpStream');
102+
}
103+
104+
public function isWritable()
105+
{
106+
return false;
107+
}
108+
109+
public function write($string)
110+
{
111+
throw new RuntimeException('Cannot write to a PumpStream');
112+
}
113+
114+
public function isReadable()
115+
{
116+
return true;
117+
}
118+
119+
public function read($length)
120+
{
121+
$data = $this->buffer->read($length);
122+
$readLen = strlen($data);
123+
$this->tellPos += $readLen;
124+
$remaining = $length - $readLen;
125+
126+
if ($remaining) {
127+
$this->pump($remaining);
128+
$data .= $this->buffer->read($remaining);
129+
$this->tellPos += strlen($data) - $readLen;
130+
}
131+
132+
return $data;
133+
}
134+
135+
public function getContents()
136+
{
137+
$result = '';
138+
while (!$this->eof()) {
139+
$result .= $this->read(1000000);
140+
}
141+
142+
return $result;
143+
}
144+
145+
public function getMetadata($key = null)
146+
{
147+
if (!$key) {
148+
return $this->metadata;
149+
}
150+
151+
return isset($this->metadata[$key]) ? $this->metadata[$key] : null;
152+
}
153+
154+
private function pump($length)
155+
{
156+
if ($this->source) {
157+
do {
158+
$data = call_user_func($this->source, $length);
159+
if ($data === false || $data === null) {
160+
$this->source = null;
161+
return;
162+
}
163+
$this->buffer->write($data);
164+
$length -= strlen($data);
165+
} while ($length > 0);
166+
}
167+
}
168+
}

src/Request.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ class Request extends Message implements RequestInterface
2424
protected $parsedBody;
2525
protected $uploadedFiles = [];
2626

27-
function __construct(string $method = 'GET', $uri = '', array $headers = [], $body = null)
27+
function __construct(string $method = 'GET', $uri = '', array $headers = [], $body = null, string $protocolVersion = '1.1')
2828
{
29-
parent::__construct($headers, $body);
29+
parent::__construct($headers, $body, $protocolVersion);
3030
if (!($uri instanceof UriInterface)) {
3131
$uri = new Uri($uri); // request must has uri
3232
}

src/Response.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ function __construct(
2020
$status = Status::OK,
2121
array $headers = [],
2222
$body = null,
23-
string $version = '1.1',
23+
string $protocolVersion = '1.1',
2424
string $reason = null
2525
) {
2626
$this->statusCode = (int)$status;
@@ -30,7 +30,7 @@ function __construct(
3030
} else {
3131
$this->reasonPhrase = (string)$reason;
3232
}
33-
$this->protocolVersion = $version;
33+
$this->protocolVersion = $protocolVersion;
3434
}
3535

3636
/**

0 commit comments

Comments
 (0)