Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions src/main/php/io/streams/SpooledInputStream.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?php namespace io\streams;

use io\IOException;
use lang\Environment;

/**
* Seekable input stream which spools to a temporary file
*
* @test io.unittest.SpooledInputStreamTest
*/
class SpooledInputStream implements InputStream, Seekable {
const PREFIX= 'spooled-';

private $in, $buffer;
private $end= null;

/**
* Creates a new spooled input stream
*
* @param io.streams.InputStream $in
* @param ?string|io.Path|io.Folder|io.File|io.streams.Buffer $temp
*/
public function __construct(InputStream $in, $temp= null) {
$this->in= $in;
$this->buffer= $temp instanceof Buffer ? $temp : new Buffer($temp ?? Environment::tempDir(), 0);
}

/** @return int */
public function available() {
return null === $this->end
? $this->in->available() + $this->buffer->size() - $this->buffer->tell()
: $this->end - $this->buffer->tell()
;
}

/**
* Reads, spooling the data to the file
*
* @param int $limit
* @return string
*/
public function read($limit= 8192) {

// Read from the underlying stream if we're at the end of the file
if (null === $this->end && $this->buffer->tell() >= $this->buffer->size()) {
$chunk= $this->in->read($limit);
if ('' === $chunk) {
$this->end= $this->buffer->tell();
} else {
$this->buffer->write($chunk);
}
} else {
$chunk= $this->buffer->read($limit);
}

return $chunk;
}

/** @return int */
private function drain() {
$this->buffer->seek(0, SEEK_END);
while ('' !== ($chunk= $this->in->read())) {
$this->buffer->write($chunk);
}
return $this->buffer->tell();
}

/**
* Seeks to a given offset
*
* @param int $offset
* @param int $whence SEEK_SET, SEEK_CUR or SEEK_END
* @return void
* @throws io.IOException
*/
public function seek($offset, $whence= SEEK_SET) {
switch ($whence) {
case SEEK_SET: $position= $offset; break;
case SEEK_CUR: $position= $this->buffer->tell() + $offset; break;
case SEEK_END: $position= ($this->end??= $this->drain()) + $offset; break;
default: $position= -1; break;
}

if ($position < 0) {
throw new IOException("Seek error, position {$offset} in mode {$whence}");
}

// Read from underlying stream when seeking forward, clamping on EOF.
if (null === $this->end && ($fill= ($position - $this->buffer->size())) > 0) {
$this->buffer->seek(0, SEEK_END);
while ($fill > 0 && $this->in->available()) {
$chunk= $this->in->read($fill);
$this->buffer->write($chunk);
$fill-= strlen($chunk);
}
$fill && $this->end= $this->buffer->tell();
}

$this->buffer->seek(min($this->end ?? $this->buffer->size(), $position), SEEK_SET);
}

/** @return int */
public function tell() { return $this->buffer->tell(); }

/** @return void */
public function close() {
$this->buffer->close();
}

/** Ensures close() is called */
public function __destruct() {
$this->close();
}
}
183 changes: 183 additions & 0 deletions src/test/php/io/unittest/SpooledInputStreamTest.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
<?php namespace io\unittest;

use io\streams\{SpooledInputStream, MemoryInputStream, Streams};
use io\{TempFile, Folder, Path, IOException};
use lang\Environment;
use test\{Assert, Expect, Test, Values};

class SpooledInputStreamTest {
const BYTES= 'Test success';

/** @param ?string|io.Path|io.Folder|io.File $temp */
private function newFixture($temp= null): SpooledInputStream {
return new SpooledInputStream(new MemoryInputStream(self::BYTES), $temp);
}

#[Test]
public function can_create() {
$this->newFixture();
}

#[Test]
public function reading() {
$stream= $this->newFixture();

Assert::equals(0, $stream->tell());
Assert::equals(strlen(self::BYTES), $stream->available());
Assert::equals(self::BYTES, Streams::readAll($stream));
}

#[Test]
public function reading_after_seeking() {
$stream= $this->newFixture();
$stream->seek(5);
$stream->seek(0);

Assert::equals(0, $stream->tell());
Assert::equals(strlen(self::BYTES), $stream->available());
Assert::equals(self::BYTES, Streams::readAll($stream));
}

#[Test, Values([0, 1, 5])]
public function seeking_forward($offset) {
$stream= $this->newFixture();
$stream->seek($offset, SEEK_SET);

Assert::equals($offset, $stream->tell());
Assert::equals(strlen(self::BYTES) - $offset, $stream->available());
Assert::equals(substr(self::BYTES, $offset), Streams::readAll($stream));
}

#[Test, Values([0, -1, -5])]
public function seeking_to_end($offset) {
$stream= $this->newFixture();
$stream->seek($offset, SEEK_END);

Assert::equals(strlen(self::BYTES) + $offset, $stream->tell());
Assert::equals(-$offset, $stream->available());
Assert::equals(substr(self::BYTES, strlen(self::BYTES) + $offset), Streams::readAll($stream));
}

#[Test, Values([0, 1, -1])]
public function seeking_relative($offset) {
$stream= $this->newFixture();
$stream->seek(5, SEEK_SET);
$stream->seek($offset, SEEK_CUR);

Assert::equals(5 + $offset, $stream->tell());
Assert::equals(strlen(self::BYTES) - 5 - $offset, $stream->available());
Assert::equals(substr(self::BYTES, 5 + $offset), Streams::readAll($stream));
}

#[Test, Values([0, 1, 5])]
public function seeking_forward_set($offset) {
$stream= $this->newFixture();
$stream->seek($offset, SEEK_SET);

Assert::equals($offset, $stream->tell());
Assert::equals(strlen(self::BYTES) - $offset, $stream->available());
Assert::equals(substr(self::BYTES, $offset), Streams::readAll($stream));
}

#[Test, Values([0, 1, 5])]
public function seeking_forward_cur($offset) {
$stream= $this->newFixture();
$stream->seek(strlen(self::BYTES), SEEK_SET);
$stream->seek(-$offset, SEEK_CUR);

Assert::equals(strlen(self::BYTES) - $offset, $stream->tell());
Assert::equals($offset, $stream->available());
Assert::equals(substr(self::BYTES, strlen(self::BYTES) - $offset), Streams::readAll($stream));
}

#[Test, Values([0, 1, 5])]
public function seeking_forward_triggers_read($offset) {
$stream= $this->newFixture();
$stream->seek(5, SEEK_SET);
$stream->seek(strlen(self::BYTES) - $offset, SEEK_SET);

Assert::equals(strlen(self::BYTES) - $offset, $stream->tell());
Assert::equals($offset, $stream->available());
Assert::equals(substr(self::BYTES, strlen(self::BYTES) - $offset), Streams::readAll($stream));
}

#[Test, Values([0, 1, 5])]
public function seeking_back_to_offset_after_reading_until_end($offset) {
$stream= $this->newFixture();
while ($stream->available()) {
$stream->read();
}
$stream->seek($offset, SEEK_SET);

Assert::equals($offset, $stream->tell());
Assert::equals(strlen(self::BYTES) - $offset, $stream->available());
Assert::equals(substr(self::BYTES, $offset), Streams::readAll($stream));
}

#[Test, Values([0, -1, -5])]
public function seeking_to_end_after_reading_until_end($offset) {
$stream= $this->newFixture();
while ($stream->available()) {
$stream->read();
}
$stream->seek($offset, SEEK_END);

Assert::equals(strlen(self::BYTES) + $offset, $stream->tell());
Assert::equals(-$offset, $stream->available());
Assert::equals(substr(self::BYTES, strlen(self::BYTES) + $offset), Streams::readAll($stream));
}

#[Test, Values([0, -1, -5])]
public function seeking_to_end_after_seeking_relative($offset) {
$stream= $this->newFixture();
$stream->seek(5, SEEK_SET);
$stream->seek($offset, SEEK_END);

Assert::equals(strlen(self::BYTES) + $offset, $stream->tell());
Assert::equals(-$offset, $stream->available());
Assert::equals(substr(self::BYTES, strlen(self::BYTES) + $offset), Streams::readAll($stream));
}

#[Test]
public function close_can_be_called_twice() {
$stream= $this->newFixture();

$stream->close();
$stream->close();
}

#[Test, Expect(IOException::class), Values([SEEK_SET, SEEK_CUR])]
public function cannot_seek_before_beginning_of_file($whence) {
$this->newFixture()->seek(-1, $whence);
}

#[Test, Expect(IOException::class)]
public function cannot_seek_with_invalid_whence() {
$this->newFixture()->seek(0, 6100);
}

#[Test]
public function position_after_seek_error() {
$stream= $this->newFixture();
$stream->read(4);

Assert::throws(IOException::class, fn() => $stream->seek(-1));
Assert::equals(4, $stream->tell());
}

#[Test]
public function position_after_seek_set_past_end() {
$stream= $this->newFixture();
$stream->seek(strlen(self::BYTES) + 1, SEEK_SET);

Assert::equals(strlen(self::BYTES), $stream->tell());
}

#[Test]
public function position_after_seek_end_past_end() {
$stream= $this->newFixture();
$stream->seek(1, SEEK_END);

Assert::equals(strlen(self::BYTES), $stream->tell());
}
}
Loading