Skip to content

Commit ef83358

Browse files
authored
Merge pull request #352 from thekid/feature/sequence-input
Add SequenceInputStream
2 parents ce56679 + 5afca8f commit ef83358

2 files changed

Lines changed: 195 additions & 0 deletions

File tree

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php namespace io\streams;
2+
3+
use lang\IllegalArgumentException;
4+
5+
/**
6+
* Reads through all given input streams
7+
*
8+
* @test io.unittest.SequenceInputStreamTest
9+
*/
10+
class SequenceInputStream implements InputStream {
11+
private $streams, $current;
12+
13+
/**
14+
* Creates a new instance
15+
*
16+
* @param iterable|io.streams.InputStream... $sources
17+
* @throws lang.IllegalArgumentException if streams are empty
18+
*/
19+
public function __construct(... $sources) {
20+
$this->streams= $this->iterator($sources);
21+
if (!$this->streams->valid()) {
22+
throw new IllegalArgumentException('Streams may not be empty');
23+
}
24+
25+
$this->current= $this->streams->current();
26+
}
27+
28+
/** Creates an iterator from the given arguments */
29+
private function iterator($sources) {
30+
foreach ($sources as $source) {
31+
if ($source instanceof InputStream) {
32+
yield $source;
33+
} else {
34+
yield from $source;
35+
}
36+
}
37+
}
38+
39+
/** @return int */
40+
public function available() {
41+
do {
42+
if ($r= $this->current->available()) return $r;
43+
44+
// No more data available on current stream, close and select next
45+
$this->current->close();
46+
$this->streams->next();
47+
} while ($this->streams->valid() && ($this->current= $this->streams->current()));
48+
49+
return 0;
50+
}
51+
52+
/**
53+
* Reads up to the specified number of bytes
54+
*
55+
* @param int $bytes
56+
* @return string
57+
*/
58+
public function read($bytes= 8192) {
59+
do {
60+
if ('' !== ($r= $this->current->read($bytes))) return $r;
61+
62+
// EOF from current stream, close and select next
63+
$this->current->close();
64+
$this->streams->next();
65+
} while ($this->streams->valid() && ($this->current= $this->streams->current()));
66+
67+
return '';
68+
}
69+
70+
/** @return void */
71+
public function close() {
72+
while ($this->streams->valid()) {
73+
$this->streams->current()->close();
74+
$this->streams->next();
75+
}
76+
}
77+
78+
/** Ensure streams are closed */
79+
public function __destruct() {
80+
$this->close();
81+
}
82+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
<?php namespace io\unittest;
2+
3+
use ArrayIterator;
4+
use io\streams\{InputStream, MemoryInputStream, SequenceInputStream};
5+
use lang\IllegalArgumentException;
6+
use test\{Assert, Expect, Test};
7+
8+
class SequenceInputStreamTest {
9+
10+
/** Drains a stream */
11+
private function drain(InputStream $stream): array {
12+
$r= [];
13+
while ($available= $stream->available()) {
14+
$r[]= [$available, $stream->read()];
15+
}
16+
$r[]= [$stream->available(), $stream->read()];
17+
return $r;
18+
}
19+
20+
/** Creates a memory input stream with a `closed` property */
21+
private function closeable(string $input): MemoryInputStream {
22+
return new class($input) extends MemoryInputStream {
23+
public $closed= false;
24+
public function close() { $this->closed= true; }
25+
};
26+
}
27+
28+
#[Test, Expect(IllegalArgumentException::class)]
29+
public function without_arguments() {
30+
new SequenceInputStream();
31+
}
32+
33+
#[Test, Expect(IllegalArgumentException::class)]
34+
public function with_empty_array() {
35+
new SequenceInputStream([]);
36+
}
37+
38+
#[Test]
39+
public function drain_one() {
40+
$fixture= new SequenceInputStream(new MemoryInputStream('Test'));
41+
Assert::equals([[4, 'Test'], [0, '']], $this->drain($fixture));
42+
}
43+
44+
#[Test]
45+
public function drain_multiple() {
46+
$fixture= new SequenceInputStream(
47+
new MemoryInputStream('One'),
48+
new MemoryInputStream('Two')
49+
);
50+
Assert::equals([[3, 'One'], [3, 'Two'], [0, '']], $this->drain($fixture));
51+
}
52+
53+
#[Test]
54+
public function drain_array() {
55+
$fixture= new SequenceInputStream([
56+
new MemoryInputStream('One'),
57+
new MemoryInputStream('Two')
58+
]);
59+
Assert::equals([[3, 'One'], [3, 'Two'], [0, '']], $this->drain($fixture));
60+
}
61+
62+
#[Test]
63+
public function drain_iterator() {
64+
$fixture= new SequenceInputStream(new ArrayIterator([
65+
yield new MemoryInputStream('One'),
66+
yield new MemoryInputStream('Two')
67+
]));
68+
Assert::equals([[3, 'One'], [3, 'Two'], [0, '']], $this->drain($fixture));
69+
}
70+
71+
#[Test]
72+
public function drain_generator() {
73+
$streams= function() {
74+
yield new MemoryInputStream('One');
75+
yield new MemoryInputStream('Two');
76+
};
77+
$fixture= new SequenceInputStream($streams());
78+
Assert::equals([[3, 'One'], [3, 'Two'], [0, '']], $this->drain($fixture));
79+
}
80+
81+
#[Test]
82+
public function using_only_read() {
83+
$fixture= new SequenceInputStream(
84+
new MemoryInputStream('One'),
85+
new MemoryInputStream('Two')
86+
);
87+
88+
Assert::equals('One', $fixture->read());
89+
Assert::equals('Two', $fixture->read());
90+
Assert::equals('', $fixture->read());
91+
}
92+
93+
#[Test]
94+
public function close_closes_all_streams() {
95+
$one= $this->closeable('One');
96+
$two= $this->closeable('Two');
97+
$fixture= new SequenceInputStream($one, $two);
98+
$fixture->close();
99+
100+
Assert::equals([true, true], [$one->closed, $two->closed]);
101+
}
102+
103+
#[Test]
104+
public function streams_closed_when_drained() {
105+
$one= $this->closeable('One');
106+
$two= $this->closeable('Two');
107+
$fixture= new SequenceInputStream($one, $two);
108+
$this->drain($fixture);
109+
$fixture->close();
110+
111+
Assert::equals([true, true], [$one->closed, $two->closed]);
112+
}
113+
}

0 commit comments

Comments
 (0)