Skip to content

Commit c8a61a8

Browse files
authored
Merge pull request #359 from thekid/feature/filter-streams
Integrate stream filters into io.streams
2 parents 55d7c52 + 6f6c136 commit c8a61a8

File tree

5 files changed

+309
-0
lines changed

5 files changed

+309
-0
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php namespace io\streams;
2+
3+
/**
4+
* Input stream that runs input through a given filter
5+
*
6+
* @test io.unittest.FilterInputStreamTest
7+
*/
8+
class FilterInputStream extends FilterStream implements InputStream {
9+
const MODE= STREAM_FILTER_READ;
10+
11+
static function __static() { }
12+
13+
/**
14+
* Creates a new instance
15+
*
16+
* @param io.streams.InputStream $in
17+
* @param string|callable|string[]|callable[]|[:string] $filters
18+
*/
19+
public function __construct(InputStream $in, $filters) {
20+
$this->fd= Streams::readableFd($in);
21+
foreach (is_array($filters) ? $filters : [$filters] as $key => $filter) {
22+
is_array($filter) ? $this->append($key, $filter) : $this->append($filter);
23+
}
24+
}
25+
26+
/**
27+
* Read bytes
28+
*
29+
* @param int $limit default 8192
30+
* @return string
31+
*/
32+
public function read($limit= 8192) {
33+
return fread($this->fd, $limit);
34+
}
35+
36+
/**
37+
* Availably bytes
38+
*
39+
* @return int
40+
*/
41+
public function available() {
42+
return feof($this->fd) ? 0 : 1;
43+
}
44+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php namespace io\streams;
2+
3+
/**
4+
* Output stream that runs input through a given filter
5+
*
6+
* @test io.unittest.FilterOutputStreamTest
7+
*/
8+
class FilterOutputStream extends FilterStream implements OutputStream {
9+
const MODE= STREAM_FILTER_WRITE;
10+
11+
static function __static() { }
12+
13+
/**
14+
* Creates a new instance
15+
*
16+
* @param io.streams.OutputStream $out
17+
* @param string|callable|string[]|callable[]|[:string] $filters
18+
*/
19+
public function __construct(OutputStream $out, $filters= []) {
20+
$this->fd= Streams::writeableFd($out);
21+
foreach (is_array($filters) ? $filters : [$filters] as $key => $filter) {
22+
is_array($filter) ? $this->append($key, $filter) : $this->append($filter);
23+
}
24+
}
25+
26+
/**
27+
* Write bytes
28+
*
29+
* @param string $bytes
30+
* @return void
31+
*/
32+
public function write($bytes) {
33+
fwrite($this->fd, $bytes);
34+
}
35+
36+
/**
37+
* Flush
38+
*
39+
* @return void
40+
*/
41+
public function flush() {
42+
fflush($this->fd);
43+
}
44+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
<?php namespace io\streams;
2+
3+
use ReflectionFunction, php_user_filter;
4+
use lang\IllegalArgumentException;
5+
6+
/** @see https://www.php.net/manual/en/filters.php */
7+
abstract class FilterStream {
8+
public static $filters= [];
9+
private static $id= 0;
10+
protected $fd;
11+
private $remove= [];
12+
13+
static function __static() {
14+
15+
// Suppress "Declaration should be compatible" in PHP 7.4
16+
stream_filter_register('iostrl.*', get_class(@new class() extends php_user_filter {
17+
public function filter($in, $out, &$consumed, bool $closing): int {
18+
while ($bucket= stream_bucket_make_writeable($in)) {
19+
$consumed+= $bucket->datalen;
20+
$bucket->data= FilterStream::$filters[$this->filtername]($bucket->data);
21+
null === $bucket->data || stream_bucket_append($out, $bucket);
22+
}
23+
return PSFS_PASS_ON;
24+
}
25+
}));
26+
stream_filter_register('iostrf.*', get_class(@new class() extends php_user_filter {
27+
public function filter($in, $out, &$consumed, bool $closing): int {
28+
return FilterStream::$filters[$this->filtername]($in, $out, $consumed, $closing);
29+
}
30+
}));
31+
}
32+
33+
/**
34+
* Appends a filter and returns its handle
35+
*
36+
* @param string|callable $filter
37+
* @param array $parameters
38+
* @return mixed
39+
* @throws lang.IllegalArgumentException
40+
*/
41+
public function append($filter, $parameters= []) {
42+
if (is_string($filter)) {
43+
$name= $filter;
44+
} else {
45+
$f= new ReflectionFunction($filter);
46+
$this->remove[]= $name= (1 === $f->getNumberOfParameters() ? 'iostrl.' : 'iostrf.').(++self::$id);
47+
self::$filters[$name]= $filter;
48+
}
49+
50+
if (!($handle= stream_filter_append($this->fd, $name, static::MODE, $parameters))) {
51+
throw new IllegalArgumentException('Could not append stream filter '.$name);
52+
}
53+
return $handle;
54+
}
55+
56+
/**
57+
* Removes a stream filter
58+
*
59+
* @param mixed $handle
60+
* @return void
61+
* @throws lang.IllegalArgumentException
62+
*/
63+
public function remove($handle) {
64+
if (!stream_filter_remove($handle)) {
65+
throw new IllegalArgumentException('Could not remove stream filter '.$name);
66+
}
67+
}
68+
69+
/**
70+
* Close stream
71+
*
72+
* @return void
73+
*/
74+
public function close() {
75+
fclose($this->fd);
76+
$this->fd= null;
77+
}
78+
79+
/** Ensures stream is closed */
80+
public function __destruct() {
81+
foreach ($this->remove as $filter) {
82+
unset(self::$filters[$filter]);
83+
}
84+
$this->fd && $this->close();
85+
}
86+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php namespace io\unittest;
2+
3+
use io\streams\{FilterInputStream, MemoryInputStream, Streams};
4+
use lang\IllegalStateException;
5+
use test\{Assert, Expect, Test, Values};
6+
7+
class FilterInputStreamTest {
8+
9+
/** Test helper */
10+
private function read($input, $filter) {
11+
return Streams::readAll(new FilterInputStream(new MemoryInputStream($input), $filter));
12+
}
13+
14+
#[Test, Values([['str_rot13', 'string.rot13'], ['base64_encode', 'convert.base64-decode'], ['quoted_printable_encode', 'convert.quoted-printable-decode']])]
15+
public function builtin($encode, $filter) {
16+
Assert::equals('Test', $this->read($encode('Test'), $filter));
17+
}
18+
19+
#[Test]
20+
public function chained() {
21+
Assert::equals('Test', $this->read(base64_encode(str_rot13('Test')), [
22+
'convert.base64-decode',
23+
'string.rot13',
24+
]));
25+
}
26+
27+
#[Test]
28+
public function lambda() {
29+
Assert::equals('test', $this->read('TEST', fn($chunk) => strtolower($chunk)));
30+
}
31+
32+
#[Test, Expect(IllegalStateException::class)]
33+
public function lambda_raising_error() {
34+
$this->read('TEST', function($chunk) {
35+
throw new IllegalStateException('Test');
36+
});
37+
}
38+
39+
#[Test]
40+
public function user_filter() {
41+
Assert::equals('test', $this->read('TEST', function($in, $out, &$consumed, $closing) {
42+
while ($bucket= stream_bucket_make_writeable($in)) {
43+
$consumed+= $bucket->datalen;
44+
$bucket->data= strtolower($bucket->data);
45+
stream_bucket_append($out, $bucket);
46+
}
47+
return PSFS_PASS_ON;
48+
}));
49+
}
50+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?php namespace io\unittest;
2+
3+
use io\streams\{FilterOutputStream, MemoryOutputStream};
4+
use test\verify\Runtime;
5+
use test\{Assert, Test, Values};
6+
7+
class FilterOutputStreamTest {
8+
9+
/** Test helper */
10+
private function write($input, $filter) {
11+
$out= new MemoryOutputStream();
12+
13+
$fixture= new FilterOutputStream($out, $filter);
14+
$fixture->write($input);
15+
$fixture->close();
16+
17+
return $out->bytes();
18+
}
19+
20+
#[Test, Values([['str_rot13', 'string.rot13'], ['base64_encode', 'convert.base64-encode'], ['quoted_printable_encode', 'convert.quoted-printable-encode']])]
21+
public function builtin($encode, $filter) {
22+
Assert::equals($encode('Test'), $this->write('Test', $filter));
23+
}
24+
25+
#[Test]
26+
public function chained() {
27+
Assert::equals(base64_encode(str_rot13('Test')), $this->write('Test', [
28+
'string.rot13',
29+
'convert.base64-encode',
30+
]));
31+
}
32+
33+
#[Test]
34+
public function lambda() {
35+
Assert::equals('test', $this->write('TEST', fn($chunk) => strtolower($chunk)));
36+
}
37+
38+
#[Test]
39+
public function user_filter() {
40+
Assert::equals('test', $this->write('TEST', function($in, $out, &$consumed, $closing) {
41+
while ($bucket= stream_bucket_make_writeable($in)) {
42+
$consumed+= $bucket->datalen;
43+
$bucket->data= strtolower($bucket->data);
44+
stream_bucket_append($out, $bucket);
45+
}
46+
return PSFS_PASS_ON;
47+
}));
48+
}
49+
50+
#[Test]
51+
public function append() {
52+
$out= new MemoryOutputStream();
53+
$fixture= new FilterOutputStream($out);
54+
55+
$toupper= $fixture->append('string.toupper');
56+
$rot13= $fixture->append('string.rot13');
57+
$fixture->write('test');
58+
59+
$fixture->remove($rot13);
60+
$fixture->write('e');
61+
62+
$fixture->remove($toupper);
63+
$fixture->write('d');
64+
65+
$fixture->close();
66+
67+
Assert::equals(str_rot13(strtoupper('test')).strtoupper('e').'d', $out->bytes());
68+
}
69+
70+
#[Test]
71+
public function base64_line_length() {
72+
Assert::equals(
73+
"VGhpcyBp\r\ncyBhIHRl\r\nc3QuCg==",
74+
$this->write("This is a test.\n", ['convert.base64-encode' => ['line-length' => 8]])
75+
);
76+
}
77+
78+
#[Test, Runtime(extensions: ['zlib'])]
79+
public function deflate_level() {
80+
Assert::equals(
81+
gzdeflate('This is a test. This also.', 6),
82+
$this->write('This is a test. This also.', ['zlib.deflate' => ['level' => 6]])
83+
);
84+
}
85+
}

0 commit comments

Comments
 (0)