Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions src/main/php/io/streams/SequenceInputStream.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php namespace io\streams;

use lang\IllegalArgumentException;

/**
* Reads through all given input streams
*
* @test io.unittest.SequenceInputStreamTest
*/
class SequenceInputStream implements InputStream {
private $streams, $current;

/**
* Creates a new instance
*
* @param iterable|io.streams.InputStream... $sources
* @throws lang.IllegalArgumentException if streams are empty
*/
public function __construct(... $sources) {
$this->streams= $this->iterator($sources);
if (!$this->streams->valid()) {
throw new IllegalArgumentException('Streams may not be empty');
}

$this->current= $this->streams->current();
}

/** Creates an iterator from the given arguments */
private function iterator($sources) {
foreach ($sources as $source) {
if ($source instanceof InputStream) {
yield $source;
} else {
yield from $source;
}
}
}

/** @return int */
public function available() {
do {
if ($r= $this->current->available()) return $r;

// No more data available on current stream, close and select next
$this->current->close();
$this->streams->next();
} while ($this->streams->valid() && ($this->current= $this->streams->current()));

return 0;
}

/**
* Reads up to the specified number of bytes
*
* @param int $bytes
* @return string
*/
public function read($bytes= 8192) {
do {
if ('' !== ($r= $this->current->read($bytes))) return $r;

// EOF from current stream, close and select next
$this->current->close();
$this->streams->next();
} while ($this->streams->valid() && ($this->current= $this->streams->current()));

return '';
}

/** @return void */
public function close() {
while ($this->streams->valid()) {
$this->streams->current()->close();
$this->streams->next();
}
}

/** Ensure streams are closed */
public function __destruct() {
$this->close();
}
}
113 changes: 113 additions & 0 deletions src/test/php/io/unittest/SequenceInputStreamTest.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
<?php namespace io\unittest;

use ArrayIterator;
use io\streams\{InputStream, MemoryInputStream, SequenceInputStream};
use lang\IllegalArgumentException;
use test\{Assert, Expect, Test};

class SequenceInputStreamTest {

/** Drains a stream */
private function drain(InputStream $stream): array {
$r= [];
while ($available= $stream->available()) {
$r[]= [$available, $stream->read()];
}
$r[]= [$stream->available(), $stream->read()];
return $r;
}

/** Creates a memory input stream with a `closed` property */
private function closeable(string $input): MemoryInputStream {
return new class($input) extends MemoryInputStream {
public $closed= false;
public function close() { $this->closed= true; }
};
}

#[Test, Expect(IllegalArgumentException::class)]
public function without_arguments() {
new SequenceInputStream();
}

#[Test, Expect(IllegalArgumentException::class)]
public function with_empty_array() {
new SequenceInputStream([]);
}

#[Test]
public function drain_one() {
$fixture= new SequenceInputStream(new MemoryInputStream('Test'));
Assert::equals([[4, 'Test'], [0, '']], $this->drain($fixture));
}

#[Test]
public function drain_multiple() {
$fixture= new SequenceInputStream(
new MemoryInputStream('One'),
new MemoryInputStream('Two')
);
Assert::equals([[3, 'One'], [3, 'Two'], [0, '']], $this->drain($fixture));
}

#[Test]
public function drain_array() {
$fixture= new SequenceInputStream([
new MemoryInputStream('One'),
new MemoryInputStream('Two')
]);
Assert::equals([[3, 'One'], [3, 'Two'], [0, '']], $this->drain($fixture));
}

#[Test]
public function drain_iterator() {
$fixture= new SequenceInputStream(new ArrayIterator([
yield new MemoryInputStream('One'),
yield new MemoryInputStream('Two')
]));
Assert::equals([[3, 'One'], [3, 'Two'], [0, '']], $this->drain($fixture));
}

#[Test]
public function drain_generator() {
$streams= function() {
yield new MemoryInputStream('One');
yield new MemoryInputStream('Two');
};
$fixture= new SequenceInputStream($streams());
Assert::equals([[3, 'One'], [3, 'Two'], [0, '']], $this->drain($fixture));
}

#[Test]
public function using_only_read() {
$fixture= new SequenceInputStream(
new MemoryInputStream('One'),
new MemoryInputStream('Two')
);

Assert::equals('One', $fixture->read());
Assert::equals('Two', $fixture->read());
Assert::equals('', $fixture->read());
}

#[Test]
public function close_closes_all_streams() {
$one= $this->closeable('One');
$two= $this->closeable('Two');
$fixture= new SequenceInputStream($one, $two);
$fixture->close();

Assert::equals([true, true], [$one->closed, $two->closed]);
}

#[Test]
public function streams_closed_when_drained() {
$one= $this->closeable('One');
$two= $this->closeable('Two');
$fixture= new SequenceInputStream($one, $two);
$this->drain($fixture);
$fixture->close();

Assert::equals([true, true], [$one->closed, $two->closed]);
}
}